1188. 设计有限阻塞队列

题目

实现一个拥有如下方法的线程安全有限阻塞队列:

BoundedBlockingQueue(int capacity) 构造方法初始化队列,其中capacity代表队列长度上限。
void enqueue(int element) 在队首增加一个element. 如果队列满,调用线程被阻塞直到队列非满。
int dequeue() 返回队尾元素并从队列中将其删除. 如果队列为空,调用线程被阻塞直到队列非空。
int size() 返回当前队列元素个数。

你的实现将会被多线程同时访问进行测试。每一个线程要么是一个只调用enqueue方法的生产者线程,要么是一个只调用dequeue方法的消费者线程。size方法将会在每一个测试用例之后进行调用。

请不要使用内置的有限阻塞队列实现,否则面试将不会通过。

示例1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
输入:
1
1
["BoundedBlockingQueue","enqueue","dequeue","dequeue","enqueue","enqueue","enqueue","enqueue","dequeue"]
[[2],[1],[],[],[0],[2],[3],[4],[]]

输出:
[1,0,2,2]

解释:
生产者线程数目 = 1
消费者线程数目 = 1

BoundedBlockingQueue queue = new BoundedBlockingQueue(2); // 使用capacity = 2初始化队列。

queue.enqueue(1); // 生产者线程将1插入队列。
queue.dequeue(); // 消费者线程调用dequeue并返回1。
queue.dequeue(); // 由于队列为空,消费者线程被阻塞。
queue.enqueue(0); // 生产者线程将0插入队列。消费者线程被解除阻塞同时将0弹出队列并返回。
queue.enqueue(2); // 生产者线程将2插入队列。
queue.enqueue(3); // 生产者线程将3插入队列。
queue.enqueue(4); // 生产者线程由于队列长度已达到上限2而被阻塞。
queue.dequeue(); // 消费者线程将2从队列弹出并返回。生产者线程解除阻塞同时将4插入队列。
queue.size(); // 队列中还有2个元素。size()方法在每组测试用例最后调用。

示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
输入:
3
4
["BoundedBlockingQueue","enqueue","enqueue","enqueue","dequeue","dequeue","dequeue","enqueue"]
[[3],[1],[0],[2],[],[],[],[3]]

输出:
[1,0,2,1]

解释:
生产者线程数目 = 3
消费者线程数目 = 4

BoundedBlockingQueue queue = new BoundedBlockingQueue(3); // 使用capacity = 3初始化队列。

queue.enqueue(1); // 生产者线程P1将1插入队列。
queue.enqueue(0); // 生产者线程P2将0插入队列。
queue.enqueue(2); // 生产者线程P3将2插入队列。
queue.dequeue(); // 消费者线程C1调用dequeue。
queue.dequeue(); // 消费者线程C2调用dequeue。
queue.dequeue(); // 消费者线程C3调用dequeue。
queue.enqueue(3); // 其中一个生产者线程将3插入队列。
queue.size(); // 队列中还有1个元素。

由于生产者/消费者线程的数目可能大于1,我们并不知道线程如何被操作系统调度,即使输入看上去隐含了顺序。因此任意一种输出[1,0,2]或[1,2,0]或[0,1,2]或[0,2,1]或[2,0,1]或[2,1,0]都可被接受。

解法

解法一:

同步方法块

JAVA

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class BoundedBlockingQueue {
private int capacity;
private LinkedList<Integer> values;

public BoundedBlockingQueue(int capacity) {
this.capacity = capacity;
values = new LinkedList<Integer>();
}

public synchronized void enqueue(int element) throws InterruptedException {
while (this.capacity == values.size()) {
// 队列已满
this.wait();
}
values.add(element);
this.notifyAll();
}

public synchronized int dequeue() throws InterruptedException {
while (0 == values.size()) {
this.wait();
}
int value = values.removeFirst();
this.notifyAll();
return value;
}

public synchronized int size() {
return values.size();
}
}

解法二:

ReentrantLock+Condition

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBlockingQueue {


//原子类保证原子性,也可以使用volatile
//普通的int被读取,会被读入内存的缓存中,完成加减乘除后再放回内存中,而每一个线程都有自己的寄存器,这样子会导致可能读取不到最新的数据
//volatile则可以直接在主内存读写,当一个线程更新了值,其他线程能够及时获知。
AtomicInteger size = new AtomicInteger(0);
private volatile int capacity;
//自己实现阻塞队列,需要一个容器,内部实现了一个node,如果改造为不只是int的,使用T泛型
private LinkedList<Integer> container;

//可重入锁
private static ReentrantLock lock = new ReentrantLock();
Condition procuder = lock.newCondition();//用来通知生产(入队)线程等待await还是可以执行signal
Condition consumer = lock.newCondition();//用来通知消费(出队)线程等待await还是可以执行signal

public BoundedBlockingQueue(int capacity) {
this.capacity = capacity;
container = new LinkedList<>();
}

/**
* 入队
*
* @param element
* @throws InterruptedException
*/
public void enqueue(int element) throws InterruptedException {
//每一个线程都会获得锁,但是如果条件不满足则会阻塞
lock.lock();
try {
//阻塞的话必须用循环,让这个线程再次获得cpu片段的时候能够够执行
while (size.get() >= capacity) {
//入队线程阻塞,把锁释放?
procuder.await();
}
container.addFirst(element);
size.incrementAndGet();

//通知出队线程
consumer.signal();
} finally {
lock.unlock();
}
}

public int dequeue() throws InterruptedException {
lock.lock();
try {
while (size.get() == 0) {
consumer.await();
}
int lastValue = container.getLast();
container.removeLast();
size.decrementAndGet();

//通知入队线程
procuder.signal();
return lastValue;
} finally {
lock.unlock();
}
}

public int size() {
lock.lock();
try {
return size.get();
} finally {
lock.unlock();
}
}
}
0%