avatar

多线程Ⅴ----并发包锁和同步通信

JUC结构

JUC可以分为AQS锁实现,线程池相关,同步通信工具,原子类,并发集合,可以参考,主要分析AQS锁实现,线程池,以及队列和并发集合实现.

这里特指JUC中的Lock类,定义如下:

1
2
3
4
5
6
7
8
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}

Lock类具有以下性质:

  • 提供了更多操作,支持更加延展性的结构,通过可以关联多个Condition对象
  • 一般来说是独占方式,但也可以支持共享,如ReadWriteLock
  • sync通过每个对象的监视器实现,获取多个锁和释放多个锁顺序相反
  • Lock实现了和Sync相同的语义,完成了可见性和原子性,前者是由于volatile本身的性质完成的
  • Lock可以响应中断操作

Lock的可见性

在Lock的注释中描述了,Lock具有和sync相同的内存同步语义,以ReentrantLock为例,简化内部源码,如下

1
2
3
4
5
6
7
8
9
10
11
private volatile int state;

void lock() {
read state
if (can get lock)
write state
}

void unlock() {
write state
}

假设有一个变量a,考虑它的可见性

1
2
3
4
5
6
7
8
9
10
int a = 0;
Lock lock = new ReentrantLock();
void test(){
try{
lock.lock();
a++;
}finally{
lock.unlock();
}
}

根据happend-before原则: volatile变量规则:可见性规则,该种变量的读取发生在写之后,假设线程A,B同时获取锁,当A线程持有锁改变了a,并且释放锁,那么它的顺序就是 写a-->写state,此时线程B被唤醒,在AQS中会再次执行tryAcqurrie(),从而经历读state,读a,由于volatile的性质,线程B能看见线程A写state之前的所有操作.

tryLock操作

一般使用如下:

1
2
3
4
5
6
7
8
9
try{
if (lock.tryLock()) {
//执行获取锁的操作
} else {
//获得锁失败
} finally{
lock.unlock();//这一步必须执行
}
}

重入锁ReentrantLock

ReentrantLock的特性:

  • 支持同一个线程重入
  • 支持公平和非公平锁

源码原理

uml结构图如下:

ReentrantLock由内部类FiarSyncNonFairSync实现了同步机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;//实际上的锁
abstract static class Sync extends AbstractQueuedSynchronizer {
//非公平锁,实际上就是直接在AQS中排队
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //重入性的实现
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
//非公平锁
static final class NonfairSync extends Sync {

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
//公平锁
static final class FairSync extends Sync {
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
//模式选择
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
//执行公平或非公平
public void lock() {
sync.acquire(1);
}
//尝试的实现
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
}

特性

  • 非公平和公平的区别
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //直接修改资源,如果能够成功,则直接获取锁,不进入CLH
    if (c == 0) {
    if (compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0) // overflow
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }
    //公平实现

    protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    if (!hasQueuedPredecessors() && //判断是否第二个节点是当前线程,是则返回true,否则返回false
    compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0)
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }

可以看出公平锁判断有其他节点等待就继续排队,非公平锁则是无视当前队伍尝试获取资源

  • tryLock的实现
    通过调用nonfairTryAcquire,参与一次state的修改,但是不进入CLH队列,因此还是需要调用unlock函数

ReadWriteLock

读写锁应用于读多,并且不占用过多时间,同时写线程不多的情况;内部采用两个Lock,实际共用同一个Sync,读锁使用share模式,写锁使用exclusive模式,这样就能实现读写互斥,写写互斥,读读共享的效果,其实现为ReentrantReadWriteLockStampedLock,后者并不是直接实现

ReentrantReadWriteLock

  • 性质
    • 响应中断
    • 能够进行所谓的写锁降级,即一个线程可以先获取写锁,再获取读锁,不能相反
  • 结构
  • uml类图

内部AQS锁实现部分和ReentrantLock类似,只是ReentrantReadWriteLock内部存在WriteLockReadLock直接持有Sync

  • 实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    public class ReentrantReadWriteLock
    implements ReadWriteLock, java.io.Serializable {
    //读锁,调用sync的share函数
    private final ReentrantReadWriteLock.ReadLock readerLock;
    //写锁,调用sync的非share函数
    private final ReentrantReadWriteLock.WriteLock writerLock;
    //共同的sync
    final Sync sync;
    public ReentrantReadWriteLock(boolean fair) {
    //创建公平或非公平sync
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
    }
    //读Lock实现
    public static class ReadLock implements Lock, java.io.Serializable {
    private final Sync sync;
    protected ReadLock(ReentrantReadWriteLock lock) {
    sync = lock.sync; //持有sync
    }
    //调用共享
    public void lock() {
    sync.acquireShared(1);
    }

    public void unlock() {
    sync.releaseShared(1);
    }
    }
    //写Lock同理,只是调用exclusive函数
    }
  • 锁结构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
     
    abstract static class Sync extends AbstractQueuedSynchronizer {
    //读锁(共享)的移位,用来计算读锁资源
    static final int SHARED_SHIFT = 16;
    //用来向state高位计算读锁占用数的
    static final int SHARED_UNIT = (1 << SHARED_SHIFT);
    //读写最大资源数量,写锁指的是重入次数,读锁指的是共享数量
    static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
    //写锁的掩码,用来计算写锁重入次数
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    /** Returns the number of shared holds represented in count. */
    static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
    /** Returns the number of exclusive holds represented in count. */
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //计算读锁共享数量
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } //计算读锁独占数量

    //该结构用来记录共享读锁的每个不同线程重入的次数
    static final class HoldCounter {
    int count; // initially 0
    // Use id, not reference, to avoid garbage retention
    final long tid = LockSupport.getThreadId(Thread.currentThread());
    }
    //使用ThreadLocal来记录不同读线程,重写initaValue函数的目的是为了当调用get函数时,
    //如果当前线程为第一次,做一个初始化操作
    static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
    return new HoldCounter();
    }
    }
    //记录读线程的ThreadLocal
    private transient ThreadLocalHoldCounter readHolds;
    //记录上一次读线程的缓存
    private transient HoldCounter cachedHoldCounter;
    //记录第一个读线程
    private transient Thread firstReader;
    //记录第一个读线程的重入次数
    private transient int firstReaderHoldCount;
    //读锁的公平或非公平实现要点
    abstract boolean readerShouldBlock();
    //写锁的公平或非公平实现要点
    abstract boolean writerShouldBlock();
    //----------------独占(写锁)SYNC实现-----------------
    protected final boolean tryAcquire(int acquires) {
    /*
    * Walkthrough:
    * 1. 若读或写线程存在,即c!=0,并且不是写重入,则失败
    * 2. 如果写饱和,则失败
    * 3. 否则根据队列策略,即writerShouldBlock,进行cas设置,若通过则获取锁,并设置独占线程
    */
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) { //存在其他线程
    // (Note: if c != 0 and w == 0 then shared count != 0)
    if (w == 0 || current != getExclusiveOwnerThread()) //重入失败
    return false;
    if (w + exclusiveCount(acquires) > MAX_COUNT) //写饱和
    throw new Error("Maximum lock count exceeded");
    // Reentrant acquire
    setState(c + acquires); //重入成功
    return true;
    }
    if (writerShouldBlock() || //排队策略,NonFair常为false,Fair则和重入锁实现相同
    !compareAndSetState(c, c + acquires))
    return false;
    setExclusiveOwnerThread(current); //设置独占线程
    return true;
    }

    protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
    //释放写资源
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free) //若写资源即state高16位都为0了,则释放独占线程
    setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
    }
    //------------共享(写锁)SYNC实现----------------
    protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) { //减少firstReader的记录
    // assert firstReaderHoldCount > 0;
    if (firstReaderHoldCount == 1)
    firstReader = null; //若fristReader的重入次数只有一次了,那么就直接清除
    else
    firstReaderHoldCount--;
    } else {
    HoldCounter rh = cachedHoldCounter;
    if (rh == null ||
    rh.tid != LockSupport.getThreadId(current))
    rh = readHolds.get();
    int count = rh.count;
    if (count <= 1) {
    readHolds.remove();
    if (count <= 0)
    throw unmatchedUnlockException();
    }
    --rh.count;
    }
    for (;;) {
    int c = getState();
    int nextc = c - SHARED_UNIT;
    if (compareAndSetState(c, nextc))
    // Releasing the read lock has no effect on readers,
    // but it may allow waiting writers to proceed if
    // both read and write locks are now free.
    return nextc == 0;
    }
    }


    protected final int tryAcquireShared(int unused) {
    /*
    * Walkthrough:
    * 1. 如果写线程占用锁,则直接失败
    * 2. 否则根据排队策略判断是否排队. 如果不需要,则通过cas设置
    * 状态.这一步不进行重入检测.
    * 3. 如果由于步骤2中排队策略或者cas失败则进行fullTryAcquireShared
    */
    Thread current = Thread.currentThread();
    int c = getState();
    //[1] 检测是否存在写线程,并且不是当前线程(所谓写锁降级)
    if (exclusiveCount(c) != 0 &&
    getExclusiveOwnerThread() != current)
    return -1;
    int r = sharedCount(c);
    //[2] 满足条件排队策略成功,读数量未饱和,cas操作成功
    if (!readerShouldBlock() &&
    r < MAX_COUNT &&
    compareAndSetState(c, c + SHARED_UNIT)) {
    //开始记录不同读线程的重入次数,通过firstReader和cachedHoldCounter做优化
    if (r == 0) {
    firstReader = current;
    firstReaderHoldCount = 1;
    } else if (firstReader == current) {
    firstReaderHoldCount++;
    } else {
    HoldCounter rh = cachedHoldCounter;
    if (rh == null ||
    rh.tid != LockSupport.getThreadId(current))
    cachedHoldCounter = rh = readHolds.get(); //注意,这个get(),由于重写了initialValue(),如果不存在值,则会返回一个新值,并且加入ThreadLocal的map中
    else if (rh.count == 0)//假设同一个读线程经过加锁,释放,第二次进入到这里就满足该条件(在release函数中有remove操作)
    readHolds.set(rh);
    rh.count++;//增加重入次数
    }
    return 1;
    }
    //[3] 假设由于排队策略,或者cas失败,则需要一次补救措施
    return fullTryAcquireShared(current);
    }
    final int fullTryAcquireShared(Thread current) {
    //采用for循环是为了cas成功
    HoldCounter rh = null;
    for (;;) {
    int c = getState();
    //[1] 检测是否存在写线程,并且不是当前线程(所谓写锁降级)
    if (exclusiveCount(c) != 0) {
    if (getExclusiveOwnerThread() != current)
    return -1;
    // else we hold the exclusive lock; blocking here
    // would cause deadlock.
    //[2] 若由于排队策略,导致需要排队(排除重入情况)
    } else if (readerShouldBlock()) {
    // 用来排除重入锁的情况
    if (firstReader == current) { //这里说明此时是fristReader的重入操作,跳过
    // assert firstReaderHoldCount > 0;
    } else {
    //只要满足rh==null&&rh.count==0,就会导致此处仅仅记录一个cachedHoldCounter,不会存到readHolds中
    //情况1: firstReader第一次进入,但是由于CLH有写锁排队(公平或非公平都应该是写锁排队导致),结果此处读操作,必定排队等待
    //情况2: 第二个读线程,即firstReader还处于持有锁的状态,由于公平锁导致的排队,或者非公平的导致的排队
    if (rh == null) {
    rh = cachedHoldCounter;
    if (rh == null ||
    rh.tid != LockSupport.getThreadId(current)) {
    rh = readHolds.get();
    if (rh.count == 0)
    readHolds.remove();
    }
    }
    //若rh.count !=0,说明此时进行的是非firstReader的重入操作
    if (rh.count == 0)
    return -1;
    }
    }
    //[3] 判断读是否饱和
    if (sharedCount(c) == MAX_COUNT)
    throw new Error("Maximum lock count exceeded");
    //[4] 修改资源并记录线程状态
    if (compareAndSetState(c, c + SHARED_UNIT)) {
    if (sharedCount(c) == 0) {
    firstReader = current;
    firstReaderHoldCount = 1;
    } else if (firstReader == current) {
    firstReaderHoldCount++;
    } else {
    if (rh == null)
    rh = cachedHoldCounter;
    if (rh == null ||
    rh.tid != LockSupport.getThreadId(current))
    rh = readHolds.get();
    else if (rh.count == 0)
    readHolds.set(rh);
    rh.count++;
    cachedHoldCounter = rh; // cache for release
    }
    return 1;
    }
    }
    }
    }

    • state分段
      将state分段为高16位和低16位,前者表示共享读锁的资源或重入情况,后者表示独占写锁的重入情况
      计算当前读锁数量c >>> SHARED_SHIFT,计算写锁重入c & EXCLUSIVE_MASK,增加读锁statecompareAndSetState(c, c + SHARED_UNIT),增加写锁compareAndSetState(c, c + acquires)
    • 记录每个不同读线程的重入情况
      使用ThreadLocal变量,内部为HoldCounter,保存线程id和重入次数,同时使用firstReaderfirstReaderHoldCount把第一次读线程特殊保留,cachedHoldCounter用来记录上次的读线程
      cachedHoldCounter变量的作用是避免从readHolds中取数据,是一个缓存
      firstReader属于一种优化,减少向readHolds中操作,这两个变量都是为了减少记录读线程的操作
      这些记录只是为了getReadHoldCount函数
  • 排队策略
    通过两个函数writerShouldBlockreaderShouldBlock为实现公平和非公平锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    //公平锁,即存在排队的节点,则排队
    static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
    return hasQueuedPredecessors();
    }
    }
    //非公平锁
    static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    //写锁直接插队
    final boolean writerShouldBlock() {
    return false; // writers can always barge
    }
    //读锁会判断是否存在写锁排队,若存在则进入CLH排队,否插队
    final boolean readerShouldBlock() {
    /* As a heuristic to avoid indefinite writer starvation,
    * block if the thread that momentarily appears to be head
    * of queue, if one exists, is a waiting writer. This is
    * only a probabilistic effect since a new reader will not
    * block if there is a waiting writer behind other enabled
    * readers that have not yet drained from the queue.
    */
    return apparentlyFirstQueuedIsExclusive();
    }
    }

这里读锁的非公平模式只是在判断了当排队的锁是写锁的时候不进行插队,并不能根本上的解决写饥饿的状态


参考

java锁是如何保证数据可见性的
reentrantlock如何保证可见性
锁种类美团博客

文章作者: fancylight
文章链接: https://www.fancylight.top/2021/03/15/%E5%A4%9A%E7%BA%BF%E7%A8%8B%E2%85%A4-%E5%B9%B6%E5%8F%91%E5%8C%85%E9%94%81%E5%92%8C%E5%90%8C%E6%AD%A5%E9%80%9A%E4%BF%A1/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 博客
打赏
  • 微信
    微信
  • 支付寶
    支付寶

评论