本文 首发于 🍀 永浩转载 请注明 来源

线程池

ThreadPool源码学习 zodiac ·2020-12-10 ·20 次阅读

ThreadPoolExecutor jdk1.5在juc包里提供了方便快捷的线程池api,并提供了基于工厂模式的Executors工具类用于快捷创建线程池,在实际开发过程中,需要使用线程池时,应当优先考虑使用Executors

1、类继承关系

Executor为顶级接口,其主要目标是将任务、任务的提交与任务的执行解耦

ExecutorService接口则定义了正常的线程池应该有的功能与行为,诸如任务提交,异步执行等等

ScheduledExecutorService接口则定义了一些定时的特性

2、Executor Executor执行提交的任务(Runnable),该接口提供了一种将任务提交与任务执行(包括执行细节:线程、定时等)解耦的途径。

通过Executor包装的线程(Thread)对象,避免直接使用Thread对象来执行任务,可以有效将线程信息屏蔽,避免直接对线程的操作。

Executor本身并不强制要求执行的任务必须是异步执行

用于执行任务的执行器,而Runnable则表示可以用于执行的任务。

/**

  • 在未来某个时刻执行给定的指令,命令可以在新的线程中、线程池或调用线程中执行
  • 实际情况取决于Executor的实现 */ void execute(Runnable command); 3、ExecutorService 能够管理任务终止、能够产生追踪一个或多个异步任务处理进度的Future的执行器(Executor)

接口的核心定义:

提交任务

Future submit(Callable task); Future submit(Runnable task, T result); Future submit(Runnable task); 第一种提交Callable task类型的任务较好理解,任务完成时会将task的结果放至Future中。

第二种方式,Runnable task和 T result作为参数,实际上内部通过包装将入参result作为返回值与Runnable task一同包装为一个Callable,最后任务完成时将Callable结果放至Future中。因此通过这种方式,入参result,返回结果则是可能在任务中被修改的result。

AbstractExecutorService

public Future submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, null);//生成一个RunnableFuture作为任务 execute(ftask); return ftask; }

//异步任务的抽象,内部封装了实际的任务、任务状态、真正的执行线程以及等待任务完成的线程等细节 //实际上是调用FutureTask.run()的线程被阻塞,作为真正的执行线程 protected RunnableFuture newTaskFor(Runnable runnable, T value) { return new FutureTask(runnable, value);//创建FutureTask } FutureTask

public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result);//将Runnable适配为Callable this.state = NEW; // ensure visibility of callable } Tips:

FutureTask里有很多特性(比如对等待任务完成的线程进行阻塞,任务完成后对等待线程的唤醒,防止任务被并发调用等等)都可以使用AbstractQueuedSynchronizer,但在JDK1.8中的源码却没有发现AQS的痕迹,想想这是为何?

早期FutureTask确实是使用AQS实现,后续修改为了目前的样子(很多通过内存直接修改对象的操作,Unsafe类),核心是为了性能。这一部分可以再单独深入看看

Executors.RunnableAdapter

//简单的适配,将Runnable包装为Callable static final class RunnableAdapter implements Callable { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } } 第三种方式,与第二种类似,会创一个result 为 null的Runnable适配器。

异步执行任务

通过任务提交、invokeAny、invokeAll

等待任一/全部任务执行完成

invokeAny() 一次性提交批量任务,有任一任务完成时返回该任务的处理结果,调用线程阻塞。

private T doInvokeAny(Collection<? extends Callable> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null) throw new NullPointerException(); int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); ArrayList<Future> futures = new ArrayList<Future>(ntasks); ExecutorCompletionService ecs = new ExecutorCompletionService(this);//Wrapper或Decorator模式(组合模式??),增强了对已完成任务的管理能力

try {
    ExecutionException ee = null;
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Iterator<? extends Callable<T>> it = tasks.iterator();
    //1、提交第一个任务
    futures.add(ecs.submit(it.next()));
    --ntasks;
    int active = 1;
    for (; ; ) {
        //2、判断任务是否完成
        Future<T> f = ecs.poll();
        if (f == null) {
            //3、没有完成,且还有任务可以提交时,继续提交
            if (ntasks > 0) {
                --ntasks;
                futures.add(ecs.submit(it.next()));
                ++active;
            } else if (active == 0)//4、没有完成、没有任务可以提交、无处理中任务跳出
                break;
            else if (timed) {//5、无任务提交,任务在处理中时,设置等待任务完成
                f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                if (f == null)
                    throw new TimeoutException();
                nanos = deadline - System.nanoTime();
            } else//6、无限期阻塞,等待完成任务的队列有完成任务可以获得
                f = ecs.take();
        }
        if (f != null) {//7、获取到完成任务
            --active;
            try {
                return f.get();//8、返回完成任务的结果
            } catch (ExecutionException eex) {
                ee = eex;
            } catch (RuntimeException rex) {
                ee = new ExecutionException(rex);
            }
        }
    }
 
    if (ee == null)
        ee = new ExecutionException();
    throw ee;
} finally {//9、最终取消所有任务
    for (int i = 0, size = futures.size(); i < size; i++)
        futures.get(i).cancel(true);//FutureTask.cancel()只有当未NEW状态才会取消
}

} invokeAll 如果无超时时间(较为简单)

则遍历FutureTask,通过FutureTask.get()方法阻塞调用线程即可。

如果存在超时时间

遍历FutureTask,每提交一次任务检查一次是否超时。任务提交完成后,遍历未结束Future,调用Future.get(timeout),最终返回结果,任务清理。

public List<Future> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); ArrayList<Future> futures = new ArrayList<Future>(tasks.size()); boolean done = false; try { for (Callable t : tasks) futures.add(newTaskFor(t));

    final long deadline = System.nanoTime() + nanos;
    final int size = futures.size();
 
    // Interleave time checks and calls to execute in case
    // executor doesn't have any/much parallelism.
    for (int i = 0; i < size; i++) {
        execute((Runnable)futures.get(i));//执行任务,executor不保证异步执行
        nanos = deadline - System.nanoTime();//检测超时时间
        if (nanos <= 0L)
            return futures;//超时间内未提交的任务,不会再被执行
    }
 
    for (int i = 0; i < size; i++) {
        Future<T> f = futures.get(i);
        if (!f.isDone()) {//未完成future
            if (nanos <= 0L)//阻塞前先判断一次是否超时
                return futures;
            try {
                f.get(nanos, TimeUnit.NANOSECONDS);//超时时间内获取
            } catch (CancellationException ignore) {
            } catch (ExecutionException ignore) {
            } catch (TimeoutException toe) {
                return futures;
            }
            nanos = deadline - System.nanoTime();//更新超时时间
        }
    }
    done = true;
    return futures;
} finally {
    if (!done)
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);//最终未完成的任务,不会再被执行
}

} 终止任务提交

shutdown()

已提交的任务,仍将被执行,但新的任务不再被接收。如果已经被shutdown,再次调用无影响

终止任务执行

List shutdownNow();

立即尝试停止所有正在执行的任务,返回等待执行的任务,不会等待正在执行的任务终结。

该方法会尝试尽最大努力终结执行中的任务,但无法保证正在执行的任务被终结,因此,如果有任务终结失败,该任务也许永远无法被终止。

等待执行器进入终止状态

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

当调用了showdown()后,在超时间内、调用线程被中断前阻塞调用线程等待已提交任务完成。

4、AbstractExecutorService 通过模板模式的设计模式,对ExecutorService接口中定义的某些方法,进行了通用实现。

public Future submit(Runnable task){…} public Future submit(Runnable task, T result) {…} public Future submit(Callable task) {…}

public T invokeAny(Collection<? extends Callable> tasks) throws InterruptedException, ExecutionException {…} public List<Future> invokeAll(Collection<? extends Callable> tasks) throws InterruptedException {…} submit(…):

主要逻辑:

1、包装Callable、Runnable、result为RunnableFuture(实际默认实现是FutureTask,适配Runable接口)

2、调用实现类execute()方法执行RunnableFuture

invokeAny(…)

invokeAll(…)

上述方法的具体源码可以参见ExecutorService部分

5、ThreadPoolExecutor 虽然AbstractExecutorService进行了一些通用实现,但诸如execute()、shutdown()等依赖实际执行路径与执行器内部状态的方法并未被实现,因此这些方法的实现逻辑将是对Executor进行区分的重要因素。

ThreadPoolExecutor直译为线程池执行器,该类通过维护内线程池,最大程度复用线程,减少线程创建、销毁与维护的开销,提高任务的执行效率。通常会作为系统的一个异步处理模块出现,最大程度降低系统对硬件资源的占用。同时也提供了友善的api,通过对线程池核心参数的设计,可以设计出不同类型的线程池,扩展线程池的可应用场景。

基础概念 包含线程控制状态、作业队列、工人(worker)、工人集合(worker set)、工人数量(worker count)、终止条件、线程工厂、任务拒绝处理器、核心池大小、最大池大小等等

控制状态、工人数量

控制状态:

标记了执行器的生命周期,记录线程池的当前状态,分为RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED,主要是为了实现shutdown()等涉及到执行器状态的方法,如果不存在控制状态,则无法实现类似:拒绝新任务的添加、终结执行器等功能。

阶段 说明 状态转换 RUNING 任务执行中,此时可以接收新的任务 SHUTDOWN:调用shutdown() STOP:调用shutdownNow() SHUTDOWN 任务执行中,但不再接收新的任务 STOP:调用shutdownNow() TIDYING:任务完成、线程池清空 STOP 执行中的任务将被终止,且不再接收新的任务,不再继续处理任务 TIDYING:线程池清空 TIDYING 整理阶段,主要是回收资源与收尾工作。所有的任务已经结束,工人数量为0(线程已经完成回收),在该阶段对应线程将调用钩子函数terminated()告知子类即将要终结 TERMINATED:terminated()执行后 TERMINATED terminated()完成 ThreadPoolExecutor实现中将线程池状态与工人数量整合到一个Integer内,为了保证并发安全,Integer使用AtomicInteger。控制状态一共5种,在设计中通过保证各个状态在状态空间中的有序,直接使用数值的方式判断当前状态。

工人数量(workerCount):

记录当前线程池中的有效线程数量,主要用于状态变更、判断是否需要新增线程、线程数量是否已达设定值等。

状态字段:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3;//29 private static final int CAPACITY = (1 « COUNT_BITS) - 1;//0001111111111…

// runState is stored in the high-order bits private static final int RUNNING = -1 « COUNT_BITS;//RUNING状态下,ctl<0 private static final int SHUTDOWN = 0 « COUNT_BITS;//SHUTDOWN状态下,当workercount=0时,ctl=0 private static final int STOP = 1 « COUNT_BITS; private static final int TIDYING = 2 « COUNT_BITS; private static final int TERMINATED = 3 « COUNT_BITS;

// Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; }//取高三位的结果 private static int workerCountOf(int c) { return c & CAPACITY; }//低位即wokerCount private static int ctlOf(int rs, int wc) { return rs | wc; } 作业队列(workQueue)-BlockingQueue

用于保存待处理任务并将任务移交给工作线程。声明类型为阻塞队列(BlockingQueue)

不要求poll()返回null时代表队列为空(isEmpty),在决定是否进行线程池状态转移时(比如由SHUTDOWN转移为TIDYING需要判断任务队列是否为空)使用isEmpty判断队列是否为空。这种设计就可以让workQueue使用一些特殊设计的队列,比如延迟队列(DelayQueue,即便poll()为null,但延迟一段时间后可以返回non-null,即无法通过poll()是否为null判断队列是否为空),ThreadPoolExecutor通过isEmpty而非poll() == null 的方式判断队列是否为空,能够对工作队列的实现类型更加包容。

ThreadPoolExecutor并没有为BlokingQueue提供默认实现(声明时没有指定实例,且所有的构造方法没有为其提供默认实现),但使用Executors创建ThreadPoolExecutors时默认会使用LinkedBlockedQueue作为默认实现。

关于阻塞队列(BlockingQueue)

合适的阻塞队列,当队列空时会阻塞消费者,队列满时阻塞生产者。

如果不适用阻塞队列,可以使用线程安全队列+标志锁实现~(但自己实现的方式可能会存在很多细节问题,有空可以把这个细节深入研究一下)

工人(worker)

保存了所有在线程池中的工作线程的集合

private final HashSet workers = new HashSet(); 该声明并未使用线程安全的Set类,而是使用了最为简单的HashSet,因此其内部要求,任何对该集合的访问都需要获取private final ReentrantLock mainLock = new ReentrantLock();的锁权限。

Worker类属于ThreadPoolExecutor的私有内部类,因此只有ThreadPoolExecutor能够创建该类的实例,作为内部类,该类的实例能够直接访问ThreadPoolExecutor的属性与方法。

Woker源码:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { //通过ThreadPoolExecutor内的ThreadFactory生成的执行线程 //woker持有该thread,主要是为了保证可以获取到是哪个线程是该woker的运行线程 final Thread thread; //该woker的第一个任务,可能为null Runnable firstTask; //该woker完成的任务总量 volatile long completedTasks;

//AQS中state标志位,0:无锁状态,1:被持有锁状态,>=0:可被中断状态
Worker(Runnable firstTask) {
    setState(-1); // 禁止中断该worker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);//worker也是runnable,将worker作为其执行线程的target,当其执行线程start()时,JVM会调用worker.run()
}
 
//该方法表明woker是个Runnable,当其执行线程start时,会调用该方法
public void run() {
    runWorker(this);//将自己作为参数,运行自己
}
 
protected boolean isHeldExclusively() {
    return getState() != 0;
}
 
protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}
 
protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}
 
public void lock() {
    acquire(1);
}
 
public boolean tryLock() {
    return tryAcquire(1);
}
 
public void unlock() {
    release(1);
}
 
public boolean isLocked() {
    return isHeldExclusively();
}
 
//仅当woker线程运行时,允许中断
void interruptIfStarted() {
    Thread t;
    //getState() >= 0 通过判断state是否大于0,初始情况下state=-1
    //当state < 0,即state = -1时,其执行线程尚未start,因此无需中断其执行线程
    //并不要求获取了woker的锁
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();//中断执行线程
        } catch (SecurityException ignore) {
        }
    }
}

} ThreadPoolExecutor.runWoker()

final void runWorker(ThreadPoolExecutor.Worker w) { //runWoker()方法只会被Woker.run()调用,而Woker.run()只会在其执行线程start后由jvm调用,因此runWoker()必定只会被对应Worker的执行线程调用 Thread wt = Thread.currentThread();//所以这里当前线程就是w.thread,也就是woker的执行线程 Runnable task = w.firstTask; w.firstTask = null; w.unlock();//此时woker的执行线程已经启动,允许外部进行中断,因此将woker的state置为0 boolean completedAbruptly = true; try { //getTask()重点,内部其实是从任务队列里获取任务,包含了线程阻塞、挂起、淘汰等一系列逻辑 //正常情况下就是该线程不断从任务队列里取任务 while (task != null || (task = getTask()) != null) { //这里的这个lock很有意思 //如果仅仅是执行线程执行任务的话,其实执行线程获取woker锁并无太大意义,一方面是因为该方法(runWoker(…))只会运行在执行线程中,不可能有并发情况,另一方面woker本身没有可以共享的资源(这个地方可以再考虑一下:获取的任务、本身执行的线程是否算可共享资源呢),没必要获取这个锁,但这个地方有个隐藏逻辑,就是一旦woker在执行任务,则woker必定是被其执行线程锁住的,因此通过woker锁的状态可以判断woker是否在执行任务。当外部线程想要让worker正常执行完任务,然后再停止woker时,就必须获取该woker的锁,如此就能够保证woker当前正在执行的任务被正常完成 w.lock(); //简短的代码,逻辑复杂,具体分析参见“Worker检测ThreadPoolExecutor是否Stoping及线程状态机制” if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task);//单个任务执行之前的钩子函数 Throwable thrown = null; try { task.run();//真正的执行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);//任务执行之后的钩子函数 } } finally { task = null; w.completedTasks++;//完成任务计数 w.unlock();//释放锁,表明当前worker已经完成某个任务的执行 } } completedAbruptly = false;//表明线程正常执行完任务,如果为true,则说明执行形成可能中途被中断,或是用户任务发生了异常 } finally { /* *处理将要结束的worker的收尾工作。 * 1、将当前worker从workerSet中移除 * 2、尝试将线程池过度为Terminated状态 * 3、在线程池仍然需要执行任务的状态下(RUNING、SHUTDOWN),判断是否需要添加新的worker至线程池,添加条件为:1)当前worker为突然终止;2)当前线程池的线程数量小于最小需求线程数量 * 4、走到这行代码一般有两种场景:1)无任务可做,且不会有新任务来了;2)用户任务执行期间发生了异常 */ processWorkerExit(w, completedAbruptly); } } Worker检测ThreadPoolExecutor是否Stoping及线程状态机制

1、如果ThreadPool处于Stop、Tidying或Terminated,且当前线程未被中断,则中断当前线程;2、如果处理Runing、Shutdown状态,则清除当前中断标志位,返回之前的中断标志,如果之前是中断的,且ThreadPool状态变为了Stop、Tidying或Terminated,则再次中断执行线程;3、这段逻辑保证了:如果当前ThreadPoolExecutor处于RUNING、SHUTDOWN,则中断标志位被清除;否则,直接中断执行线程。4、进一步思考:如果不这样实现会有什么问题?

下图为实际的判断逻辑,如果没有红框部分逻辑,则之前的清除中断标志位可能会导致在箭头处插入的ShutdownNow事件被忽略。

处理woker结束的工作

private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn’t adjusted //正常结束不需要再去减少workerCount,原因是在getTask()的地方已经预先减过了 decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    completedTaskCount += w.completedTasks;
    workers.remove(w);
} finally {
    mainLock.unlock();
}
 
tryTerminate();
 
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        if (min == 0 && ! workQueue.isEmpty())
            min = 1;
        if (workerCountOf(c) >= min)
            return; // replacement not needed
    }
    addWorker(null, false);
}

} 线程池里的线程创建与销毁

失效场景:1、空闲worker(当前worker数量超过corePoolSize,且线程keepAliveTime时间内未获取到任务,这也是线程池控制worker超时失效的机制);2、用户程序异常;3、任务完全处理完成

线程池在处理失效线程时,如果仍然需要增加线程,那么会再次通过ThreadFactory创建一个新的线程。

假设如下场景:线程池持续接收新任务,如果新的任务正常执行,那么执行该任务的线程还会继续服役,处理后续的新任务,但是如果新的任务无法正常执行,抛出了异常,那么该线程将由于该异常而而终结,线程池会创建新的线程继续处理后续任务。所以当所有的新任务都抛出异常时,可能会导致线程池单个线程的寿命极短,频繁创建线程,事实上导致线程池失效。

实际上呢?

通过submit()提交给线程池的RunnableTask都被包装为FutureTask了,而FutureTask在run()的时候,对异常进行包装,将outCome输出为Exception,也就是说正常的用户异常根本不会抛出到Worker的runWoker loop里…

但是如果直接调用ThreadPoolExecutor.execute()方法,则不会使用FutureTask包装该Runnable了,包装的过程都是在AbstractExecutorService里完成的,直接通过execute执行任务的话,则会产生之前所述的场景,线程将频繁创建

ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(5, new NamedThreadFactory()); while (true) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } executorService.execute(() -> {//使用Executor.execute()执行,则会出现频繁创建线程池的情况,使用submit()则不会,但其实execute与submit都是异步执行 throw new RuntimeException(); }); } 6、总结 线程池的核心其实是一个基于BlockingQueue的任务生产消费模型,任务的生产方为ThreadPool的使用方,通过submit与execute生产Runnable任务,任务的消费方则为ThreadPool内部的Worker。TheadPool做了很多关于线程池特性的控制:比如核心线程池大小、最大线程池大小、空闲线程的最大存活时间等等,主要就是通过Worker从BlockingQueue中获取任务的状态来控制Worker的销毁。

源码十分精巧,部分代码写的相当简练,但包含了很深的关于状态定义、任务生产、任务消费、超时控制、同步控制、空闲线程销毁、新增线程等一系列特性的实现逻辑,每一步的动作都值得进一步思考。

如果让自己去实现一遍这样的线程池,该如何去实现呢?

最后附上向ThreadPoolExecutor submit任务的执行流程图~