DelayQueue源码解析

DelayQueue源码解析

DelayQueue并发队列是一个无界阻塞延迟队列,队列中的每个元素都有个过期时间,当从队列获取元素时,只有过期元素才会出队列。队列头元素时最快要过期的元素。

DelayQueue使用PriorityQueue存放数据,使用ReentrantLock实现线程同步。

另外,队列里面的元素要实现Delayed接口,一个是获取当前剩余时间的接口,一个是比较的接口。

类图

DelayQueue的类图如下所示:

DelayQueue实现了BlockingQueue接口和继承了AbstractQueue类。

1
2
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>

BlockingQueue接口只是定义了一些阻塞队列的公共方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public interface BlockingQueue<E> extends Queue<E> {
// 将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
boolean add(E e);

// 将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
boolean offer(E e);

// 将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
void put(E e) throws InterruptedException;

// 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

// 从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
E take() throws InterruptedException;

// 获取并移除此队列的头元素,可以在指定的等待时间前等待可用的元素,timeout表明放弃之前要等待的时间长度,用 unit 的时间单位表示,如果在元素可用前超过了指定的等待时间,则返回null,当等待时可以被中断
E poll(long timeout, TimeUnit unit)
throws InterruptedException;

// 获取队列中剩余的空间。
int remainingCapacity();

// 从队列中移除指定的值。
boolean remove(Object o);

// 判断队列中是否拥有该值。
public boolean contains(Object o);

// 将队列中值,全部移除,并发设置到给定的集合中。
int drainTo(Collection<? super E> c);

// 将队列中值,全部移除,并发设置到给定的集合中。最多设置maxElements个元素
int drainTo(Collection<? super E> c, int maxElements);
}

AbstractQueue也实现了Queue接口,还提供了某些方法的默认实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 从这可以发现,Add方法,其实就是调用的offer方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

// 调用poll方法,如果为null抛出NoSuchElementException异常
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

// 调用peek方法,如果为null,抛出NoSuchElementException异常
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

// 按个从队列中删除node
public void clear() {
while (poll() != null)
;
}

public boolean addAll(Collection<? extends E> c) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
boolean modified = false;
for (E e : c)
if (add(e))
modified = true;
return modified;
}

DelayQueue中的元素需要实现Delayed接口

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface Delayed extends Comparable<Delayed> {

/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
// 返回剩余时间
long getDelay(TimeUnit unit);
}

主要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 并发控制锁
private final transient ReentrantLock lock = new ReentrantLock();

// 存放元素的优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();

/**
* Thread designated to wait for the element at the head of
* the queue. This variant of the Leader-Follower pattern
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
* minimize unnecessary timed waiting. When a thread becomes
* the leader, it waits only for the next delay to elapse, but
* other threads await indefinitely. The leader thread must
* signal some other thread before returning from take() or
* poll(...), unless some other thread becomes leader in the
* interim. Whenever the head of the queue is replaced with
* an element with an earlier expiration time, the leader
* field is invalidated by being reset to null, and some
* waiting thread, but not necessarily the current leader, is
* signalled. So waiting threads must be prepared to acquire
* and lose leadership while waiting.
*/
// Leader线程。用来减少线程切换和轮询开销的
private Thread leader = null;

/**
* Condition signalled when a newer element becomes available
* at the head of the queue or a new thread may need to
* become leader.
*/
private final Condition available = lock.newCondition();

/**
* Creates a new {@code DelayQueue} that is initially empty.
*/
public DelayQueue() {}

主要方法

无参构造函数

1
2
3
4
/**
* Creates a new {@code DelayQueue} that is initially empty.
*/
public DelayQueue() {}

创建一个空的延时阻塞队列

有参构造函数-指定集合

调用addAll方法

1
2
3
4
5
6
7
8
9
10
11
/**
* Creates a {@code DelayQueue} initially containing the elements of the
* given collection of {@link Delayed} instances.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}

addAll

遍历集合c,调用add方法,将集合c中的每个元素加入队列之中。如果在添加的过程中集合c发生了改变,那么addAll的行为就是不可预知的。

1
2
3
4
5
6
7
8
9
10
11
public boolean addAll(Collection<? extends E> c) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
boolean modified = false;
for (E e : c)
if (add(e))
modified = true;
return modified;
}

add方法

调用了offer方法

1
2
3
4
5
6
7
8
9
10
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
return offer(e);
}

offer方法

  1. 获取锁
  2. 调用优先级队列的offer方法,入队
  3. 确认队首元素是否为新增元素,是的话,设置leader为null,并且唤醒一个取线程

leader的作用,要结合take方法来看,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}

put方法

基于这个阻塞队列是无界的,这个方法永远不会阻塞

1
2
3
4
5
6
7
8
9
10
/**
* Inserts the specified element into this delay queue. As the queue is
* unbounded this method will never block.
*
* @param e the element to add
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) {
offer(e);
}

poll方法

获取队首元素并从队列中删除队首元素。如果在过期时间之前没有元素的话,返回null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Retrieves and removes the head of this queue, or returns {@code null}
* if this queue has no elements with an expired delay.
*
* @return the head of this queue, or {@code null} if this
* queue has no elements with an expired delay
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}

put方法

内部调用offer方法

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Inserts the specified element into this priority queue.
* As the queue is unbounded, this method will never block.
*
* @param e the element to add
* @throws ClassCastException if the specified element cannot be compared
* with elements currently in the priority queue according to the
* priority queue's ordering
* @throws NullPointerException if the specified element is null
*/
public void put(E e) {
offer(e); // never need to block
}

take方法

从队列中获取并删除队首元素,如果队首元素还未过期,则阻塞当前线程,直到该元素过期为止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 获取队首元素
E first = q.peek();
// 如果队首元素为null,阻塞等待
if (first == null)
available.await();
else {
// 获取队首元素的过期时间
long delay = first.getDelay(NANOSECONDS);
// 如果已经过期
if (delay <= 0)
// 出队列
return q.poll();
first = null; // don't retain ref while waiting
// 如果leader不为null,说明已经有线程获取到了锁
if (leader != null)
// 阻塞当前线程
available.await();
else {
Thread thisThread = Thread.currentThread();
// 将当前线程设置为leader线程
leader = thisThread;
try {
// 等待元素过期
available.awaitNanos(delay);
} finally {
// 释放leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 唤醒其他线程。重新竞争leader
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}

Leader线程时等待头部队列元素的指定线程。Leader-Follower模式的这种变化用于最小化不必要的定时等待:

  • 当一个线程称为leader时,其会定时等待下一个delay元素过期,但是其他线程会无限期等待
  • 当从take/poll返回之前,leader线程必须signal其他等待线程,除非在此期间有线程成为了新的leader
  • 每当队列头部元素被更早到期的元素替换时,leader被置为null,offer里面q.peek == e时,会将leader置为null,此时触发signal,重新竞选leader。所以定时等待线程必须要处理失去leader时情况。

peek方法

1
2
3
4
5
6
7
8
9
10
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}

size方法

1
2
3
4
5
6
7
8
9
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.size();
} finally {
lock.unlock();
}
}

remainingCapacity方法

队列无界,因此返回最大的Integer.MAX_VALUE

1
2
3
4
5
6
7
8
9
/**
* Always returns {@code Integer.MAX_VALUE} because
* a {@code DelayQueue} is not capacity constrained.
*
* @return {@code Integer.MAX_VALUE}
*/
public int remainingCapacity() {
return Integer.MAX_VALUE;
}

remove方法

从队列中删除指定的元素,有返回true,否则返回false

调用优先级队列的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Removes a single instance of the specified element from this
* queue, if it is present, whether or not it has expired.
*/
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.remove(o);
} finally {
lock.unlock();
}
}
0%