Fork/Join 框架

Fork/Join 框架是 Java7 提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

Fork/Join 框架需要理解两个点,「分而治之」「工作窃取算法」

  • Fork/Join 框架
    • 设计目的:Fork/Join 框架特别适用于可以递归地分割成更小子问题的任务,这些子问题最终会被合并以形成整个问题的解决方案。它主要用于实现分治算法。
    • 工作窃取机制:Fork/Join 框架采用了一种称为“工作窃取”(work-stealing)的调度策略。线程池中的空闲线程会尝试从其他忙碌线程的任务队列中窃取任务来执行,从而提高 CPU 的利用率和任务处理效率。
    • 核心类:主要依赖于 ForkJoinPoolRecursiveTask<V>RecursiveAction 这几个核心类。开发者通过继承 RecursiveTaskRecursiveAction 来定义可分割的任务,并调用 fork() 方法将任务提交到线程池中,或者使用 join() 方法等待任务完成并获取结果。
  • Executor 框架
    • 设计目的:Executor 框架是一个更通用的任务执行框架,旨在简化线程管理与任务调度,它将任务的提交与执行解耦,允许程序灵活配置任务的执行策略。
    • 线程池:Executor 框架通常与线程池一起使用,如 ThreadPoolExecutor 或者 ScheduledThreadPoolExecutor,提供了一种资源受限的方式执行异步任务。它可以控制最大并发数、任务排队策略等。
    • 接口层次结构:Executor 框架提供了 Executor 接口及其扩展 ExecutorService,后者增加了生命周期管理和批量任务提交等功能。Callable<V>Runnable 是两种类型的任务接口,分别对应有返回值和无返回值的任务。

Executor 框架

Executor 框架的依赖关系图:

400

Executor 接口

Executor 接口仅包含一个 void execute(Runnable command); 方法。

顶层接口 Executor 提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供 Runnable 对象,将任务的运行逻辑提交到执行器(Executor)中,由 Executor 框架完成线程的调配和任务的执行部分。

ExecutorService 接口

ExecutorService 接口对 Executor 接口进行了如下扩展,增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成 Future 的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。

400

  • awaitTermination 阻塞方法,等待线程池退出后才会结束自身。
  • invokeAll 进行批量提交
  • isShutdown 判断是否 shutdown
  • isTerminated 如果所有任务在关闭后都已完成,则返回 true。除非首先调用了 shutdown 或 shutdownNow,否则 isTerminated 永远不会为真。
  • shutdown 将线程池的状态设置为 SHUTWDOWN 状态,已经提交的任务会继续执行下去,均执行完后将限制线程中断。此时,不能添加新任务,否则抛出 RejectedExecutionException 异常。
  • shutdownNow 则是将线程池的状态设置为 STOP,正在执行的任务则被停止,没被执行任务的则返回。它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,但是这种方法的作用有限,如果线程中没有 sleep 、wait、Condition、定时锁等应用, interrupt() 方法是无法中断当前的线程的。所以,ShutdownNow() 并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。
  • submit 提交不同类型的任务

AbstractExecutorService 抽象类

实现了 ExecutorService 接口,实现了 invokeAllinvokeAnysubmit 方法。ThreadPoolExecutor 类直接继承了这些方法,没有重写方法。(后续再了解任务的执行过程的时候,直接看这里的源码)

400

ThreadPoolExecutor 类

继承了 AbstractExecutorService 抽象类,线程池的主要实现类。

400

ScheduledExecutorService 接口

周期性执行或延迟执行,后续再学习。

ScheduledThreadPoolExecutor 类

周期性执行或延迟执行,后续再学习。

ThreadPoolExecutor 类分析

构造函数

public ThreadPoolExecutor(int corePoolSize,
						  int maximumPoolSize,
						  long keepAliveTime,
						  TimeUnit unit,
						  BlockingQueue<Runnable> workQueue,
						  ThreadFactory threadFactory,
						  RejectedExecutionHandler handler) {
	if (corePoolSize < 0 ||
		maximumPoolSize <= 0 ||
		maximumPoolSize < corePoolSize ||
		keepAliveTime < 0)
		throw new IllegalArgumentException();
	if (workQueue == null || threadFactory == null || handler == null)
		throw new NullPointerException();
	this.corePoolSize = corePoolSize;
	this.maximumPoolSize = maximumPoolSize;
	this.workQueue = workQueue;
	this.keepAliveTime = unit.toNanos(keepAliveTime);
	this.threadFactory = threadFactory;
	this.handler = handler;
}
  • corePoolSize 线程池的核心线程数量,定义了最小可以同时运行的线程数量。
  • maximumPoolSize 线程池的最大线程数,当队列中存放的任务达到队列容量时,当前可以同时运行的线程数量变为最大线程数。
  • keepAliveTime 当线程数大于 corePoolSize 时,空闲的线程能存活的时间,超过就销毁线程。
  • unit 时间单位。
  • workQueue 任务队列,用来储存等待执行任务的队列。
  • threadFactory 线程工厂,用来创建线程,一般默认即可。
  • handler 拒绝策略,当任务过多时,该如何处理新任务。

我感觉,先了解线程池的状态 运行机制 状态相关逻辑 任务执行机制 线程执行机制,这个顺序比较合适。
否则直接看任务执行机制和线程执行机制,会被各种状态的逻辑判断整晕了。

线程池的状态/生命周期

ThreadPoolExecutor 类包含多个成员变量与相关函数,如下所示。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
 
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN =  0 << COUNT_BITS;
private static final int STOP =  1 << COUNT_BITS;
private static final int TIDYING =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
 
// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
private static int workerCountOf(int c)  { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
 
private static boolean runStateLessThan(int c, int s) { return c < s;}
private static boolean runStateAtLeast(int c, int s) { return c >= s;}
private static boolean isRunning(int c) { return c < SHUTDOWN;}

ctl 用来存储运行状态(runState)和线程数量 (workerCount)。其中,低 29 位用于存储线程数量,高 3 位用来存储运行状态。官方注释表示,若线程数目成为问题,则可以将 AtomicInteger 更改为 AtomicLong,并且调整后面的移位/掩码就可以。

对于 ctl 变量,有三个相对应的方法。runStateOf 计算当前运行状态,workerCountOf 计算当前线程数量,ctlOf 根据状态和线程数生成 ctl。

ThreadPoolExecutor 有 5 种运行状态:

  • RUNNING 能接受提交的任务,并且能够处理阻塞队列的任务。
  • SHUTDOWN 关闭状态,不再接受新提交的任务,但可以继续处理阻塞队列中已保存的任务。
  • STOP 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。
  • TIDYING 所有任务都已终止,workerCount 有效线程数为 0.
  • TERMINATED 在 terminated 方法执行完后进入该状态。

这 5 种运行状态从上到下依次递增,叠加线程数量后,可以通过比较运算来判断处于的运行状态。

运行状态转换/生命周期如下:

ThreadPoolExecutor 运行机制

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。

任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。

线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

任务执行机制

任务分配 - submit/execute

RunnableCallable 对象通过 submit 方法传入时,会由 AbstractExecutorService 抽象类将输入转换为 RunnableFuture<?> 对象,然后调用 ThreadPoolExecutorexecute(Runnable) 方法执行。Runnable 对象也可以通过 execute 方法传入,则直接调用 ThreadPoolExecutorexecute(Runnable) 方法执行。

AbstractExecutorService 抽象类对于 submit 方法的实现。

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
	 RunnableFuture<Void> ftask = newTaskFor(task, null);
	 execute(ftask);
	 return ftask;
}
 
 public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
	 RunnableFuture<T> ftask = newTaskFor(task, result);
	 execute(ftask);
	 return ftask;
}
 
 public <T> Future<T> submit(Callable<T> task) {
	 if (task == null) throw new NullPointerException();
	 RunnableFuture<T> ftask = newTaskFor(task);
	 execute(ftask);
	 return ftask;
}

ThreadPoolExecutor 类对 execute 方法的实现。

  1. 当前线程数小于 corePoolSize,尝试创建核心线程执行命令
  2. 如果线程池处于 RUNNING,尝试添加到阻塞队列中
    2.1. 由于竞态条件,因此需要再次判断是否不属于 RUNNING 状态,不属于,则尝试删除任务,删除成功则拒绝该任务
    2.2 否则,启动一个非核心线程,去从阻塞队列中拉取任务
  3. 阻塞队列已满,判断能否创建非核心线程,失败则拒绝该任务
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 1. 当前线程数小于 corePoolSize,尝试创建核心线程执行命令
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2. 如果线程池处于RUNNING,尝试添加到阻塞队列中
        if (isRunning(c) && workQueue.offer(command)) {
            // 2.1. 由于竞态条件,因此需要再次判断是否不属于RUNNING状态,
            // 不属于,则尝试删除任务,删除成功则拒绝该任务
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command))
                reject(command);
			// 2.2 否则,启动一个非核心线程,去从阻塞队列中拉取任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 3. 阻塞队列已满,判断能否创建非核心线程,失败则拒绝该任务
        else if (!addWorker(command, false))
            reject(command);
    }

大致执行流程如下:

任务缓冲 - 阻塞队列

ThreadPoolExecutor 使用阻塞队列来缓冲需要执行的任务,对应成员变量如下:
private final BlockingQueue<Runnable> workQueue;

execute 方法中,将 Runnable 对象添加进阻塞队列中,是唯一入口;在 getTask 方法中,从阻塞队列中取出一个 Runnable 对象,是唯一出口。

补充一个阻塞队列的各种实现类。

任务获取 - getTask

  1. 线程池状态的判断
    • 如果线程池状态>=SHUTDOWN, 并且 BlockingQueue 为 null; 或者线程池状态>=STOP。
    • 则根据设定,不能执行任务,返回 null,同时减少在线的线程数
  2. 线程数的判断 - 是否淘汰线程
    • (1)allowCoreThreadTimeOut 核心线程是否超时淘汰。超时条件 1
    • (2)线程数是否大于 corePoolSize。超时条件 2
    • (3)是否大于 maximumPoolSize。
    • (4)任务队列为空,或当前线程大于 1。
    • 在(4)满足的情况下,前两者在从阻塞队列中获取元素超时时,会进行淘汰;后者铁定淘汰。
    • (4)防止 corePoolSize 设置为 0,但是任务队列不为空的情况,没有线程去处理任务了。
    • 淘汰,则直接返回即可。
  3. 尝试从任务队列中获取任务
    • 若超时条件满足,则 poll 计时获取;否则 take 等待获取。
    • 没获取到,则更改获取元素超时状态变量,重新循环。
    • 获取到,直接返回。
private Runnable getTask() {
		// BlockingQueue的poll方法是否已经超时
        boolean timedOut = false;
        for (;;) {
            int c = ctl.get();
            // 1. 状态判断
			// 如果线程池状态>=SHUTDOWN,并且BlockingQueue为null;
	        // 或者线程池状态>=STOP
	        // 逐渐转换为 TIDYING 状态, 同时当前不能执行任务,返回null
            if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            // 2. 线程数的判断 - 超时淘汰线程
            int wc = workerCountOf(c);   // 获得当前工作线程个数
            // allowCoreThreadTimeOut false-核心线程不会被淘汰,true-核心线程可以淘汰
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;   // 当前线程是否可能被淘汰
            // 线程数多,要被淘汰  and 阻塞队列没有数据,进行淘汰
            if ((wc > maximumPoolSize || (timed && timedOut))  && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            // 3. 尝试从阻塞队列中获取任务
            try {
	            // BlockingQueue的poll方法超时会直接返回null
	            // BlockingQueue的take方法, 如果队列中没有元素, 当前线程会wait, 直到其他线程提交任务入队唤醒当前线程.
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

任务拒绝 - 拒绝策略

任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到 maximumPoolSize 时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。

拒绝策略是一个接口,其设计如下:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

用户可以通过实现这个接口去定制拒绝策略,也可以选择 JDK 提供的四种已有拒绝策略,其特点如下:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
	public CallerRunsPolicy() { }
	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
		if (!e.isShutdown()) { r.run(); }
	}
}
public static class AbortPolicy implements RejectedExecutionHandler {
	public AbortPolicy() { }
	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
		throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
	}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
	public DiscardPolicy() { }
	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
}
 
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
	public DiscardOldestPolicy() { }
	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
		if (!e.isShutdown()) {
			e.getQueue().poll();
			e.execute(r);
		}
	}
}

线程执行机制

工作线程的基本信息

线程池为了更好的掌握线程的状态并维护线程的生命周期,设计了 Worker 类作为工作线程对象。并使用 private final HashSet<Worker> workers 来管理工作线程的生命周期。当创建新工作线程时,将新 Worker 添加到 workers 中;当需要销毁工作线程时,从 workers 中将对应 Worker 删除掉。期间会有很多设计到线程池状态的判断。

(AQS 还不会,这部分稍后再看)
Worker 是通过继承 AQS,使用 AQS 来实现独占锁这个功能。没有使用可重入锁 ReentrantLock,而是使用 AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

  1. lock 方法一旦获取了独占锁,表示当前线程正在执行任务中。
  2. 如果正在执行任务,则不应该中断线程。
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
  4. 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

Worker 类的依赖关系与类信息等如下:

400

400

工作线程执行任务的整个过程:

工作线程的属性与构造函数

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
	final Thread thread;
	Runnable firstTask;
	volatile long completedTasks;
	Worker(Runnable firstTask) {
		setState(-1); // inhibit interrupts until  // runWorker 调用AQS的方法,还不了解
		this.firstTask = firstTask;
		this.thread = getThreadFactory().newThread(this);
	}
}
  • final Thread thread Worker 持有的线程,在创建 Worker 时通过 ThreadFactory 线程工厂来创建线程。
  • Runnable firstTask 初始化的任务,可以为 null。
  • volatile long completedTasks 每个线程完成的任务数。

工作线程申请 - addWorker

申请工作线程是通过 addWorker 方法来执行的。firstTask 参数表示指定新增线程要执行的第一个任务,参数可以为空,为空直接返回 falsecore 参数,true 时判断当前活动线程数是否小于 corePoolSizefalse 时判断是否小于 maximumPoolSize

1 申请添加工作线程
1.1 线程池状态判断
至少 STOP 状态,不能创建工作线程,直接返回 false
SHUTDOWN 状态,只能创建从阻塞队列中读取任务的工作线程,因此 firstTask 不为空或任务队列为空,直接返回 false
1.2 线程数状态判断
当前的工作线程数目不小于 corePoolSize 或 maximumPoolSize 规定的数目,则无法创建,直接返回 false
1.2.1 CAS 机制,让 c.get 自增 1,即尝试增加一个工作线程,成功则跳出循环

  这里没有对 ctl 进行自增 1,原因在于通过 CAS 机制判断期间状态或线程数有没有改变

1.2.2 CAS 失败了,说明 ctl 被更新,重新获取 ctl 值
1.2.3 如果运行状态至少为 SHUTDOWN,重新线程池状态的判断
1.2.4 否则,重新线程数状态判断

2 启动创建的工作线程
2.1 创建工作线程及对应 Thread 对象成功
2.1.1 重新判断线程池状态
如果是 RUNNING 状态, 或者是 SHUTDOWN 状态并且传入的 task 为 null,则继续;否则,啥也不做了,等着被回收。
2.1.1.1 添加到线程池 hashset 中
2.2 创建成功,则启动线程
2.3 启动失败,调用 addWorkerFailed 方法,移除失败线程

private boolean addWorker(Runnable firstTask, boolean core) {
	// 1. ⭐⭐ 申请添加工作线程
	retry:
	for (int c = ctl.get();;) {
		// 1.1 ⭐线程池状态判断
		// STOP 状态,不能创建工作线程,直接返回false
		// SHUTDOWN 状态,只能创建从阻塞队列中读取任务的工作线程,因此firstTask不为空或任务队列为空,直接返回false
		if (runStateAtLeast(c, SHUTDOWN)
			&& (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()))
			return false;
 
		for (;;) {
			// 1.2 ⭐线程数判断
			// 当前的工作线程数目不小于corePoolSize或maximumPoolSize规定的数目
			// 则无法创建新线程,返回false
			if (workerCountOf(c)
				>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
				return false;
			// 1.2.1  CAS 机制,让线程数自增1,即尝试增加一个工作线程
			// 这里没有对ctl进行自增1,原因在于通过CAS机制判断期间状态或线程数有没有改变
			if (compareAndIncrementWorkerCount(c))
				break retry;
			// 1.2.2 CAS 失败了,说明ctl被更新,重新获取ctl值
			c = ctl.get();
			// 1.2.3 如果运行状态至少为SHUTDOWN,那么重新线程池状态的判断
			if (runStateAtLeast(c, SHUTDOWN))
				continue retry;
			// 1.2.4 运行状态仍然是RUNNING状态,重试内部循环,尝试添加工作线程
		}
	}
 
	// 2.  ⭐⭐ 启动创建的工作线程
	// 两个状态,一个是工作线程添加成功状态workerAdded
	// 一个是工作线程启动工作状态workerStarted
	boolean workerStarted = false;
	boolean workerAdded = false;
	Worker w = null;
	try {
		w = new Worker(firstTask); // 创建工作线程
		final Thread t = w.thread;
		if (t != null) {
			final ReentrantLock mainLock = this.mainLock;
			mainLock.lock(); // 加锁
			try {
				// Recheck while holding lock.
				// Back out on ThreadFactory failure or if
				// shut down before lock acquired.
				int c = ctl.get(); // 加锁后,重新获得状态
				// 判断线程池状态
				// 如果是RUNNING状态, 或者是SHUTDOWN状态并且传入的task为null,则继续
				if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
					// 线程已经被启动, 抛出IllegalThreadStateException
					if (t.getState() != Thread.State.NEW)
						throw new IllegalThreadStateException();
					// 将worker对象添加到HashSet,并更新workerAdded状态
					workers.add(w);
					workerAdded = true;
					// 线程池中曾经达到的最大线程数
					int s = workers.size();
					if (s > largestPoolSize)
						largestPoolSize = s;
				}
			} finally {
				mainLock.unlock(); // 释放锁
			}
			// 如果worker被添加成功, 启动线程, 执行对应的task,并更新workerStarted状态
			if (workerAdded) {
				t.start();
				workerStarted = true;
			}
		}
	} finally {
		// 如果线程启动失败, 执行addWorkerFailed方法
		if (! workerStarted)
			addWorkerFailed(w);
	}
	return workerStarted;
}

工作线程回收 - processWorkerExit

线程池中线程的销毁依赖 JVM 自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被 JVM 回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker 被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当 Worker 无法获取到任务,也就是获取的任务为空时,循环会结束,Worker 会主动消除自身在线程池内的引用。如下述代码所示。

// runWorker 方法
try {
  while (task != null || (task = getTask()) != null) {
    //执行任务
  }
} finally {
  processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
}

线程回收的工作是在 processWorkerExit 方法完成的。

图10 线程销毁流程

事实上,在这个方法中,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
 
        tryTerminate();
 
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

工作线程执行任务 - runWorker

  1. while 循环,判断是否有可执行任务,有则加锁判断
  2. 线程池状态判断
  3. beforeExecute,执行,afterExecute。

这里涉及到加锁的问题,之后学了 AQS 再来回顾下。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
	        // 当传入的task不为null, 或者task为null但是从BlockingQueue中获取的task不为null
            while (task != null || (task = getTask()) != null) {
                w.lock();  // 执行任务之前先获取锁
                // 线程池状态如果为STOP, 或者当前线程是被中断并且线程池是STOP状态,
                // 或者当前线程不是被中断;
                // 则调用interrupt方法中断当前线程
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
	                // beforeExecute hook方法
                    beforeExecute(wt, task);
                    try {
	                    // 真正执行提交的task的run方法
                        task.run();
                        // afterExecute hook方法
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
	                // task赋值为null, 下次从BlockingQueue中获取task
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

线程池状态相关逻辑

shutdown 方法 - SHUTDOWN 状态

调用 shutdown 方法后:

  1. 设置为 SHUTDOWN 状态
  2. 中断空闲的线程(等待任务的线程)
  3. 提交任务调用 execute 方法。其中会对线程池的状态进行判断(isRunningaddWorkerrunStateAtLeast(c, SHUTDOWN)),最终执行 reject(Runnable) 方法,拒绝新任务。
  4. 对于已经提交的任务(正在被执行 or 阻塞队列中)没有任何影响,即正常运行到结束。
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();                   // 权限校验,安全策略相关判断
        advanceRunState(SHUTDOWN);     // 设置SHUTDOWN状态
        interruptIdleWorkers();                       // 中断所有的空闲的工作线程
        onShutdown();                                        // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();                                            // 尝试进入 Terminate 状态
}
 
// 升提状态 assert targetState == SHUTDOWN || targetState == STOP;
// not TIDYING or TERMINATED -- use tryTerminate for that
private void advanceRunState(int targetState) {
    for (;;) {                                                           // 如果已经是了,直接结束;否则CAS自旋更改ctl值
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);               // Interrupts threads that might be waiting for tasks 中断空闲线程
}

shutdownNow 方法 - STOP 状态

调用 shutdownNow 方法后:

  1. 设置为 STOP 状态;
  2. 中断所有线程;
  3. 将阻塞队列中的任务都存储到返回列表中;
  4. 提交任务同样直接拒绝,原因 shutdown 相同。
public List<Runnable> shutdownNow() {
	List<Runnable> tasks;
	final ReentrantLock mainLock = this.mainLock;
	mainLock.lock();
	try {
		checkShutdownAccess();
		advanceRunState(STOP);          // 设置STOP状态
		interruptWorkers();                      // 中断所有的线程
		tasks = drainQueue();                  // 将阻塞队列中的任务移动到新集合中
	} finally {
		mainLock.unlock();
	}
	tryTerminate();
	return tasks;
}
private void interruptWorkers() {   // Interrupts all threads, even if active.
	for (Worker w : workers)
		w.interruptIfStarted();
}
 
// Worker 的方法
void interruptIfStarted() {
	Thread t;
	// 如果状态不小于0,当线程获得task后会申请锁,状态+1,runWorker中,后续再看
	// 并且线程没有被打断,则尝试打断线程
	if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
		try {
			t.interrupt();
		} catch (SecurityException ignore) {
		}
	}
}
private List<Runnable> drainQueue() {                                  // 将阻塞队列中的任务移动到新集合中
	BlockingQueue<Runnable> q = workQueue;
	ArrayList<Runnable> taskList = new ArrayList<>();
	q.drainTo(taskList);
	if (!q.isEmpty()) {                                                                          // delay队列或其他移动失败的数据,一个个移动
		for (Runnable r : q.toArray(new Runnable[0])) {
			if (q.remove(r))
				taskList.add(r);
		}
	}
	return taskList;
}

tryTerminate 方法 - TIDYING 和 TERMINATED 状态

根据条件判断获得:

  1. RUNNING 或不止 TIDYING 状态,直接返回。
  2. 如果 SHUTDOWN 状态,并且阻塞队列不为空,则直接返回。
  3. 如果 STOP 状态或阻塞队列为空的 SHUTDOWN 状态,如果工作线程不为 0,则随机中断一个空闲线程。
  4. 当以上均满足,即 SHUTDOWN 状态时,阻塞队列为空&工作线程为 0,或 STOP 状态时,工作线程为 0,则申请锁,并使用 CAS 自旋的方式去更改为 TIDYING 状态。
  5. 之后执行 terminated 钩子方法。执行后,进入 TERMINATED 状态。
final void tryTerminate() {
    for (;;) {
        // 获取ctl, runState和workerCount
        int c = ctl.get();
        // 当前线程池状态是否是RUNNING, 或者是否是TIDYING或TERMINATED状态, 或者是否是SHUTDOWN状态并且workQueue不为空(需要被线程执行), return结束方法
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // workerCount如果不为0, 随机中断一个空闲的线程, return结束方法
        if (workerCountOf(c) != 0) {
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
 
        final ReentrantLock mainLock = this.mainLock;
        // 获取mainLock锁
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {  // CAS方式设置当前线程池状态为TIDYING, workerCount为0
                try {
                    terminated();                                                         // 执行hook方法terminated
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));                      // 设置当前线程池状态为TERMINATED, workerCount为0
                    termination.signalAll();                                   // 唤醒调用了awaitTermination方法的线程
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // 当CAS失败, 循环重试
    }
}

如何合理配置线程池参数?

静态配置

自定义线程池就需要我们自己配置最大线程数 maximumPoolSize ,为了高效的并发运行,这时需要看我们的业务是 IO 密集型还是 CPU 密集型。

CPU 密集型 CPU 密集的意思是该任务需要最大的运算,而没有阻塞,CPU 一直全速运行。CPU 密集任务只有在真正的多核 CPU 上才能得到加速(通过多线程)。而在单核 CPU 上,无论你开几个模拟的多线程该任务都不可能得到加速,因为 CPU 总的运算能力就那么多。

IO 密集型 IO 密集型,即该任务需要大量的 IO,即大量的阻塞。在单线程上运行 IO 密集型的任务会导致大量的 CPU 运算能力浪费在等待。所以在 IO 密集型任务中使用多线程可以大大的加速程序运行,即使在单核 CPU 上这种加速主要就是利用了被浪费掉的阻塞时间。
IO 密集型时,大部分线程都阻塞,故需要多配制线程数。公式为:

CPU 核数*2
CPU 核数/(1-阻塞系数) 阻塞系数在 0.8~0.9 之间
查看 CPU 核数:
System.out.println(Runtime.getRuntime().availableProcessors());

🌈 拓展一下(参见:issue#1737):

线程数更严谨的计算的方法应该是:最佳线程数 = N(CPU 核心数)∗(1+WT(线程等待时间)/ST(线程计算时间)),其中  WT(线程等待时间)=线程运行总时间 - ST(线程计算时间)

线程等待时间所占比例越高,需要越多线程。线程计算时间所占比例越高,需要越少线程。

我们可以通过 JDK 自带的工具 VisualVM 来查看  WT/ST  比例。

CPU 密集型任务的  WT/ST  接近或者等于 0,因此,线程数可以设置为 N(CPU 核心数)∗(1+0)= N,和我们上面说的 N(CPU 核心数)+1 差不多。

IO 密集型任务下,几乎全是线程等待时间,从理论上来说,你就可以将线程数设置为 2N(按道理来说,WT/ST 的结果应该比较大,这里选择 2N 的原因应该是为了避免创建过多线程吧)。

动态化线程池

当以上都不适用时,选用动态化线程池,看美团技术团队的实践:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

美团技术团队的思路是主要对线程池的核心参数实现自定义可配置。这三个核心参数是:

  • corePoolSize :  核心线程数线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize :  当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue:  当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

美团的方式是自定义了一个叫做  ResizableCapacityLinkedBlockIngQueue  的队列(主要就是把 LinkedBlockingQueue 的 capacity 字段的 final 关键字修饰给去掉了,让它变为可变的)。

还没看够?推荐 why 神的如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答。这篇文章,深度剖析,很不错哦!

如果我们的项目也想要实现这种效果的话,可以借助现成的开源项目:

  • Hippo4j:异步线程池框架,支持线程池动态变更&监控&报警,无需修改代码轻松引入。支持多种使用模式,轻松引入,致力于提高系统运行保障能力。
  • Dynamic TP:轻量级动态线程池,内置监控告警功能,集成三方中间件线程池管理,基于主流配置中心(已支持 Nacos、Apollo,Zookeeper、Consul、Etcd,可通过 SPI 自定义实现)。

其他问题

核心线程是否回收

ThreadPoolExecutor  默认不会回收核心线程,即使它们已经空闲了。这是为了减少创建线程的开销,因为核心线程通常是要长期保持活跃的。但是,如果线程池是被用于周期性使用的场景,且频率不高(周期之间有明显的空闲时间),可以考虑将  allowCoreThreadTimeOut(boolean value)  方法的参数设置为  true,这样就会回收空闲(时间间隔由  keepAliveTime  指定)的核心线程了。

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 6, 6, TimeUnit.SECONDS, new SynchronousQueue<>());
threadPoolExecutor.allowCoreThreadTimeOut(true);

是否可以提前创建线程

答案是可以的!ThreadPoolExecutor  提供了两个方法帮助我们在提交任务之前,完成核心线程的创建,从而实现线程池预热的效果:

  • prestartCoreThread():启动一个线程,等待任务,如果已达到核心线程数,这个方法返回 false,否则返回 true;
  • prestartAllCoreThreads():启动所有的核心线程,并返回启动成功的核心线程数。

线程异常后,销毁还是复用

直接说结论,需要分两种情况:

  • 使用 execute() 提交任务:当任务通过 execute() 提交到线程池并在执行过程中抛出异常时,如果这个异常没有在任务内被捕获,那么该异常会导致当前线程终止,并且异常会被打印到控制台或日志文件中。线程池会检测到这种线程终止,并创建一个新线程来替换它,从而保持配置的线程数不变。
  • 使用 submit() 提交任务:对于通过 submit() 提交的任务,如果在任务执行中发生异常,这个异常不会直接打印出来。相反,异常会被封装在由 submit() 返回的 Future 对象中。当调用 Future.get() 方法时,可以捕获到一个 ExecutionException。在这种情况下,线程不会因为异常而终止,它会继续存在于线程池中,准备执行后续的任务。

简单来说:使用 execute() 时,未捕获异常导致线程终止,线程池创建新线程替代;使用 submit() 时,异常被封装在 Future 中,线程继续复用。

这种设计允许 submit() 提供更灵活的错误处理机制,因为它允许调用者决定如何处理异常,而 execute() 则适用于那些不需要关注执行结果的场景。

具体的源码分析可以参考这篇:线程池中线程异常后:销毁还是复用? - 京东技术
Future

如何给线程池命名

初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。默认情况下创建的线程名字类似  pool-1-thread-n  这样的,没有业务含义,不利于我们定位问题。

1、利用 guava 的  ThreadFactoryBuilder

ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);

2、自己实现  ThreadFactory

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
 
/**
 * 线程工厂,它设置线程名称,有利于我们定位问题。
 */
public final class NamingThreadFactory implements ThreadFactory {
 
    private final AtomicInteger threadNum = new AtomicInteger();
    private final String name;
 
    /**
     * 创建一个带名字的线程池生产工厂
     */
    public NamingThreadFactory(String name) {
        this.name = name;
    }
 
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }
}

Worker 内部类

Executors 工厂

常见的线程池

《阿里巴巴 Java 开发手册》指出,不能使用 Executors 工厂内置的创建线程池方法,要自行根据情况使用 ThreadPoolExecutor 的构造函数创建。(平时小 demo 问题不大)

FixedThreadPool

corePoolSize = maximumPoolSize, keepAliveTime = 0, 使用无界队列 LinkedBlockingQueue

public static ExecutorService newFixedThreadPool(int nThreads) {
	return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
								  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
	return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
								  new LinkedBlockingQueue<Runnable>(), threadFactory);
}

FixedThreadPool 的问题:阻塞队列使用无参构造的 LinkedBlockingQueue,即选择了无界队列。那么正常运行时,不执行 shutdown()shutdownNow() 的情况下,不会出现任务拒绝,只会出现内存溢出(OOM)。

这里 maximumPoolSizekeepAliveTime 均无效,不会有多余的工作线程。

SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
	return new FinalizableDelegatedExecutorService
		(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
// 另一个略

SingleThreadExecutor 的问题:和 FixedThreadPool 一样,更容易会出现 OOM。

CachedThreadPool

corePoolSize = 0, maximumPoolSize=Integer.MAX_VALUE, keepAliveTime+TimeUnit = 60s, 使用 SynchronousQueue 队列。

public static ExecutorService newCachedThreadPool() {
	return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
								  new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
	return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
								  new SynchronousQueue<Runnable>(), threadFactory);
}

CachedThreadPool 的问题:当主线程提交任务的速度高与当前线程池处理任务的速度时,会不断创建新的工作线程,从而内存溢出(OOM)。

CachedThreadPool 的执行过程

  1. 首先执行 SynchronousQueue.offer(Runnable task) 提交任务到任务队列。如果当前 maximumPool 中有闲线程正在执行 SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行,execute() 方法执行完成,否则执行下面的步骤 2;
  2. 当初始 maximumPool 为空,或者 maximumPool 中没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤 1 将失败,此时 CachedThreadPool 会创建新线程执行任务,execute 方法执行完成;

ScheduledThreadPoolExecutor

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

其他工厂函数

ForkJoinPool

public static ExecutorService newWorkStealingPool(int parallelism) {
	return new ForkJoinPool
		(parallelism,
		 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
		 null, true);
}
public static ExecutorService newWorkStealingPool() {
	return new ForkJoinPool
		(Runtime.getRuntime().availableProcessors(),
		 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
		 null, true);
}

其他工具方法

callable 方法

讲 Runnable 对象转换为 Callable 对象

public static <T> Callable<T> callable(Runnable task, T result) {
	if (task == null)
		throw new NullPointerException();
	return new RunnableAdapter<T>(task, result);
}
public static Callable<Object> callable(Runnable task) {
	if (task == null)
		throw new NullPointerException();
	return new RunnableAdapter<Object>(task, null);
}

区别比较

Runnable 接口和 Callable 接口的区别

  • Runnable 接口不会返回结果或抛出检查异常,但是 Callable 接口可以。
  • 接口的实现函数不同,一个 run,一个 call

工具类 Executors 可以实现将 Runnable 对象转换成 Callable 对象。( Executors.callable(Runnable task)Executors.callable(Runnable task, Object result) )。

public interface Runnable {
    public abstract void run();
}
public interface Callable<V> {
    V call() throws Exception;
}

execute 方法和 submit 方法的区别

  1. execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
  2. submit() 方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Futureget() 方法来获取返回值,get() 方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit) 方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

上面方法调用的 newTaskFor 方法返回了一个 FutureTask 对象。

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

shutdown() VS shutdownNow()

  • shutdown(): 关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。
  • shutdownNow(): 关闭线程池,线程的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。 shutdownNow 的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终

isTerminated() VS isShutdown()

  • isShutDown 当调用 shutdown() 方法后返回为 true。
  • isTerminated 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true

Reference