Lock和Condition的应用

Condition对象

Java提供了Condition对象来实现等待/通知。

Object对象提供了wait、waitAll、notify、notifyAll的方法用来实现线程的同步、等待和唤醒。Condition类提供了比wait/notify更丰富的功能,Condition对象由lock对象所创建的,同时一个Lock可以创建多个Condition对象,即创建多个对象监听器,这样就可以指定唤醒具体线程,而notify是随机唤醒线程。

Condition接口包含的方法

先看下Condition的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface Condition {
void await() throws InterruptedException;

void awaitUninterruptibly();

long awaitNanos(long nanosTimeout) throws InterruptedException;

boolean await(long time, TimeUnit unit) throws InterruptedException;

boolean awaitUntil(Date deadline) throws InterruptedException;

void signal();

void signalAll();
}

Condition主要提供了以下方法:

  • await:使当前线程在接到信号或者中断之前一直阻塞等待;
  • awaitUninterruptibly:使当前线程在收到信号之前一直等待。此处对中断不做响应
  • awaitNanos:使当前线程在接到信号或者中断或者到达指定的纳秒时间之前一直阻塞等待
  • await(long time, TimeUnit unit):使当前线程在接到信号或者中断或者到达指定的时间之前一直阻塞等待。此处可指定任意时间以及单位
  • awaitUntil(Date deadline):使当前线程在接到信号或者中断或者到达截止日期之前一直阻塞等待
  • signal:唤醒一个等待线程
  • signalAll:唤醒所有的等待线程

AQS中的ConditionObject是实现Condition接口的实现。ConditionObject的等待队列是一个FIFO队列,队列的每个节点都是等待在Condition对象上的线程的引用,在调用Condition的await()方法之后,线程释放锁,构造成相应的节点进入等待队列等待。其中节点的定义复用AQS的Node定义。

ConditionObject

先看看这个对象的字段定义

1
2
3
4
5
6
7
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}

ConditionObject是AQS的内部类,这样它就能访问到AQS的FIFO队列了。COnditionObject内部维护的是一个单向队列,它的首节点就是一个第一个被阻塞的线程节点。

下面我们挨个看下它的一些方法是怎么实现的。

await方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* Implements interruptible condition wait.
* 实现了可中断的条件等待
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
// 如果当前线程已经被中断了,则直接抛出InterruptedException
throw new InterruptedException();
// 构造线程节点,添加到队尾
Node node = addConditionWaiter();
// 添加进队列之后,释放锁,
int savedState = fullyRelease(node);
int interruptMode = 0;
// 死循环判断当前被唤醒的节点是否已经转移到AQS同步队列中,成功就退出循环
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 如果发生中断则确保节点加入同步队列并跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被通知或者被中断的线程,继续获取竞争锁
// 获取成功或者被取消则再设置中断模式
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 删除取消的后继等待节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 根据中断模式抛出异常
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

addConditionWaiter方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 将当前的线程作为waiter添加到等待队列
* 返回当前节点的引用
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
// 如果t的状态时cancelled,从队列中清除
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
// 首节点和尾节点都指向当前节点
firstWaiter = node;
else
// 链接到队尾
t.nextWaiter = node;
lastWaiter = node;
return node;
}

unlinkCancelledWaiters方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* 将链表中的cancelled节点从链表中移除。其中变量trail保存的是当前节点的前置节点的引用
* t保存的是当前节点的下一个节点的引用。
* 分两种情况,如果头结点是cancelled状态
* ------------------------- -------------------------
* |cancelled | nextWaiter | ---> | condition | nextWaiter | ---> null
* ------------------------- -------------------------
* 移除后
* ------------------------- -------------------------
* |cancelled | nextWaiter | ---> null | condition | nextWaiter | ---> null
* ------------------------- -------------------------
* ^
* |
* firstWaiter
* 如果头结点不是cancelled状态
* ------------------------- -------------------------
* |condition | nextWaiter | ---> | cancelled | nextWaiter | ---> null
* ------------------------- -------------------------
* 移除后
* * ------------------------- -------------------------
* |cancelled | nextWaiter | ---> null | condition | nextWaiter | ---> null
* ------------------------- -------------------------
* ^
* |
* firstWaiter
*
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}

fullyRelease方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 释放锁并返回新的state,如果释放失败,则把当前的节点状态改为cancelled
* 注意,设置cancelled状态是在finally里面的。只要释放失败,就改节点状态
*
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

isOnSyncQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
// 转移到AQS同步队列的节点状态为都会设置为初始状态(值为0)
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 当节点是AQS同步队列的中间节点时(在同步队列中含有next节点)则返回true
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}

整体流程

  1. 将当前线程封装为Node节点并加入到条件等待队列中;

  2. 释放锁,如果释放锁成功,则调用unparkSuccessor方法唤醒该节点在AQS的FIFO队列中的后继节点;

  3. while死循环判断是否已经在AQS的FIFO队列中了(其他线程调用signal或者signalAll方法时,会触发这个节点被转移到FIFO队列的动作);

    • 如果没有在FIFO队列中,则park当前线程,等待唤醒;
    • 同时通过方法checkInterruptWhileWaiting判断线程是否在等待的过程中发生了中断,赋值interruptMode中断便签
  4. 关于中断模式: 1) 当在被通知前被中断则将中断模式设置为THROW_IE; 2) 当在被通知后则将中断模式设置为REINTERRUPT(因为acquireQueued不会响应中断)。

  5. 删除取消的后继等待节点。

  6. 根据中断模式抛出异常。

awaitUninterruptibly方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 如果在死循环的过程中,线程被中断了,忽略
* Implements uninterruptible condition wait.
* <ol>
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* </ol>
*/
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}

awaitNanos方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 和await方法类似,增加了超时时间
* 超时了之后,直接将当前节点转入AQS的FIFO队列
* Implements timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}

transferAfterCancelledWait方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Transfers node, if necessary, to sync queue after a cancelled wait.
* Returns true if thread was cancelled before being signalled.
*
* @param node the node
* @return true if cancelled before the node was signalled
*/
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

awaitUntil方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 在截止时间之前如果没有被唤醒的话则强制将当前的Node加入到AQS的FIFO队列
*
* Implements absolute timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* </ol>
*/
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}

await(long time, TimeUnit unit)方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 和await方法类似,超时就移入AQS的FIFO队列
* Implements timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* </ol>
*/
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}

signal方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/** 将等待时间最长的节点,也就是首节点移入AQS的FIFO队列去竞争锁
* 当前线程需要获取锁
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
// 是否独占锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

doSignal方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/** 将Condition队列中第一个非cancalled状态的节点从队列中移除,并移入AQS的FIFO队列中
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

transferForSignal方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 将Condition队列中的节点移入AQS的FIFO队列。成功返回true
* 首先enq将该node添加到CLH队列中
* 其次若CLH队列原先尾节点为CANCELLED或者对原先尾节点CAS设置成SIGNAL失败
* 则唤醒node节点;否则该节点在CLH队列总前驱节点已经是signal状态了,唤醒工作交给前驱节点
*
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

signalAll方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 将CONDITION队列中所有node出队,逐个添加到FIFO队列末尾
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}

doSignalAll方法

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

总结

  1. 每一个创建的ConditionObject都维持这各自的一个单向的等待队列,但是同一个Lock的所有ConditionObject都共享一个AQS的FIFO同步队列;
  2. 当调用await方法时释放锁并进入阻塞状态,调用signal方法将条件等待队列中的首节点线程移动到AQS同步队列中并将其前继节点设置为SIGNAL或者直接唤醒线程使得被通知的线程能去获取锁;
  3. 调用await方法释放锁并将线程添加到条件等待队列中并没有采用死循环CAS设置(参考AQS.enq方法),因为Condition对象只能用于独占模式,而且在调用await之前会显示的获取独占锁,否则会抛出非法监视器状态异常。
  4. 调用signal方法将转移等待节点,也不需要CAS来保证,因为signal会确保调用者caller是获取独占锁的线程(通过isHeldExclusively方法来判断,如果为false会抛出非法监视器状态的异常)。

参考资料

Java并发编程-Lock和Condition

AQS的ConditionObject源码详解

jdk1.8 J.U.C并发源码阅读——AQS之conditionObject内部类分析

0%