为什么需要线程池?
避免在运行大量任务时,频繁的线程创建和销毁开销;
使资源的使用得到有效控制,避免创建过多的线程占用系统资源。
基本概念 Core and maximum pool sizes 控制线程池核心线程数以及最大可生成的线程数量。是否需要创建线程与当前线程的数量以及任务队列的状态在关,后面会详述。
Keep-alive times
默认情况下,只有在当前worker线程数大于core大小的情况下,空闲一定时间的worker线程才可以被回收,但是也可以通过allowCoreThreadTimeOut(boolean)函数来控制core线程的超时时间。
任务队列 ThreadPoolExecutor使用BlockingQueue来管理任务队列,任务队列与线程池大小的关系如下:
如果线程池数量小于corePoolSize,Executor倾向于新增worker线程;
如果线程池数量多于或者等于corePoolSize倾向于将任务放入队列;
如果任务队列已满,并且线程池数量还没有超过maximumPoolSize,那么新的worker线程;
如果任务队列已满,并且线程池数量已经超过maximumPoolSize,那么任务被reject;
实现 提交任务 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
创建worker线程 去除一些状态检查后,核心代码如下:
886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 private boolean addWorker (Runnable firstTask, boolean core) { Worker w = new Worker (firstTask); Thread t = w.thread; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } finally { mainLock.unlock(); } t.start(); return true ; }
可以看到,很简单,创建一个Worker线程,将他加到workers集合中,然后启动对应worker线程,DONE。
我们来看看Worker的定义:
575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L ; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } protected boolean isHeldExclusively () { return getState() == 1 ; } protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock () { acquire(1 ); } public boolean tryLock () { return tryAcquire(1 ); } public void unlock () { release(1 ); } public boolean isLocked () { return isHeldExclusively(); } }
除去跟锁定义相关的代码后,核心就是run函数的实现:调用runWorker运行Worker线程的运行逻辑。
Worker线程运行逻辑 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 final void runWorker (Worker w) { Runnable task = w.firstTask; w.firstTask = null ; boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); clearInterruptsForTaskRun(); try { beforeExecute(w.thread, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error (x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
就是一个while循环,在有任务的情况下(两种:一种在创建Worker线程时传入,由firtstTask传入;一种通过getTask由任务队列获取),执行任务,并调用设置的回调函数(beforeExecute,afterExecute等)。
我们来看看getTask的实现:
1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 private Runnable getTask () { boolean timedOut = false ; for (;;) { try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
去除了状态检查的相关代码后,核心的逻辑如下:在需要处理超时 的情况下调用BlockingQueue.poll来获取任务,如果在超时后还没有任务,则让相应的worker线程退出;如果不需要处理超时时候,调用BlockingQueue.take,阻塞当前worker线程一直到有任务到达。
总结 ThreadPoolExecutor会根据线程池状态和任务队列状态创建worker线程,而每个worker线程的主要任务就是不断的去任务队列里去拿任务:要么一直阻塞等,要么超时后退出;拿到任务后,运行任务并调用相关回调。