ConcurrentLinkedQueue源码解析

ConcurrentLinkedQueue源码解析

ConcurrentLinkedQueue是一个基于链表节点Node的无界线程安全队列。它是一个先进先出队列,头结点是在这个队列中最久的节点,尾节点是在这个队列中最新的节点。每次插入新节点都从尾部插入,获取节点就从头部插入。

ConcurrentLinkedQueue使用的是非阻塞的CAS算法,内部是一个链表。

类图

ConcurrentLinkedQueue的类图如下所示:

ConcurrentLinkedQueue实现了Queue接口和继承了AbstractQueue类。Itr和Node则是它的内部类。

1
2
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable

Queue 接口只是定义了一些队列的公共方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface Queue<E> extends Collection<E> {
// 添加元素
boolean add(E e);

// 添加元素
boolean offer(E e);

// 删除元素,如果元素不存在,则抛出NoSuchElementException异常
E remove();

// 返回并删除第一个元素,如果队列为空,则返回null
E poll();

// 返回第一个元素,如果不存在,抛出NoSuchElementException异常
E element();

// 返回第一个元素,但不删除,如果队列为空,则返回null
E peek();
}

AbstractQueue也实现了Queue接口,还提供了某些方法的默认实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 从这可以发现,Add方法,其实就是调用的offer方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

// 调用poll方法,如果为null抛出NoSuchElementException异常
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

// 调用peek方法,如果为null,抛出NoSuchElementException异常
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

// 按个从队列中删除node
public void clear() {
while (poll() != null)
;
}

public boolean addAll(Collection<? extends E> c) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
boolean modified = false;
for (E e : c)
if (add(e))
modified = true;
return modified;
}

主要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* A node from which the first live (non-deleted) node (if any)
* can be reached in O(1) time.
* Invariants:
* - all live nodes are reachable from head via succ()
* 所有存活的节点都可以通过succ方法,从head遍历到
* - head != null
* head节点不能为null
* - (tmp = head).next != tmp || tmp != head
* head节点不能引用自身
* Non-invariants:
* - head.item may or may not be null.
* head节点的值,可以是null,也可以不为null
* - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head!
* 允许 tail 滞后(lag behind)于 head,也就是说:从 head 开始遍历队列,不一定能到达 tail。
*/
private transient volatile Node<E> head;

/**
* A node from which the last node on list (that is, the unique
* node with node.next == null) can be reached in O(1) time.
* Invariants:
* - the last node is always reachable from tail via succ()
* 最后一个节点一定可以通过tail调用succ方法访问到
* - tail != null
* tail节点不为null
* Non-invariants:
* - tail.item may or may not be null.
* tail节点的值可以,也可以不为null
* - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head!
* 允许 tail 滞后(lag behind)于 head,也就是说:从 head 开始遍历队列,不一定能到达 tail。
* - tail.next may or may not be self-pointing to tail.
* tail可以,指向或者不指向自己
*/
private transient volatile Node<E> tail;

主要方法

无参构造函数

1
2
3
4
5
6
/**
* Creates a {@code ConcurrentLinkedQueue} that is initially empty.
*/
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}

生成一个值为null的节点,head,tail都指向该节点

有参构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Creates a {@code ConcurrentLinkedQueue}
* initially containing the elements of the given collection,
* added in traversal order of the collection's iterator.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
// 当前值不能为null
checkNotNull(e);
// 生成新的节点
Node<E> newNode = new Node<E>(e);
if (h == null)
// 第一次,初始化
h = t = newNode;
else {
// 将t的后继设置为newNode
t.lazySetNext(newNode);
// t指向newNode
t = newNode;
}
}
// 如果集合为空
if (h == null)
// 生成值为null的新节点
h = t = new Node<E>(null);
// head指向h
head = h;
// tail指向t
tail = t;
}

add方法

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never throw
* {@link IllegalStateException} or return {@code false}.
*
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
// 直接调用offer方法
return offer(e);
}

offer方法

将元素添加到队列的队尾

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
// 判断元素是否为null
checkNotNull(e);
// 新生成Node
final Node<E> newNode = new Node<E>(e);
// 从尾节点插入
for (Node<E> t = tail, p = t;;) {
// p = t = tail
// q = tail.next
Node<E> q = p.next;
// 如果null == q,说明当前节点是尾节点
if (q == null) {
// p is last node
// cas设置p的后继节点
if (p.casNext(null, newNode)) {
// cas设置成功
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live"
if (p != t) // 每每经过一次 p = q 操作(向后遍历节点), 则 p != t 成立, 这个也说明 tail 滞后于 head 的体现
// 设置t为newNode
casTail(t, newNode); // Failure is OK.
return true;
}
// CAS失败,说明其他线程已经把当前的p.next修改了
// Lost CAS race to another thread; re-read next
}
else if (p == q) //(p == q) 成立, 则说明p是pool()时调用 "updateHead" 导致的(删除头节点); 此时说明 tail 指针已经 fallen off queue, 所以进行 jump 操作, 若在t没变化, 则 jump 到 head, 若 t 已经改变(jump操作在另外的线程中执行), 则jump到 head 节点, 直到找到 node.next = null 的节点
/** 1. 大前提 p 是已经被删除的节点
* 2. 判断 tail 是否已经改变
* 1) tail 已经变化, 则说明 tail 已经重新定位
* 2) tail 未变化, 而 tail 指向的节点是要删除的节点, 所以让 p 指向 head
* 判断尾节点是否有变化
* 1. 尾节点变化, 则用新的尾节点
* 2. 尾节点没变化, 将 tail 指向head
*/
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
// 重新找到尾节点
p = (p != t && t != (t = tail)) ? t : q;
}
}

一步一步看:

  1. 初始化的时候,新增一个item为null,next为null的node,然后把head和tail指向这个node
  2. 接着往queue里面新增一个值为a的节点
  3. 在上述offer方法的第14行和15行,分别生成一个节点p,和q,都指向当前的null Node,生成一个q节点,指向p节点的next,因此为null
  4. 此时,null == q,那么cas设置p的next为新创建的职位a的node
  5. 此时 p == t,因此不会进入27行的if判断
  6. 然后,再向queue中新增一个b节点。
  7. 这个时候的,p和t,还是指向tail,tail还是指向当前的null节点,但是,新生成的q节点,就指向了a节点
  8. 此时null != q,p != q,因此会进入49行的else分支
  9. 49行的else分支做的事情就是寻找新的为节点,会将p指向a节点
  10. 然后又进入了添加节点a的逻辑,但是,此时,p指向a,tail指向null节点,27行的条件p != t成立,重新把tail指向新的b节点。这就是tail的滞后性
  11. 重新设置tail的时候,允许失败,在下一次添加的时候,还是会重试的。

什么时候p == q条件会成立呢? 这需要先看看poll方法的逻辑。

poll方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public E poll() {
// goto 标记
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
// 保存原头结点的值
E item = p.item;
// 如果原值不为null,cas设置该值为null
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
// cas成功之后
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 当前队列为空则返回null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 如果当前节点被自引用了,则重新寻找新的队列头结点
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
  • 队列一开始为空时的状态如图所示:

由于head节点指向的是item为null的哨兵节点,所以会执行到18行的if语句。假设这个过程中没有线程调用offer方法,此时q等于null,这个时候队里的状态如下图所示:

所以会执行updateHead方法,由于h等于p所以没有设置头结点,poll方法直接返回null

1
2
3
4
5
6
7
8
/**
* Tries to CAS head to p. If successful, repoint old head to itself
* as sentinel for succ(), below.
*/
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
  • 如果,执行到18行的if语句时,已经有其他线程调用了offer方法并成功添加一个元素到队列,这时候q指向的是新增元素的节点:

因此,18行if语句不满足条件,继而执行23行的if语句,但p不等于q,所以再执行26行,将p指向了q指向的节点

接着,又进入循环,此时p指向的元素值不为null,尝试将p的item值设置为null,如果此时没有其他线程进行poll操作,则cas成功之后,会执行13行代码,因为此时p != h,所以,设置头结点为p,并设置h的next节点为h自己,poll然后返回被从队列移除的节点值item

这个状态就是offer方法里面p == q时候的状态

  • 假设现在一个线程调用了poll操作,那么执行9行代码的时候,队列状态如下所示:

这时候执行18行代码,返回null

  • 现在poll的代码还有分支23行还没有执行过。假设线程A执行poll操作时当前队列状态如下:

那么执行p.casItem通过CAS操作尝试设置p的item值为null,假设CAS设置成功则标记该节点并从对了中将其移除

由于p != h,所以会执行updateHead方法,假设线程A执行updateHead前另外一个线程B开始poll操作,这时候线程B的p指向head节点,但是还没有执行到18行代码:

然后线程A执行updateHead操作,执行完毕后,线程A退出

然后线程B继续执行18行代码,q = p.next,由于该节点是自引用节点,所以p == q所以会执行23行代码,跳转到goto label处,获取当前队列头head

总结:poll方法在移除一个元素时,只是简单的使用CAS操作把当前节点的item值设置为null,然后通过重新设置头结点将该元素从队列里面移除,被移除的节点就成了孤立节点,这个节点会在垃圾回收的时候被回收掉。另外,如果在执行分支中发现头肩点被修改了,就要跳到外层循环重新获取新的节点

peek方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

第一次调用peek方法的时候会删除哨兵节点,并让队列中的head节点指向队列中的第一个元素,或者null。

当队列为空时

第一次执行peek的时候,null == item,然后在第6行代码出,执行q = p.next,这时候q节点指向的才是队列里面第一个真正的元素,或者队列为null则q指向null。

这时候执行updateHead,由于h == p,所以不进行任何操作,然后peek操作会返回null。

当队列中至少有一个元素时

这时执行13行代码,p指向了q节点,然后执行代码6行

执行第7行代码时,发现item不为null,所以执行updateHead方法,由于h != p,所以设置头结点

size方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Returns the number of elements in this queue. If this queue
* contains more than {@code Integer.MAX_VALUE} elements, returns
* {@code Integer.MAX_VALUE}.
*
* <p>Beware that, unlike in most collections, this method is
* <em>NOT</em> a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current
* number of elements requires an O(n) traversal.
* Additionally, if elements are added or removed during execution
* of this method, the returned result may be inaccurate. Thus,
* this method is typically not very useful in concurrent
* applications.
*
* @return the number of elements in this queue
*/
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}

在计算的时候没有加锁,所以从调用size函数到返回结果期间有可能增删元素,导致统计元素的个数不准确。

remove方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o != null) {
Node<E> next, pred = null;
for (Node<E> p = first(); p != null; pred = p, p = next) {
boolean removed = false;
E item = p.item;
if (item != null) {
if (!o.equals(item)) {
next = succ(p);
continue;
}
removed = p.casItem(item, null);
}

next = succ(p);
if (pred != null && next != null) // unlink
pred.casNext(p, next);
if (removed)
return true;
}
}
return false;
}

如果队列里存在该元素则删除该元素,如果存在多个则删除第一个,并返回true,否则返回false。

contains方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Returns {@code true} if this queue contains the specified element.
* More formally, returns {@code true} if and only if this queue contains
* at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
* @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
if (o == null) return false;
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
if (item != null && o.equals(item))
return true;
}
return false;
}

判断队列里面是否含有指定对象,由于是遍历整个队列,所以像size操作一样结果也不是那么精确,有可能调用该方法时元素还在队列里面,但是遍历过程中其他线程才把该元素删除了,那么就会返回false。

参考资料

Java并发编程之美

ConcurrentLinkedQueue 源码分析 (基于Java 8)

0%