LinkedBlockingQueue源码解析

LinkedBlockingQueue源码解析

ConcurrentLinkedQueue是通过CAS实现的一个队列,下面我们研究下通过ReentrantLock实现的队列-LinkedBlockingQueue。LinkedBlockingQueue也是使用单向链表实现的,它也有两个Node,分别为head,tail,用来存放首尾节点,并且还有一个初始值为0的原子变量count,用来记录队列元素个数。

另外,LinkedBlockingQueue还有两个ReentrantLock实例,分别用来控制元素入队和出队的原子性,其中takeLock用来控制同时只有一个线程可以从队列头获取元素,putLock控制同时只能有一个线程可以获取锁,在队列尾部添加元素,其他线程必须等待。

notEmpty和notFull是条件变量,它们内部都有一个条件队列用来存放入队和出队时被阻塞的线程。

类图

LinkedBlockingQueue的类图如下所示:

ConcurrentLinkedQueue实现了BlockingQueue接口和继承了AbstractQueue类。

1
2
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable

BlockingQueue接口只是定义了一些阻塞队列的公共方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public interface BlockingQueue<E> extends Queue<E> {
// 将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
boolean add(E e);

// 将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
boolean offer(E e);

// 将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
void put(E e) throws InterruptedException;

// 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

// 从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
E take() throws InterruptedException;

// 获取并移除此队列的头元素,可以在指定的等待时间前等待可用的元素,timeout表明放弃之前要等待的时间长度,用 unit 的时间单位表示,如果在元素可用前超过了指定的等待时间,则返回null,当等待时可以被中断
E poll(long timeout, TimeUnit unit)
throws InterruptedException;

// 获取队列中剩余的空间。
int remainingCapacity();

// 从队列中移除指定的值。
boolean remove(Object o);

// 判断队列中是否拥有该值。
public boolean contains(Object o);

// 将队列中值,全部移除,并发设置到给定的集合中。
int drainTo(Collection<? super E> c);

// 将队列中值,全部移除,并发设置到给定的集合中。最多设置maxElements个元素
int drainTo(Collection<? super E> c, int maxElements);
}

AbstractQueue也实现了Queue接口,还提供了某些方法的默认实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 从这可以发现,Add方法,其实就是调用的offer方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

// 调用poll方法,如果为null抛出NoSuchElementException异常
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

// 调用peek方法,如果为null,抛出NoSuchElementException异常
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

// 按个从队列中删除node
public void clear() {
while (poll() != null)
;
}

public boolean addAll(Collection<? extends E> c) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
boolean modified = false;
for (E e : c)
if (add(e))
modified = true;
return modified;
}

主要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/** The capacity bound, or Integer.MAX_VALUE if none */
// 队列容量
private final int capacity;

/** Current number of elements */
// 当前队列大小
private final AtomicInteger count = new AtomicInteger();

/**
* Head of linked list.
* Invariant: head.item == null
*/
// 队列头
transient Node<E> head;

/**
* Tail of linked list.
* Invariant: last.next == null
*/
// 队列尾
private transient Node<E> last;

/** Lock held by take, poll, etc */
// 取锁
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
// 取线程的等待队列
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
// 插入锁
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
// 插入线程的等待队列
private final Condition notFull = putLock.newCondition();

主要方法

无参构造函数

1
2
3
4
5
6
7
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

生成一个最大为Integer.MAX_VALUE大小的队列,并且生成一个新的值为null的结点,并将head和tail都指向这个null结点。

有参构造函数-指定大小

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

生成一个指定大小的队列,并且生成一个新的值为null的结点,并将head和tail都指向这个null结点。

有参构造函数-指定集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}, initially containing the elements of the
* given collection,
* added in traversal order of the collection's iterator.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}

将队列大小初始化为Integer.MAX_VALUE,申请插入锁,入队列

size方法

1
2
3
4
5
6
7
8
/**
* Returns the number of elements in this queue.
*
* @return the number of elements in this queue
*/
public int size() {
return count.get();
}

返回原子变量的值

remainingCapacity方法

返回容量减去当前大小的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Returns the number of additional elements that this queue can ideally
* (in the absence of memory or resource constraints) accept without
* blocking. This is always equal to the initial capacity of this queue
* less the current {@code size} of this queue.
*
* <p>Note that you <em>cannot</em> always tell if an attempt to insert
* an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*/
public int remainingCapacity() {
return capacity - count.get();
}

put方法

将元素插入到队列的末尾,如果队列满的话,阻塞当前插入线程,直到有空间插入时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
// 不支持null 元素插入
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
// 获取插入锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
// 如果队列已经满了,阻塞当前线程,加入notFull等待队列
while (count.get() == capacity) {
notFull.await();
}
// 入队列
enqueue(node);
// 队列自增
c = count.getAndIncrement();
// 再次检查,如果支持再新增一个元素,唤醒一个插入线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果c为0,说明队列里面还有一个数据,c初始化为-1,唤醒取线程
if (c == 0)
signalNotEmpty();
}

offer方法

添加一个元素到列表的末尾,如果队列空间满了,则丢弃当前元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full.
* When using a capacity-restricted queue, this method is generally
* preferable to method {@link BlockingQueue#add add}, which can fail to
* insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
// null元素禁止插入
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 如果队列满了,丢弃元素直接返回
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
// 获取插入锁
putLock.lock();
try {
// 如果未达到容量,入队列
if (count.get() < capacity) {
enqueue(node);
// 大小自增1
c = count.getAndIncrement();
if (c + 1 < capacity)
// 如果队列未满,激活一个插入线程
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
// 队列里面至少还有一个元素,激活一个取线程
signalNotEmpty();
return c >= 0;
}

take方法

从队列中获取一个元素,并从队列中删除该元素,如果队列为空,阻塞当前线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 获取取锁
takeLock.lockInterruptibly();
try {
// 如果当前大小为0,阻塞当前线程
while (count.get() == 0) {
notEmpty.await();
}
// 从队列中去一个元素
x = dequeue();
// 大小自减
c = count.getAndDecrement();
// 如果队列里还有元素
if (c > 1)
// 唤醒其他取线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果队列中还可以再插入数据
if (c == capacity)
// 唤醒一个插入线程
signalNotFull();
return x;
}

poll方法

从队列中获取一个元素,并从队列中删除该元素,如果队列为空,直接返回null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}

peek方法

从队列中获取一个元素,如果队列为空,直接返回null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

remove方法

从队列中删除指定的元素,有返回true,否则返回false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
// 为null直接返回false
if (o == null) return false;
// 锁住取锁和插入锁
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
// 找到对应的节点
if (o.equals(p.item)) {
// 删除
unlink(p, trail);
return true;
}
}
return false;
} finally {
// 解锁插入锁和取锁
fullyUnlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Unlinks interior Node p with predecessor trail.
*/
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}

contains方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Returns {@code true} if this queue contains the specified element.
* More formally, returns {@code true} if and only if this queue contains
* at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
* @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
if (o == null) return false;
// 锁住插入锁和取锁
fullyLock();
try {
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
fullyUnlock();
}
}
0%