ThreadPoolExcecutor工作原理

前言

通常我们学习一门技术,首先是学会怎么使用它。然后再深入的学习其原理和思想。
线程池相信做并发编程的小伙伴们已经用的不能再多了。但对于其原理或者线程池配置的策略,大家心里有谱么?
本文的目的在于深入了解线程池及了解线程池配置的各种策略。


知识回顾

这之前我们先从概念上回顾下一些基础知识:

进程和线程的联系和区别

  • 进程是系统进行资源分配和调度的一个独立单位
  • 线程是进程的一个实体,是CPU调度和分派的基本单位

  • 进程独占内存资源

  • 线程拥有自己独立的栈空间和执行序列
  • 同属一个进程的线程共享进程的内存资源

总结:
多线程的意义在于一个应用程序中,有多个执行部分可以同时执行。
但操作系统并不将这些线程看作为多个应用,来实现进程的调度和资源管理及分配。这是进程和线程的一大重要区别


为何使用多线程?为何使用线程池?

我们用一个假设的案例来类比下:

案例:一个商店

  • 方案A:配备1名专业收银员
  • 方案B:每有一个客户购买商品收银。老板就去找一名零时工来收银
  • 方案C:配备10名固定的收银员。旺季生意繁忙:则10名收银员一起劳作;淡季生意冷清:10名收银员中6人带薪休假,另外4名继续服务客户;
  • 方案D:老板底下有很多员工,平时各自有自己的事。商店有客户要收银了。老板安排一名员工去收银,若忙不过来,则多安排一些员工去收银。若一段时间没有客户了。部分员工可以回去做原来的事

我们这样理解:

  • 案例A:只有一个线程串行处理
  • 案例B:每有一个请求即新创建一个线程处理的线程池
  • 案例C:固定大小且永不休眠的线程池
  • 案例D:根据实际请求数目动态管理的线程池

每一种案例都是一种线程策略。针对不同的应用场景,我们可以选用不同的策略。

引申到我们的java系统:

  • Web服务器、数据库服务器等服务应用面对的情况是
    • :单个任务的处理时间很短,而请求的数目确实巨大的
      最简单的模型是:每当一个请求到达即创建一个新的线程,然后在新线程中为请求服务
    • 严重不足的是:若访问量大情况下。每到达一个请求即创建一个新的线程。创建销毁线程所花费的时间和资源 会比 实际处理请求花费的时间及资源要多
    • 除此之外活动的线程也是占用系统资源的,系统不可能创建太多的线程

因此我们需要线程池为线程生命周期开销问题和资源不足问题提供解决方案


ThreadPoolExecutor

接下来我们进入本文的正题

ThreadPoolExecutor基本介绍

关键字

  • JDK1.5
  • Java.util.concurrent

如何创建一个线程池

  • 构造方法
1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  • 参数释义
    • corePoolSize : 线程池维护线程的最少数量 (core : 核心)
    • maximumPoolSize : 线程池维护线程的最大数量
    • keepAliveTime : 线程池维护线程所允许的空闲时间
    • unit : 线程池维护线程所允许的空闲时间的单位
    • workQueue : 线程池所使用的缓冲队列
    • handler : 线程池对拒绝任务的处理策略 四种预置的策略;默认策略

ThreadPoolExecutor线程管理机制

我们用实际的举例来理解:

我们通过ThreadPoolExecutor构造了一个线程池:

  • corePoolSize : 10
  • maximumPoolSize : 30
  • workQueue : size 30 的有界队列

首先我们假设一个请求需要处理很长时间,那么:

  • 当请求数 <= 10 时,每发起一个请求,则创建一个核心线程接收处理请求(即使当前有空闲线程)
  • 当请求数 >10 & <40 时。此时核心线程池的10个线程正在被占用。会将新发起的先放到队列中,等待线程池进行任务调度。
  • 当请求数 = 40 时。即workQueue已满,且maximumPoolSize>corePoolSize,新提交任务会创建新线程执行任务
  • 当请求数 >40时。即当提交任务数超过maximumPoolSize,新提交任务由RejectedExecutionHandler处理

用总结性的语句来描述,即:

  • 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
  • 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
  • 当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
  • 当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理

大家也可以通过看 ThreadPoolExecutor.execute(Runnable command) 的源码来理解这幅图示的含义。

另:

  • 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
  • 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭

上面这个例子,大家可以通过下述java代码执行结果验证:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class CustomThreadPoolExecutor {
private ThreadPoolExecutor pool = null;
/**
* 线程池初始化方法
*
* corePoolSize 核心线程池大小----10 maximumPoolSize 最大线程池大小----30 keepAliveTime
* 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位TimeUnit TimeUnit
* keepAliveTime时间单位----TimeUnit.MINUTES workQueue 阻塞队列----new
* ArrayBlockingQueue<Runnable>(10)====10容量的阻塞队列 threadFactory 新建线程工厂----new
* CustomThreadFactory()====定制的线程工厂 rejectedExecutionHandler
* 当提交任务数超过maxmumPoolSize+workQueue之和时,
* 即当提交第41个任务时(前面线程都没有执行完,此测试方法中用sleep(100)),
* 任务会交给RejectedExecutionHandler来处理
*/
public void init() {
pool = new ThreadPoolExecutor(10, 30, 30, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(10),
new CustomThreadFactory(), new CustomRejectedExecutionHandler());
}
public void destory() {
if (pool != null) {
pool.shutdown();
}
}
public ExecutorService getCustomThreadPoolExecutor() {
return this.pool;
}
private class CustomThreadFactory implements ThreadFactory {
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
System.out.println(threadName);
t.setName(threadName);
return t;
}
}
private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录异常
// 报警处理等
System.out.println("error.............");
}
}
// 测试构造的线程池
public static void main(String[] args) {
CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();
// 1.初始化
exec.init();
ExecutorService pool = exec.getCustomThreadPoolExecutor();
for (int i = 1; i < 100; i++) {
System.out.println("提交第" + i + "个任务!");
final int index = i;
pool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(index + " task run end =====");
}
});
}
exec.destory();
}
}

Executors提供的4种线程池配置策略

  • 固定线程数目的线程池

  • 无限线程池:

  • 单线程线程池

  • 定时功能线程池

以上策略的线程池可通过下述java代码执行观察其行为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest {
public static void main(String[] args) {
// 创建一个可重用固定线程数的线程池
// ExecutorService pool = Executors.newFixedThreadPool(5);
// 创建一个单任务线程池
// ExecutorService pool = Executors.newSingleThreadExecutor();
// 创建一个可变尺寸的线程池
// ExecutorService pool = Executors.newCachedThreadPool();
// 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
for (int i = 0; i < 100; i++) {
// pool.execute(new MyThread(i + 1));
pool.schedule(new MyThread(i + 1), i * (i + 1) / 2, TimeUnit.SECONDS);
}
// 关闭线程池
pool.shutdown();
}
}
class MyThread extends Thread {
private int taskId;
public MyThread(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
System.out.println("线程:" + Thread.currentThread().getName() + "正在执行任务,任务编号:" + taskId + ", current time is: "
+ new Date().getMinutes() + ":" + new Date().getSeconds());
}
}

线程池配置知识延伸

ThreadPoolExecutor提供的四种拒绝策略即RejectedExecutionHandler

  • CallerRunsPolicy

    线程池拒绝执行,通知调用方让调用方自己执行

  • AbortPolicy

    线程池拒绝执行,并且抛出一个异常让外部知道

  • DiscardPolicy

    线程池拒绝执行,并且什么也不做

  • DiscardOldestPolicy

    线程池将队列中的最老的任务丢弃掉,将新发起的任务假如到队列的尾端

BlockingQueue阻塞队列

上述图中:抛异常、特定值、超时 列 均为 Queue 需要实现的接口
行为正如名称:

  • 抛异常:执行非预期,抛异常
  • 特定值:执行非预期,返回null
  • 超时:根据设定的超时时间,超过超时间仍不能按照预期执行,抛异常

而,BlockQueue在实现Queue接口的基础上,新增实现了阻塞那一列的方法。

  • put(o) : 如果队列满了,调用这个方法,方法会一直尝试直到能够将参数o成功放到队列中
  • take(o) : 如果队列中没有任何元素,调用这个方法,方法会一直尝试直到能够从队列中取出一个元素

附上java原生实现的一些阻塞队列:

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列
  • SynchronousQueue:一个不存储元素的阻塞队列
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列

实现自己定制的阻塞队列

附上java源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class CustomBThreadPoolExecutor {
private ThreadPoolExecutor pool = null;
/**
* 线程池初始化方法
*
* corePoolSize 核心线程池大小----1 maximumPoolSize 最大线程池大小----3 keepAliveTime
* 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位TimeUnit TimeUnit
* keepAliveTime时间单位----TimeUnit.MINUTES workQueue 阻塞队列----new
* ArrayBlockingQueue<Runnable>(5)====5容量的阻塞队列 threadFactory 新建线程工厂----new
* CustomThreadFactory()====定制的线程工厂 rejectedExecutionHandler
* 当提交任务数超过maxmumPoolSize+workQueue之和时,
* 即当提交第41个任务时(前面线程都没有执行完,此测试方法中用sleep(100)),
* 任务会交给RejectedExecutionHandler来处理
*/
public void init() {
pool = new ThreadPoolExecutor(1, 3, 30, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(5),
new CustomThreadFactory(), new CustomRejectedExecutionHandler());
}
public void destory() {
if (pool != null) {
pool.shutdown();
}
}
public ExecutorService getCustomBThreadPoolExecutor() {
return this.pool;
}
private class CustomThreadFactory implements ThreadFactory {
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName = CustomBThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
System.out.println(threadName);
t.setName(threadName);
return t;
}
}
private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// 核心改造点,由blockingqueue的offer改成put阻塞方法
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 测试构造的线程池
public static void main(String[] args) {
CustomBThreadPoolExecutor exec = new CustomBThreadPoolExecutor();
// 1.初始化
exec.init();
ExecutorService pool = exec.getCustomBThreadPoolExecutor();
for (int i = 1; i < 100; i++) {
System.out.println("提交第" + i + "个任务!");
final int index = i;
pool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(index + " task is running=====");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
exec.destory();
}
}

总结

  • 用ThreadPoolExecutor自定义线程池,看线程是的用途,如果任务量不大,可以用无界队列,如果任务量非常大,要用有界队列,防止OOM
  • 如果任务量很大,还要求每个任务都处理成功,要对提交的任务进行阻塞提交,重写拒绝机制,改为阻塞提交。保证不抛弃一个任务
  • 最大线程数一般设为2N+1最好,N是CPU核数
  • 核心线程数,看应用,如果是任务,一天跑一次,设置为0,合适,因为跑完就停掉了,如果是常用线程池,看任务量,是保留一个核心还是几个核心线程数
  • 如果要获取任务执行结果,用CompletionService,但是注意,获取任务的结果的要重新开一个线程获取,如果在主线程获取,就要等任务都提交后才获取,就会阻塞大量任务结果,队列过大OOM,所以最好异步开个线程获取结果

相信本文结束后,大家对线程池配置应该有了深刻的认识。

并发编程挺好玩的。希望大家在使用一门技术或工具的时候,能往里多想一层。多思考,多深入。

坚持原创技术分享,您的支持将鼓励我继续创作!