JUC线程池
线程池分为两部分,一个是传统的ThreadPoolExecutor
以及分治使用的ForkJoinPool
,接口如下:

核心接口是Executor
,这里涉及到Runnable
,Future
,Callable
,FutureTask
的使用
1 | //-------------所有的submit最终都是封装RunableFutre,之后调用execute---------------- |
- 一般使用submit是为了获取线程执行的结果
- callable直接调用
1
2
3
4
5
6
7public void test() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future future = executor.submit(()->{
return 1;
})
Assertion.asserEqual(future.get(),1);
} - FutureTask使用,该接口是
Future
和Runnable
的接口,额外提供了判断线程是否执行结束的函数1
2
3
4
5
6public void test() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
FutureTask futureTask = new FutureTask(()->1);
executorService.execute(futureTask);
Assertions.assertEquals(futureTask.get(),1);
}
- callable直接调用
¶ThreadPoolExecutor实现
¶数据结构
该线程池内部维护一个worker
工作线程集合,用来消耗由Runnable
组成的阻塞队列workQueue
,并且将worker
分为核心线程和非核心线程,前者在从workQueue
中取任务时会调用take()
,后者会调用poll()
,当任意时刻没有获取任务,则工作线程退出,即remove一个worker
,明显能看出来核心线程正常情况会阻塞,非核心线程才会退出.
1 |
|
¶核心逻辑
- execute
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. 当线程池正常且woker数量小于核心数量,则尝试向wokers中添加一个
* 含有初始任务的worker,添加函数可能会失败,由于多线程操作,也许线程池
* 状态会改变,woker数量也会改变.
*
* 2. 如1未通过,则尝试将command加入任务队列,若入队成功,则再次判断线程
* 池状态,若不正常则出队,并拒绝该任务;若未出队,且工作线程为空,则创建一个
* 不带初始任务的非工作线程
*
*
* 3. 若2未通过,即无法入队,则创建一个含有初始任务的非工作线程,否则拒绝此任务
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //注意此处使用的时offer,不是put,因此不会阻塞
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
} - addWorker:添加工作线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63private boolean addWorker(Runnable firstTask, boolean core) {
//1. 判断能否加入到worker中,核心线程去和corePoolSize比较,非核心和maximumPoolSize比较,
//for循环是为了处理clt的cas操作
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//2.添加worker,并启动
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//加锁
try {
int c = ctl.get();
//再次检查线程池状态,成功则加入worker到set中
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;//记录当前最大工作线程数量
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); //启动Worker#thread.start()
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- worker:内部工作线程原理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39public class ThreadPoolExecutor extends AbstractExecutorService {
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
//内部真正执行的线程,实际只是执行Worker#run函数
final Thread thread;
//初始任务
Runnable firstTask;
//该工作线程完成了多少次任务
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // 注意此处将AQS的资源数设置为-1
this.firstTask = firstTask; //设置初次任务
this.thread = getThreadFactory().newThread(this);//创建执行线程
}
//下边的逻辑和重入锁的互斥逻辑类似
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
}
} - runWorker:真正调用逻辑
当一个新的worker被加入集合后,调用内部Thread.start后会正式执行该函数
1 | final void runWorker(Worker w) { |
Worker
实现AQS的原因,以及内部不使用重入锁的原因
1 | private void interruptIdleWorkers(boolean onlyOne) { |
¶线程池退出
- shutdown
1 | public void shutdown() { |
¶线程池状态说明
- RUNNING:正常
- SHUTDOWN:该状态下,无法添加新的任务,但是可以添加新的工作线程(即不能添加还有初始任务的工作线程);可以继续处理任务
- STOP:该状态,无法添加工作线程;也不能继续处理任务
1 | //-------------addWorker--------------- |
¶ThreadPoolExecutor的三种常见使用
使用Executors.newxx
这个api除了newWorkStealingPool
和newScheduledThreadPool
实际上就是创建参数不同的ThreadPollExecutor
,构造器如下
1 | public ThreadPoolExecutor(int corePoolSize, |
- threadFactory的作用是用来创建Thread,该thread会调用Woker中的run函数从而执行runWork,假设需要获取线程异常信息,可以通过重写一个threadFactory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31//这是Exectuors中默认实现之一,并没有给线程增加异常处理
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
//可以加上异常处理
t.setUncaughtExceptionHandler(()->{
//捕捉异常,将信息输出,或者保存下来
})
return t;
}
} - newCachedThreadPool:适用于任务耗时非常短的场景
1
2
3
4
5
6public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
corePoolSize
为0,keepAliveTime
为60秒,blockQueue
为SynchronousQueue
,使用过程中不会创建核心线程,并且任务队列最多只能由一个任务,若任务队列被阻塞,则会伴随着非核心线程的创建,每个非核心线程的阻塞等待时间是60s,适用场景就是不确定线程数量,每个任务非常短的情况使用
- newFixedThreadPool:固定工作线程
1
2
3
4
5
6public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
corePoolSize
和maximumPollSize
相同,阻塞队列为无界阻塞队列LinkedBlockingQueue
,不会创建非核心线程,并且核心线程正常情况不会退出,任务队列是无界的,适用场景是用户要确定任务量,和线程数量
- newSingleThreadExecutor:单一线程
1
2
3
4
5
6
7public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
仅仅创建一个核心线程,任务队列采用无界阻塞队列,当然当该线程退出(异常),会有新的核心工作线程来替代它,适用于执行顺序任务.
¶ScheduledExecutorService
JUC中实现的用来延迟执行和定时执行的线程池
1 | //延迟执行,无结果 |
简单使用:
1 |
|