//..............用到了Lock与Condition机制 /** Lock held by take, poll, etc */ privatefinalReentrantLocktakeLock=newReentrantLock(); /** Wait queue for waiting takes */ privatefinalConditionnotEmpty= takeLock.newCondition(); /** Lock held by put, offer, etc */ privatefinalReentrantLockputLock=newReentrantLock(); /** Wait queue for waiting puts */ privatefinalConditionnotFull= putLock.newCondition(); //...........put方法 /** * Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ publicvoidput(E e)throws InterruptedException { if (e == null) thrownewNullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. intc= -1; Node<E> node = newNode<E>(e); finalReentrantLockputLock=this.putLock; finalAtomicIntegercount=this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } //...........take方法 public E take()throws InterruptedException { E x; intc= -1; finalAtomicIntegercount=this.count; finalReentrantLocktakeLock=this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }