privatevoidunparkSuccessor(Node node){ int ws = node.waitStatus; if (ws < 0) node.compareAndSetWaitStatus(ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { //这里next==null就是上述竞争导致的 s = null; for (Node p = tail; p != node && p != null; p = p.prev) // if (p.waitStatus <= 0) s = p; } if (s != null) LockSupport.unpark(s.thread); }
当开始反向遍历的时候,for (Node p = tail; p != node && p != null; p = p.prev),这里的p如果还是oldTail,说明线程A的插入过程未结束,那么唤醒它没有意义,unparkSuccessor的s==null,直接退出;若p=线程A,则说明线程A竞争过程的2步骤已经执行结束,直接唤醒线程A,由于park的性质,不用等待线程A的步骤3执行结束
publicfinalvoidacquire(int arg){ if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //----------------插入队列尾操作-------------------------------- private Node addWaiter(Node mode){ Node node = new Node(mode); //指定节点的模式,此时为独享模式 //这里采用自选加原子操作保证会将新节点放置到队尾 for (;;) { Node oldTail = tail; if (oldTail != null) { node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue();//当队首为空时,初始化一个空节点 } } } privatefinalvoidinitializeSyncQueue(){ Node h; if (HEAD.compareAndSet(this, null, (h = new Node()))) //此时队首Node是一个虚拟节点,没有实际含义,但必须存在 tail = h; } //----------------队列中的元素获取行为------------------ //首先能够进入这个函数,说明当前节点之前必定存在前驱节点,可能是初始化的虚拟节点, //也可能是其他排队线程节点 finalbooleanacquireQueued(final Node node, int arg){ boolean interrupted = false; try { //通过循环和内部的park将当前线程停留在此处 for (;;) { //当且仅当前驱为head,即当且节点为第二个元素,并且能够获取到资源 //将当前节点置为head,并释放 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } //若不满足上述条件则判断前驱的状态,是否应该直接park当前节点 if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); //此处只是调用了Thread.currentThread().interrupt(),保留了线程中断信息,AQS并不做任何处理,用户可进行判断并处理 throw t; } } privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node){ int ws = pred.waitStatus; if (ws == Node.SIGNAL) //若前驱为SIGNAL,说明前驱正常,则直接park当前节点即可 /* * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue; //下两种状态都没有直接进入park,而是再次尝试获取资源 if (ws > 0) { //若前驱为CANCELL状态,则将当前节点的pre和之前一个正常节点相连 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node;//这里断开了正常节点的next,相当于会将那些cancelled节点GC } else { //此时前驱一定处于0或者-3,此时将前驱改成-1 /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } returnfalse; } //park,注意这里返回并清除了中断信号 privatefinalbooleanparkAndCheckInterrupt(){ LockSupport.park(this); return Thread.interrupted(); }
privatevoidunparkSuccessor(Node node){ /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) node.compareAndSetWaitStatus(ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //此处从后向前找的原因就是,AQS里next链除了出队的时候基本不变,也就是说下一个节点可能是取消的节点, //因此通过从后向前找可以找后继中第一个有效的节点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } if (s != null) LockSupport.unpark(s.thread); }
privatevoidcancelAcquire(Node node){ // Ignore if node doesn't exist if (node == null) return;
node.thread = null;
// Skip cancelled predecessors //修改目前要取消的节点pre为前驱中正常节点,也就是说取消的节点不会通过pre相连接 //这样是为了在unpark过程中,从后向前找一定能够找到第一个有效的待激活节点,和should中不同的是 //没有改变任意节点的next状态 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary, although with // a possibility that a cancelled node may transiently remain // reachable. Node predNext = pred.next;
// Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { pred.compareAndSetNext(predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) pred.compareAndSetNext(predNext, next); } else { unparkSuccessor(node); }