Java使用Future、Callable和FutureTask

Java使用Future、Callable和FutureTask

在使用多线程的时候,可以继承Thread类或者实现Runnable接口,这样就可以启动一个线程并运行了。但是这种模式有个问题,就是无法获取到线程调用的结果。要获取结果的话必须使用共享变量或者线程通信的方式来达到效果,这样使用起来比较麻烦。于是JDK就从1.5之后引入了Future和Callable,来在线程执行完成之后,获得线程执行的结果。

这里,我们先研究分析一下Future、Callable以及衍生出的FutureTask。

Future示例

先看一个使用Future的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class FutureTest {

public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService threadPool = (ExecutorService) Executors.newSingleThreadExecutor();

Future<String> result = threadPool.submit(() -> {
Thread.sleep(3000);
return "Hello World";
});

long time = System.currentTimeMillis();
System.out.println(result.get());
System.out.println("Time costed: " + (System.currentTimeMillis() - time));

threadPool.shutdown();
}
}

首先新建了一个只有一个线程的线程池,而后向线程池提交了一个任务,任务的返回值是一个Future<String>对象。

任务执行的内容很简单,就是等待3秒,返回字符串“Hello World”。

最后打印返回的结果和耗时,然后关闭线程池。下面是运行结果:

从图中可以看到,到出结果之前一共耗费了3秒左右,那是因为,在Future对应的线程没有完成之前,调用Future.get的话,会被阻塞住,直到线程运行结束,有结果返回位置。

Future接口

Future是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

Future类位于java.util.concurrent包下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public interface Future<V> {

/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
// 取消当前正在执行的任务,如果当前任务已经完成,或者已经被取消,或者因为某些原因不能被取消,就会返回失败
// 如果在调用了本方法之后,线程还没有启动,那么线程就不需要启动了
boolean cancel(boolean mayInterruptIfRunning);

/**
* Returns {@code true} if this task was cancelled before it completed
* normally.
*
* @return {@code true} if this task was cancelled before it completed
*/
// 任务正常完成之前被取消了,返回true
boolean isCancelled();

/**
* Returns {@code true} if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
*
* @return {@code true} if this task completed
*/
// 任务完成返回true
boolean isDone();

/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
// 获取任务结果,如果获取的时候任务没有结束,阻塞等待
V get() throws InterruptedException, ExecutionException;

/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
// 获取任务结果,如果获取的时候任务没有结束,等待特定的时间。
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

Callable示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Sum implements Callable<Integer> {

@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 1;i <= 100;i++) {
sum += i;
}

Thread.sleep(3000);
return sum;
}

}

public class CallableTest {

public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
long time = System.currentTimeMillis();
Future<Integer> result = threadPool.submit(new Sum());
System.out.println("result is " + result.get());
System.out.println("Time cost: " + (System.currentTimeMillis() - time));
threadPool.shutdown();
}
}

首先声明一个类,它需要继承Callable接口,并实现它的call方法。从代码中可以看到,call方法是有返回值的。

接着,我们在main函数中,声明了一个只有一个线程的线程池,提交一个新的Callable任务。用Future对象去获得它的运行结果。

最后打印结果,耗时和关闭线程池。

Callable接口

1
2
3
4
5
6
7
8
9
10
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

Callable接口就一个方法call,返回一个计算值。

FutureTask示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class FutureTaskSum implements Callable<Integer> {

@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 1;i <= 100;i++) {
sum += i;
}

Thread.sleep(3000);
return sum;
}

}

public class FutureTaskTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
FutureTask<Integer> futureTask = new FutureTask<Integer>(new FutureTaskSum()) {

@Override
protected void done() {
try {
System.out.println("Run finished: " + get());
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

};

ExecutorService threadPool = Executors.newSingleThreadExecutor();
threadPool.submit(futureTask);
threadPool.shutdown();

}
}

首先,将Callable中的例子Sum换了个名字

接着再main类中定义一个FutureTask<Integer>变量,并重写它的done方法

然后,新建一个单线程的线程池,提交这个FutureTask变量执行。

FutureTask类

FutureTask实现了RunnableFuture接口,而RunnableFutrue就是Runnable和Future的组合接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

所以FutureTask既能当做一个Runnable直接被Thread执行,也能作为Future用来得到Callable的计算结果。

0%