Java 线程池 ThreadPoolExecutor 详解

说到 Java 多线程就联想到 Thread、Runnable 等常用类,但是在实际项目中为了避免难以控制的线程创建,往往很少直接创建线程,都是创建线程池,提交任务到线程池执行。这点在阿里 Java 开发手册里也有提到。

线程池 ThreadPoolExecutor 类位于 Java 并发包下,为我们提供了管理线程池、任务提交、调度等功能。

ThreadPoolExecutor 创建

ThreadPoolExecutor 类提供了如下几个构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 使用默认的线程创建方式和默认的拒绝策略。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}

// 使用默认的拒绝策略。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
}

// 使用默认的线程创建方式。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
}

最终调用的还是参数最多的这个构造方法:

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
)

其中各参数含义是:

  • corePoolSize

    核心线程数

  • maximumPoolSize

    最大线程数

  • keepAliveTime

    当线程池中线程数超过核心线程数时,空闲线程的存活时间。

  • unit

    上述空闲时间的单位,可选:填、小时、分、秒。

  • workQueue

    当线程池中线程超过核心线程数,再往线程池中提交任务,会存到该任务队列中。

  • threadFactory

    线程创建工厂,默认使用的是:DefaultThreadFactory

  • handler

    当线程池中线程数超过最大线程数时,会触发该拒绝策略,默认是:AbortPolicy,直接抛异常。

线程管理机制

可能有些人比较容易混淆 corePoolSizemaximumPoolSizeworkQueue 之间的关系。这就涉及到线程池的线程管理机制。

可以通过如下流程图理解线程管理逻辑:

以一个简单例子验证上述流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class App {

static Logger logger = LoggerFactory.getLogger(App.class);

public static void main(String[] args) {
// 核心线程数:3,最大线程数:6,队列长度:2。
ThreadPoolExecutor executorService = new ThreadPoolExecutor(3, 6, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
logger.warn("Reject job: {}", ((MyThread)r).getIndex());
}
});

// 循环提交 10 个任务。
for (int i = 0; i < 10; i++) {
executorService.execute(new MyThread(i));
try {Thread.sleep(20);} catch (InterruptedException e) {}
}

// 任务还在执行,查看线程池大小。
logger.info("Pool size: {}", executorService.getPoolSize());

// 一定时间过后,查看线程池空闲线程释放情况。
try {Thread.sleep(10000);} catch (InterruptedException e) {}
logger.info("Pool size: {}", executorService.getPoolSize());
}
}

// 测试线程
class MyThread implements Runnable {

Logger logger = LoggerFactory.getLogger(this.getClass());

private final int index;

public MyThread(int index) {
this.index = index;
}

public int getIndex() {
return index;
}

@Override
public void run() {
logger.info("Begin job >>> {}", index);
try {Thread.currentThread().sleep(2000);} catch (InterruptedException e) {}
logger.info("End job <<< {}", index);
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
16:45:54.188 [pool-1-thread-1] INFO MyThread - Begin job >>> 0
16:45:54.205 [pool-1-thread-2] INFO MyThread - Begin job >>> 1
16:45:54.225 [pool-1-thread-3] INFO MyThread - Begin job >>> 2
16:45:54.286 [pool-1-thread-4] INFO MyThread - Begin job >>> 5
16:45:54.306 [pool-1-thread-5] INFO MyThread - Begin job >>> 6
16:45:54.325 [pool-1-thread-6] INFO MyThread - Begin job >>> 7
16:45:54.345 [main] WARN App - Reject job: 8
16:45:54.365 [main] WARN App - Reject job: 9
16:45:54.385 [main] INFO App - Pool size: 6
16:45:56.194 [pool-1-thread-1] INFO MyThread - End job <<< 0
16:45:56.194 [pool-1-thread-1] INFO MyThread - Begin job >>> 3
16:45:56.205 [pool-1-thread-2] INFO MyThread - End job <<< 1
16:45:56.205 [pool-1-thread-2] INFO MyThread - Begin job >>> 4
16:45:56.225 [pool-1-thread-3] INFO MyThread - End job <<< 2
16:45:56.287 [pool-1-thread-4] INFO MyThread - End job <<< 5
16:45:56.306 [pool-1-thread-5] INFO MyThread - End job <<< 6
16:45:56.326 [pool-1-thread-6] INFO MyThread - End job <<< 7
16:45:58.196 [pool-1-thread-1] INFO MyThread - End job <<< 3
16:45:58.206 [pool-1-thread-2] INFO MyThread - End job <<< 4
16:46:04.389 [main] INFO App - Pool size: 3

可以看出:

  1. 最开始执行了 6 个线程的任务,因为最大线程数就是 6。

  2. 然后再提交的 8、9 号任务被拒绝了。

  3. 待 0 号线程执行完后,开始执行队列中的 3 号任务,然后 1 号线程执行完后,开始执行队列中的 4 号任务。

  4. 等待一段时间后,线程池中的线程数又缩小到了 3 个。

与上述流程图一致。

Executors

Executors类提供了几种创建特定线程的静态方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 固定线程数的线程池,加上无限制的线程队列。
public static ExecutorService newFixedThreadPool(int paramInt) {
return new ThreadPoolExecutor(paramInt, paramInt, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}

// 只有一个线程的线程池,加上无限制的线程队列。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}

// Cached 的线程池,核心线程数为零,最大线程数无限大,加上无限制的线程队列。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}

阿里开发手册提示:

线程池不允许使用 Executors 去创建,而应通过 ThreadPoolExecutor 的方式,这样的处理方式让写同学更加明确线程池运行规则,避资源耗尽风险。

其实从上面举的几个创建线程的方法可以看出,无限制队列、最大线程数无线大等都有比较特殊的使用场景,还是使用 ThreadPoolExecutor 创建符合自身项目的线程池吧!

参考文章

https://segmentfault.com/a/1190000015368896

https://www.cnblogs.com/zedosu/p/6665306.html

https://www.cnblogs.com/wxd0108/p/5479442.html

https://www.runoob.com/java/java-multithreading.html