JUC结构
JUC可以分为AQS锁实现
,线程池相关
,同步通信工具
,原子类
,并发集合
,可以参考,主要分析AQS锁实现,线程池,以及队列和并发集合实现.
¶锁
这里特指JUC中的Lock
类,定义如下:
1 | public interface Lock { |
Lock
类具有以下性质:
- 提供了更多操作,支持更加延展性的结构,通过可以关联多个Condition对象
- 一般来说是独占方式,但也可以支持共享,如ReadWriteLock
- sync通过每个对象的监视器实现,获取多个锁和释放多个锁顺序相反
- Lock实现了和Sync相同的语义,完成了可见性和原子性,前者是由于volatile本身的性质完成的
- Lock可以响应中断操作
¶Lock的可见性
在Lock的注释中描述了,Lock
具有和sync
相同的内存同步语义,以ReentrantLock
为例,简化内部源码,如下
1 | private volatile int state; |
假设有一个变量a,考虑它的可见性
1 | int a = 0; |
根据happend-before原则: volatile变量规则:可见性规则,该种变量的读取发生在写之后
,假设线程A,B同时获取锁,当A线程持有锁改变了a,并且释放锁,那么它的顺序就是 写a-->写state
,此时线程B被唤醒,在AQS中会再次执行tryAcqurrie()
,从而经历读state,读a
,由于volatile的性质,线程B能看见线程A写state之前的所有操作.
¶tryLock操作
一般使用如下:
1 | try{ |
¶重入锁ReentrantLock
ReentrantLock的特性:
- 支持同一个线程重入
- 支持公平和非公平锁
¶源码原理
uml结构图如下:

ReentrantLock由内部类FiarSync
和NonFairSync
实现了同步机制
1 | public class ReentrantLock implements Lock, java.io.Serializable { |
¶特性
- 非公平和公平的区别
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//直接修改资源,如果能够成功,则直接获取锁,不进入CLH
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//公平实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && //判断是否第二个节点是当前线程,是则返回true,否则返回false
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可以看出公平锁判断有其他节点等待就继续排队,非公平锁则是无视当前队伍尝试获取资源
- tryLock的实现
通过调用nonfairTryAcquire,参与一次state的修改,但是不进入CLH队列,因此还是需要调用unlock函数
¶ReadWriteLock
读写锁应用于读多,并且不占用过多时间,同时写线程不多的情况;内部采用两个Lock,实际共用同一个Sync,读锁使用share模式,写锁使用exclusive模式,这样就能实现读写互斥,写写互斥,读读共享的效果,其实现为ReentrantReadWriteLock
和StampedLock
,后者并不是直接实现
¶ReentrantReadWriteLock
- 性质
- 响应中断
- 能够进行所谓的写锁降级,即一个线程可以先获取写锁,再获取读锁,不能相反
- 结构
- uml类图

内部AQS锁实现部分和ReentrantLock
类似,只是ReentrantReadWriteLock
内部存在WriteLock
和ReadLock
直接持有Sync
-
实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
//读锁,调用sync的share函数
private final ReentrantReadWriteLock.ReadLock readerLock;
//写锁,调用sync的非share函数
private final ReentrantReadWriteLock.WriteLock writerLock;
//共同的sync
final Sync sync;
public ReentrantReadWriteLock(boolean fair) {
//创建公平或非公平sync
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
//读Lock实现
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync; //持有sync
}
//调用共享
public void lock() {
sync.acquireShared(1);
}
public void unlock() {
sync.releaseShared(1);
}
}
//写Lock同理,只是调用exclusive函数
} -
锁结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
abstract static class Sync extends AbstractQueuedSynchronizer {
//读锁(共享)的移位,用来计算读锁资源
static final int SHARED_SHIFT = 16;
//用来向state高位计算读锁占用数的
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//读写最大资源数量,写锁指的是重入次数,读锁指的是共享数量
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//写锁的掩码,用来计算写锁重入次数
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count. */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count. */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //计算读锁共享数量
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } //计算读锁独占数量
//该结构用来记录共享读锁的每个不同线程重入的次数
static final class HoldCounter {
int count; // initially 0
// Use id, not reference, to avoid garbage retention
final long tid = LockSupport.getThreadId(Thread.currentThread());
}
//使用ThreadLocal来记录不同读线程,重写initaValue函数的目的是为了当调用get函数时,
//如果当前线程为第一次,做一个初始化操作
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
//记录读线程的ThreadLocal
private transient ThreadLocalHoldCounter readHolds;
//记录上一次读线程的缓存
private transient HoldCounter cachedHoldCounter;
//记录第一个读线程
private transient Thread firstReader;
//记录第一个读线程的重入次数
private transient int firstReaderHoldCount;
//读锁的公平或非公平实现要点
abstract boolean readerShouldBlock();
//写锁的公平或非公平实现要点
abstract boolean writerShouldBlock();
//----------------独占(写锁)SYNC实现-----------------
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. 若读或写线程存在,即c!=0,并且不是写重入,则失败
* 2. 如果写饱和,则失败
* 3. 否则根据队列策略,即writerShouldBlock,进行cas设置,若通过则获取锁,并设置独占线程
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) { //存在其他线程
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread()) //重入失败
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) //写饱和
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires); //重入成功
return true;
}
if (writerShouldBlock() || //排队策略,NonFair常为false,Fair则和重入锁实现相同
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current); //设置独占线程
return true;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//释放写资源
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free) //若写资源即state高16位都为0了,则释放独占线程
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
//------------共享(写锁)SYNC实现----------------
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) { //减少firstReader的记录
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null; //若fristReader的重入次数只有一次了,那么就直接清除
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. 如果写线程占用锁,则直接失败
* 2. 否则根据排队策略判断是否排队. 如果不需要,则通过cas设置
* 状态.这一步不进行重入检测.
* 3. 如果由于步骤2中排队策略或者cas失败则进行fullTryAcquireShared
*/
Thread current = Thread.currentThread();
int c = getState();
//[1] 检测是否存在写线程,并且不是当前线程(所谓写锁降级)
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
//[2] 满足条件排队策略成功,读数量未饱和,cas操作成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//开始记录不同读线程的重入次数,通过firstReader和cachedHoldCounter做优化
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get(); //注意,这个get(),由于重写了initialValue(),如果不存在值,则会返回一个新值,并且加入ThreadLocal的map中
else if (rh.count == 0)//假设同一个读线程经过加锁,释放,第二次进入到这里就满足该条件(在release函数中有remove操作)
readHolds.set(rh);
rh.count++;//增加重入次数
}
return 1;
}
//[3] 假设由于排队策略,或者cas失败,则需要一次补救措施
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
//采用for循环是为了cas成功
HoldCounter rh = null;
for (;;) {
int c = getState();
//[1] 检测是否存在写线程,并且不是当前线程(所谓写锁降级)
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
//[2] 若由于排队策略,导致需要排队(排除重入情况)
} else if (readerShouldBlock()) {
// 用来排除重入锁的情况
if (firstReader == current) { //这里说明此时是fristReader的重入操作,跳过
// assert firstReaderHoldCount > 0;
} else {
//只要满足rh==null&&rh.count==0,就会导致此处仅仅记录一个cachedHoldCounter,不会存到readHolds中
//情况1: firstReader第一次进入,但是由于CLH有写锁排队(公平或非公平都应该是写锁排队导致),结果此处读操作,必定排队等待
//情况2: 第二个读线程,即firstReader还处于持有锁的状态,由于公平锁导致的排队,或者非公平的导致的排队
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
//若rh.count !=0,说明此时进行的是非firstReader的重入操作
if (rh.count == 0)
return -1;
}
}
//[3] 判断读是否饱和
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//[4] 修改资源并记录线程状态
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
}
- state分段
将state分段为高16位和低16位,前者表示共享读锁的资源或重入情况,后者表示独占写锁的重入情况
计算当前读锁数量c >>> SHARED_SHIFT
,计算写锁重入c & EXCLUSIVE_MASK
,增加读锁statecompareAndSetState(c, c + SHARED_UNIT)
,增加写锁compareAndSetState(c, c + acquires)
- 记录每个不同读线程的重入情况
使用ThreadLocal
变量,内部为HoldCounter
,保存线程id和重入次数,同时使用firstReader
和firstReaderHoldCount
把第一次读线程特殊保留,cachedHoldCounter
用来记录上次的读线程
cachedHoldCounter
变量的作用是避免从readHolds
中取数据,是一个缓存
firstReader
属于一种优化,减少向readHolds
中操作,这两个变量都是为了减少记录读线程的操作
这些记录只是为了getReadHoldCount
函数
- state分段
-
排队策略
通过两个函数writerShouldBlock
和readerShouldBlock
为实现公平和非公平锁1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29//公平锁,即存在排队的节点,则排队
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
//非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
//写锁直接插队
final boolean writerShouldBlock() {
return false; // writers can always barge
}
//读锁会判断是否存在写锁排队,若存在则进入CLH排队,否插队
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
这里读锁的非公平模式只是在判断了当排队的锁是写锁的时候不进行插队,并不能根本上的解决写饥饿的状态