ThreadPoolExecutor

execute

线程池是通过execute方法,添加任务(task)的。方法保证在将来的某一时间,执行给定的任务。任务可能在新的线程中被执行,也可能在已经存在于池子中的线程中被执行。

如果任务因为线程池关闭,或者容量满,而不能被提交执行,任务将被当前设置的RejectedExecutionHandler处理。

后续的执行步骤是三种情况

  1. 如果运行的线程数小于corePoolSize,则尝试新建一个线程,并设置当前的任务为该线程第一个需要处理的任务。
  2. 如果运行的线程数大于等于corePoolSize,则将该任务添加到阻塞队列workQueue中,池子中的空闲线程就会去队列汇总取任务之心。
  3. 当队列满了后,添加队列会失败,这是尝试直接创建一个线程,如果失败,可以得知,要么线程池已经关闭,要么是达到了饱和状态,不能在接受新任务,则该任务将被拒绝。

execute代码

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 上面的step1
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 上面的step2,workQueue.offer可以把command添加到阻塞队列中,这个地方有双检测
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // double-checked,如果线程池关闭,还要remove
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果是running状态 c & CAPACITY = e000 0000 & 1000 0000 = 0
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

addWorker

该方法用于创建一个新线程,即上面第一种情况。addWorker(firstTask,true)

addWorker代码

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // api说明中的解释是,如果线程池关闭或适合关闭,则方法返回false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 如果worker数量比coresize大,或者比maxsize大,就不需要添加了
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 对clt做cas自增,成功后跳出外层循环retry;失败的话,说明有其他线程修改了clt,继续后续代码逻辑    
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl 重新获取ctl
            if (runStateOf(c) != rs) // 不同,说明ctl变化,continue外层retry循环
                continue retry;
            // 否则,说明cas更新失败,应该重新内存循环,尝试再一次cas自增
        }
    }

    // works是个存有work对象的hashset,为了保证线程安全,需要通过mainlock加锁,保证同步访问,当调用方,传入形参addWorker(null,false)的时候,是不会触发后面的加锁同步逻辑的
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                int rs = runStateOf(c);

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

任务通过execute方法加入到线程池后,调用链是:

execute –> addWorker –> t.start –> 内部类worker.run() –> runWorker –> getTask() –> workQueue.take 到这里就可以完成从队列中取任务执行,并且可以阻塞。具体何时getTask由上层方法调用决定,即step1,2,3。