SynchronousQueue源码解析

SynchronousQueue源码解析

SynchronousQueue是一个比较特别的阻塞队列,它是一个没有容量的队列。如果有线程往该队列里面添加元素,但是又没有消费线程过来消费,就不能在添加元素,反之亦然。

SynchronousQueue也被应用到CachedThreadPool中,作为其默认的阻塞队列。

类图

SynchronousQueue的类图如下所示:

SynchronousQueue实现了BlockingQueue接口和继承了AbstractQueue类。

1
2
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>

BlockingQueue接口只是定义了一些阻塞队列的公共方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public interface BlockingQueue<E> extends Queue<E> {
// 将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
boolean add(E e);

// 将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
boolean offer(E e);

// 将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
void put(E e) throws InterruptedException;

// 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

// 从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
E take() throws InterruptedException;

// 获取并移除此队列的头元素,可以在指定的等待时间前等待可用的元素,timeout表明放弃之前要等待的时间长度,用 unit 的时间单位表示,如果在元素可用前超过了指定的等待时间,则返回null,当等待时可以被中断
E poll(long timeout, TimeUnit unit)
throws InterruptedException;

// 获取队列中剩余的空间。
int remainingCapacity();

// 从队列中移除指定的值。
boolean remove(Object o);

// 判断队列中是否拥有该值。
public boolean contains(Object o);

// 将队列中值,全部移除,并发设置到给定的集合中。
int drainTo(Collection<? super E> c);

// 将队列中值,全部移除,并发设置到给定的集合中。最多设置maxElements个元素
int drainTo(Collection<? super E> c, int maxElements);
}

AbstractQueue也实现了Queue接口,还提供了某些方法的默认实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 从这可以发现,Add方法,其实就是调用的offer方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

// 调用poll方法,如果为null抛出NoSuchElementException异常
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

// 调用peek方法,如果为null,抛出NoSuchElementException异常
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

// 按个从队列中删除node
public void clear() {
while (poll() != null)
;
}

public boolean addAll(Collection<? extends E> c) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
boolean modified = false;
for (E e : c)
if (add(e))
modified = true;
return modified;
}

主要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/** The number of CPUs, for spin control */
// cpu核数,做自旋控制用
static final int NCPUS = Runtime.getRuntime().availableProcessors();

/**
* The number of times to spin before blocking in timed waits.
* The value is empirically derived -- it works well across a
* variety of processors and OSes. Empirically, the best value
* seems not to vary with number of CPUs (beyond 2) so is just
* a constant.
*/
// 带超时的情况,自旋的最大次数,如果CPU核数为1,不自旋
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

/**
* The number of times to spin before blocking in untimed waits.
* This is greater than timed value because untimed waits spin
* faster since they don't need to check times on each spin.
*/
// 不带超时的情况,自旋的最大次数为带超时的16倍
static final int maxUntimedSpins = maxTimedSpins * 16;

/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices.
*/
// 针对有超时的情况,自旋了多少次后,如果剩余时间大于1000纳秒就使用带时间的LockSupport.parkNanos()这个方法
static final long spinForTimeoutThreshold = 1000L;

主要方法

无参构造函数

1
2
3
4
5
6
/**
* Creates a {@code SynchronousQueue} with nonfair access policy.
*/
public SynchronousQueue() {
this(false);
}

创建一个不公平的队列

有参构造函数-指定锁是否公平

根据传入的fair参数,判断创建的是公平锁-TransferQueue还是不公平锁-TransferStack

1
2
3
4
5
6
7
8
9
/**
* Creates a {@code SynchronousQueue} with the specified fairness policy.
*
* @param fair if true, waiting threads contend in FIFO order for
* access; otherwise the order is unspecified.
*/
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

TransferQueue

TransferQueue继承了了Transfer抽象类,并实现了其transfer方法。它是一个FIFO的双端队列,内部以QNode对象作为节点对象。SynchronousQueue通过TransferQueue实现了公平锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
static final class TransferQueue<E> extends Transferer<E> {
/*
* This extends Scherer-Scott dual queue algorithm, differing,
* among other ways, by using modes within nodes rather than
* marked pointers. The algorithm is a little simpler than
* that for stacks because fulfillers do not need explicit
* nodes, and matching is done by CAS'ing QNode.item field
* from non-null to null (for put) or vice versa (for take).
*/

/** Node class for TransferQueue. */
// TransferQueue的节点对象,每一个node都表示一次take(pool)或者put(offer)
static final class QNode {
// 下一个节点
volatile QNode next; // next node in queue
// 节点的值,通过cas设置
volatile Object item; // CAS'ed to or from null
// take或者put的线程
volatile Thread waiter; // to control park/unpark
// 入队或出队,true表示入队
final boolean isData;

QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}

// cas设置下一个节点
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
// 当当前对象(this)的值是cmp时,设置this的next值为val
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

// cas设置当前节点的值
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

/**
* Tries to cancel by CAS'ing ref to this as item.
*/
// 将item的值赋值为自己,即取消item的赋值
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
// item如果等于自己的话,表示已经取消了复制
boolean isCancelled() {
return item == this;
}

/**
* Returns true if this node is known to be off the queue
* because its next pointer has been forgotten due to
* an advanceHead operation.
*/
// 如果当前节点已经从队列中删除,返回true
boolean isOffList() {
return next == this;
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;

static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}

/** Head of queue */
// 首节点
transient volatile QNode head;
/** Tail of queue */
// 尾节点
transient volatile QNode tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it was cancelled.
*/
//
transient volatile QNode cleanMe;

// 构造函数,构造一个dummy node,head 和 tail都指向它。
// 整个队列的生命周期里面都会存在这个dummy node
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}

/**
* Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention.
*/
void advanceHead(QNode h, QNode nh) {
if (h == head &&
// 将h指向nh,并将h.next指向自己
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}

/**
* Tries to cas nt as new tail.
*/
// cas设置tail
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}

/**
* Tries to CAS cleanMe slot.
*/
// cas设置cleanMe
boolean casCleanMe(QNode cmp, QNode val) {
return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}

/**
* Puts or takes an item.
*/
// 插入或者取出数据
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/

QNode s = null; // constructed/reused as needed
// 如果e不为null,说明是插入数据
boolean isData = (e != null);

for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
// 未初始化
continue; // spin
// 如果队列为空,或者队尾的操作和当前操作一致,都是插入或者都是取出
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
if (t != tail) // inconsistent read // t是不是指向队尾
continue;
if (tn != null) { // lagging tail // t是不是尾部节点,不是的话,cas更新
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait // 如果是带超时的操作,并且没有相反的操作,直接返回null
return null;
if (s == null)
s = new QNode(e, isData);
// tail的next指向新节点
if (!t.casNext(null, s)) // failed to link in
continue;
// tail指向新节点
advanceTail(t, s); // swing tail and wait
// 如果当前操作是带超时时间的,则进行超时等待,否则就挂起线程,直到有新的反向操作提交

Object x = awaitFulfill(s, e, timed, nanos);
// 当挂起的线程被中断或是超时时间已经过了,awaitFulfill方法就会返回当前节点,这样就会有x == s为true
if (x == s) { // wait was cancelled
// 将队尾节点移出,并重新更新尾部节点,返回null,就是入队或是出队操作失败了
clean(t, s);
return null;
}
// 如果s 还没有被溢出队列
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;

} else { // complementary-mode
// 有反向操作,取出的时候有插入,插入的时候有取出
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read

Object x = m.item;
// 这里先判断m是否是有效的操作
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
// 更新头部节点
advanceHead(h, m); // successfully fulfilled
// 唤醒m节点被挂起的线程
LockSupport.unpark(m.waiter);
// 返回的结果用于给对应的操作,如take、offer等判断是否执行操作成功
return (x != null) ? (E)x : e;
}
}
}

/**
* Spins/blocks until node s is fulfilled.
*
* @param s the waiting node
* @param e the comparison value for checking match
* @param timed true if timed wait
* @param nanos timeout value
* @return matched item, or s if cancelled
*/
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
// 获取超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 获取当前线程
Thread w = Thread.currentThread();
// 线程被挂起或是进入超时等待之前阻止自旋的次数
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 首先判断线程是否被中断了,如果被中断了就取消等待,并设置s的item指向s本身作为标记
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
// x != e就表示超时时间到了或是线程被中断了,也就是执行了tryCancel方法
if (x != e)
// 返回原值
return x;
// 带超时参数
if (timed) {
// 是否已经超时
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 自旋
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
// 挂起当前线程
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
// parkNanos操作会让线程挂起进入限期等待(Timed Waiting),不用其他线程唤醒,时间到了会被系统唤醒
LockSupport.parkNanos(this, nanos);
}
}

Transfer抽象类

只有一个抽象方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Shared internal API for dual stacks and queues.
*/
abstract static class Transferer<E> {
/**
* Performs a put or take.
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
* offered by producer.
* @param timed if this operation should timeout
* @param nanos the timeout, in nanoseconds
* @return if non-null, the item provided or received; if null,
* the operation failed due to timeout or interrupt --
* the caller can distinguish which of these occurred
* by checking Thread.interrupted.
*/
abstract E transfer(E e, boolean timed, long nanos);
}
0%