//---------该函数中AQS中的函数,用来判断任意node是否是同步队列中的元素-------------------- finalbooleanisOnSyncQueue(Node node){ //condition好理解, node.prev==null的判断是因为,在同步队列中,任意状态下 //都不会尝试修改prev=null这个操作 if (node.waitStatus == Node.CONDITION || node.prev == null) returnfalse; //若存在后继说明一定是同步队列中的节点 if (node.next != null) // If has successor, it must be on queue returntrue; //遍历判断 return findNodeFromTail(node); }
publicfinalvoidsignal(){ if (!isHeldExclusively()) thrownew IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
privatevoiddoSignal(Node first){ do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); //尝试出队队首,只要成功,便退出 }
privatevoiddoSignalAll(Node first){ lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); //不断出队,一直等到等待队列为空 }
finalbooleantransferForSignal(Node node){ //如果CAS失败,说明该等待节点可能成为了取消节点,那么跳过该节点 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) returnfalse; Node p = enq(node); //将等待节点加入同步队列队尾,并返回前驱 int ws = p.waitStatus; //如果等待前驱为取消或者置为-1失败,则主动唤醒node if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); returntrue; }
publicclassArrayBlockingQueue<E> extendsAbstractQueue<E> implementsBlockingQueue<E>, java.io.Serializable{ /** Main lock guarding all access */ final ReentrantLock lock;
/** Condition for waiting takes */ privatefinal Condition notEmpty;
/** Condition for waiting puts */ privatefinal Condition notFull;
public E take()throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }