execute
线程池是通过execute方法,添加任务(task)的。方法保证在将来的某一时间,执行给定的任务。任务可能在新的线程中被执行,也可能在已经存在于池子中的线程中被执行。
如果任务因为线程池关闭,或者容量满,而不能被提交执行,任务将被当前设置的RejectedExecutionHandler处理。
后续的执行步骤是三种情况
- 如果运行的线程数小于corePoolSize,则尝试新建一个线程,并设置当前的任务为该线程第一个需要处理的任务。
- 如果运行的线程数大于等于corePoolSize,则将该任务添加到阻塞队列workQueue中,池子中的空闲线程就会去队列汇总取任务之心。
- 当队列满了后,添加队列会失败,这是尝试直接创建一个线程,如果失败,可以得知,要么线程池已经关闭,要么是达到了饱和状态,不能在接受新任务,则该任务将被拒绝。
execute代码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 上面的step1
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 上面的step2,workQueue.offer可以把command添加到阻塞队列中,这个地方有双检测
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// double-checked,如果线程池关闭,还要remove
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果是running状态 c & CAPACITY = e000 0000 & 1000 0000 = 0
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
addWorker
该方法用于创建一个新线程,即上面第一种情况。addWorker(firstTask,true)
addWorker代码
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// api说明中的解释是,如果线程池关闭或适合关闭,则方法返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 如果worker数量比coresize大,或者比maxsize大,就不需要添加了
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 对clt做cas自增,成功后跳出外层循环retry;失败的话,说明有其他线程修改了clt,继续后续代码逻辑
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl 重新获取ctl
if (runStateOf(c) != rs) // 不同,说明ctl变化,continue外层retry循环
continue retry;
// 否则,说明cas更新失败,应该重新内存循环,尝试再一次cas自增
}
}
// works是个存有work对象的hashset,为了保证线程安全,需要通过mainlock加锁,保证同步访问,当调用方,传入形参addWorker(null,false)的时候,是不会触发后面的加锁同步逻辑的
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
任务通过execute方法加入到线程池后,调用链是:
execute –> addWorker –> t.start –> 内部类worker.run() –> runWorker –> getTask() –> workQueue.take 到这里就可以完成从队列中取任务执行,并且可以阻塞。具体何时getTask由上层方法调用决定,即step1,2,3。