Worker Thread 模式

Worker Thread 模式

Worker的意思是工作的人,劳动者。在Worker Thread模式中,工人线程(worker thread)会逐个取回工作并进行处理。当所有的工作全部完成之后,工人线程会等待新的工作到来。

Worker Thread模式也被称作为Background Thread(背景线程)模式。另外,如果从“保存多个工人线程的场所”这一点看,我们也可以称这种模式为Thread Pool(线程池)模式。

示例程序

下表列举了示例程序中的所有类。示例程序的行为如下。

ClientThread类的线程会向Channel类发送工作请求(委托)

Channel类的实例雇佣了五个工人线程(WorkerThread)进行工作。所有工人线程都在等待工作请求的到来。

工人的请求到来之后,工人线程会从Channel哪里获取一项工作请求并开始工作。工作完成后,工人线程会回到Channel哪里等待下一项工作请求。

类的一览表

名字 说明
Main 测试程序行为的类
ClientThread 表示发出工作请求的线程的类
Request 可以工作请求的类
Channel 接收工作请求并将工作请求交给工人线程的类
WorkerThread 表示工人线程的类

Main类

Main类会创建一个雇佣了五个工人线程的Channel实例,并将其共享给三个ClientThread(A、B、C)。

Main类

1
2
3
4
5
6
7
8
9
10
public class Main {
public static void main(String[] args) {
Channel channel = new Channel(5);

channel.startWorkers();
new ClientThread("A", channel).start();
new ClientThread("B", channel).start();
new ClientThread("C", channel).start();
}
}

ClientThread类

ClientThread类是发送工作请求的类。“发送工作请求”这个行为对应的是示例程序中的以下处理

  1. 创建Request实例
  2. 将该实例传递给Channel类的putRequest方法

为了让程序行为有些变化,这里让程序sleep一段随机长的时间。

发送工作请求的ClientThread类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.Random;

public class ClientThread extends Thread {

private final Channel channel;

private static final Random random = new Random();

public ClientThread(String name, Channel channel) {
super(name);
this.channel = channel;
}

public void run() {
try {
for (int i = 0;true;i++) {
Request request = new Request(getName(), i);
channel.putRequest(request);
Thread.sleep(random.nextInt(1000));
}
} catch (Exception e) {
// TODO: handle exception
}
}

}

Request类

Request类是表示工作请求的类。

name字段表示发送请求的委托者的名字,number字段表示请求的编号。在示例程序中,name的值是A、B、C之一,number的只是0,1,2,3.。execute方法是负责“处理”请求的方法。

表示工作请求的Request类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.Random;

public class Request {
private final String name;

private final int number;

private static final Random random = new Random();

public Request(String name, int number) {
this.name = name;
this.number = number;
}

public void execute() {
System.out.println(Thread.currentThread().getName() + " execute " + this);
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public String toString() {
return "[ Request from " + name + " No." + number + " ]";
}
}

Channel类

Channel类是负责传递工作请求以及保存工人线程的类。

为了传递工作请求,我们在Channel类中定义了requestQueue字段。该字段将扮演保存请求的队列的角色。putRequest方法用于将请求加入到队列中,takeRequest方法则用于取出队列中的请求。这里使用了Producer-Consumer模式。另外。为了实现putRequest方法和takeRequest方法,这里还使用了Guarded Suspension模式。

Channel勒种定义了一个用于保存工人线程的threadPool字段。threadPool是WorkerThread的数组。Channel类的构造函数会初始化threadPool字段并创建WorkerThread的实例。数组的大小由threads决定。

这里为工人线程赋予的名字分别为Worker-0,Worker-1,.。

startWorkers方法是用于启动所有工人线程的方法。

负责传递工作请求以及保存工人线程的Channel类

Channel类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class Channel {
private static final int MAX_REQUEST = 100;

private final Request[] requestQueue;
private int tail;
private int head;
private int count;

private final WorkerThread[] threadPool;

public Channel(int threads) {
this.requestQueue = new Request[MAX_REQUEST];
this.tail = 0;
this.head = 0;
this.count = 0;

threadPool = new WorkerThread[threads];
for (int i = 0;i < threadPool.length;i++) {
threadPool[i] = new WorkerThread("Worker-" + i, this);
}
}

public void startWorkers() {
for (int i = 0;i < threadPool.length;i++) {
threadPool[i].start();
}
}

public synchronized void putRequest(Request request) {
while(count >= requestQueue.length) {
try {
wait();
} catch(InterruptedException e) {
e.printStackTrace();
}
}

requestQueue[tail] = request;
tail = (tail + 1) % requestQueue.length;
count++;
notifyAll();
}

public synchronized Request takeRequest() {
while(count <= 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

Request request = requestQueue[head];
head = (head + 1) % requestQueue.length;
count--;
notifyAll();
return request;
}


}

WorkerThread类

WorkerThread类表示工人线程的类。

工人线程会进行工作。“进行工作”这个处理对应示例程序中的以下处理。

  1. 调用takeRequest方法从Channel的实例中获取一个Request的实例
  2. 调用Request的实例的execute方法

工人线程一旦启动后就会一直工作。也就是说,它会一直执行“获取一个新的Request,然后调用它的execute方法”的处理。

表示工人线程的WorkerThread类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class WorkerThread extends Thread {
private final Channel channel;

public WorkerThread(String name, Channel channel) {
super(name);
this.channel = channel;
}

public void run() {
while (true) {
Request request = channel.takeRequest();
request.execute();
}
}
}

0%