Producer-Consumer 模式

Producer-Consumer 模式

Producer是“生产者”的意思,指的是生成数据的线程。Consumer则是“消费者”的意思,指的是使用数据的线程。

生产者安全地将数据交给消费者。虽然仅是这样看似简单的操作,但当生产者和消费者以不同的线程运行时,两者之间的处理速度差异便会引起问题。例如,消费者想要获取数据,可数据还没有生成,或者生产者想要交付数据,而消费者的状态还无法接受数据等。

Producer-Consumer模式在生产者和消费者之间加入了一个“桥梁”角色。该桥梁角色用于消除线程间处理速度的差异。

一般来说,在该模式中,生产者和消费者都有多个,当然,生产者和消费者有时也会只有一个。当两者都只有一个时,称之为PIPE模式。

示例程序

在这个示例程序中,有3位糕点师制作蛋糕并将其放到桌子上,然后有三位客人来吃这些蛋糕。程序运行如下所示:

  • 糕点师(MakerThread)制作蛋糕(String)。并将其放置到桌子上(Table)上。
  • 桌上最多可以防止3个蛋糕
  • 如果桌子上已经放满3个蛋糕时糕点师还要再放置蛋糕,必须等到桌子上空出位置。
  • 客人(EaterThread)取桌子上的蛋糕吃
  • 客人按蛋糕被放置到桌子上的顺序来取蛋糕
  • 当桌子上1个蛋糕都没有时,客人若要取蛋糕,必须等到桌子上新放置了蛋糕。

类的一览图

名字 说明
MakerThread 表示糕点师的类
EaterThread 表示客人的类
Table 表示桌子的类
Main 测试程序行为的类

Main类

Main类会创建一个桌子的实例,并启动表示糕点师和客人的线程。MakerThread和EaterThread的构造函数中传入的数字只是用来作为随机数的种子,数值本身并没有什么特别的意义。

1
2
3
4
5
6
7
8
9
10
11
public class Main {
public static void main(String[] args) {
Table table = new Table(3);
new MakerThread("M1", table, 78548).start();
new MakerThread("M2", table, 31415).start();
new MakerThread("M3", table, 48521).start();
new EaterThread("E1", table, 78541).start();
new EaterThread("E2", table, 78214).start();
new EaterThread("E3", table, 12532).start();
}
}

MakerThread类

MakerThread类用于制作蛋糕,并将其放置在桌子上,也就是糕点师。为了简单期起见,我们像下面这样以“流水号”和制作该蛋糕的“线程名称”来表示蛋糕。
{ Cake No.123 by MakerThread-1 }
流水号 线程名称

为了使程序的运行结果方便查看,蛋糕的流水号在所有的糕点师质检室公用的。为此,这里将流水号(id)声明了静态字段。

MakeThread会先暂停一段随机长(0-1000毫秒之间)的时间,然后再调用Table类的put方法将制作好的蛋糕放置到桌子上。暂停的这段时间模拟的是“制作蛋糕所花费的时间”。

MakeThread无限循环执行“制作蛋糕->放置到桌子上”,是蛋糕的生产者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.Random;

public class MakerThread extends Thread {
private final Random random;

private final Table table;

private static int id = 0;

public MakerThread(String name, Table table, long seed) {
super(name);
this.table = table;
this.random = new Random(seed);
}

public void run() {
while (true) {
try {
Thread.sleep(random.nextInt(1000));
String cake = "[ Cake No." + nextId() + "by " + getName() + " ]";
table.put(cake);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

private static synchronized int nextId() {
return id++;
}

}

EaterThread类

EaterThread类用于表示从桌子上取蛋糕吃的人。

客人通过Table类的Take方法取桌子上的蛋糕。然后,与MakerThread类一样,EaterThread也会暂停一段随机长的时间。

EaterThread无限循环执行“从桌子上取蛋糕->吃蛋糕”,是蛋糕的消费者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.util.Random;

public class EaterThread extends Thread {
private final Random random;

private final Table table;

public EaterThread(String name, Table table, long seed) {
super(name);
this.table = table;
this.random = new Random(seed);
}

public void run() {
while (true) {
try {
String cake = table.take();
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

Table 类

Table类用于表示放置蛋糕的桌子。

可放置的蛋糕个数通过构造函数来制定。

在示例程序中,蛋糕以String实例来表示。Table类声明了一个String数组类型的buffer字段,用于作为蛋糕的实际放置位置。

为了正确放置(put)和取(take)蛋糕,table类还声明了int类型的字段tail、head和count。各字段的含义分别如下所示。

  • tail字段:表示下一次放置(put)蛋糕的位置
  • head字段:表示下一次取(take)蛋糕的位置
  • count字段:表示当前桌子上放置的蛋糕的个数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Table {
private final String[] buffer;

private int tail;

private int head;

private int count;

public Table(int count) {
this.buffer = new String[count];
this.head = 0;
this.tail = 0;
this.count = 0;
}

public synchronized void put(String cake) throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " puts " + cake);
while (count >= buffer.length) {
wait();
}
buffer[tail] = cake;
tail = (tail + 1) % buffer.length;
count++;
notifyAll();
}

public synchronized String take() throws InterruptedException {
while (count <= 0) {
wait();
}

String cake = buffer[head];
head = (head + 1) % buffer.length;
count--;
notifyAll();
System.out.println(Thread.currentThread().getName() + " takes " + cake);
return cake;
}
}

运行结果示例

运行结果示例

0%