Java使用CompletableFuture解析

Java使用CompletableFuture

Java提供了Future接口用来描述异步计算的结果。它可以使用isDone方法检查计算是否完成,可以使用cancel方法停止任务的执行,也可以用get方法阻塞住调用线程,直到计算完成。

但是它也存在一些问题,比如获取计算结果,只能通过阻塞或者轮询的方式获取计算的结果。阻塞的方式显然和异步编程的初衷相违背,轮询的方式又会耗费不必要的CPU资源,而且也不能即使地得到计算结果。

因此,JAVA 8 提供了一个包含50个方法左右的类,CompletableFuture,提供了非常强大的Future的扩展功能,它可以帮助简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

它可能代表一个明确完成的Future,也有可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。

CompletionStage代表意不计算过程中的某一阶段,一个阶段完成以后可能会触发另外一个阶段,一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如stage.thenApply(x -> square(x)).thenAccept(x -> System.out.println(x)).thenRun(() -> System.out.println()))

创建CompletableFuture对象

CompletableFuture提供了5个公共方法可以获取到CompletableFuture对象。

它们可以分为三大类:

  • 以Async结尾并且方法签名中没有executor参数的方法:如果是supplyAsync方法,它是支持方法返回值的,runAsync方法则不支持方法返回值。它们默认的线程池是ForkJoinPool.CommonPool
  • 以Async结尾并且方法签名中有executor参数的方法:如果是supplyAsync方法,它是支持方法返回值的,runAsync方法则不支持方法返回值。使用调用方指定的线程池
  • 不以Async结尾,参数中传入的是值的方法:返回一个已经计算好的completableFuture。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the {@link ForkJoinPool#commonPool()} with
* the value obtained by calling the given Supplier.
*
* @param supplier a function returning the value to be used
* to complete the returned CompletableFuture
* @param <U> the function's return type
* @return the new CompletableFuture
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}

/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the given executor with the value obtained
* by calling the given Supplier.
*
* @param supplier a function returning the value to be used
* to complete the returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new CompletableFuture
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}

/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the {@link ForkJoinPool#commonPool()} after
* it runs the given action.
*
* @param runnable the action to run before completing the
* returned CompletableFuture
* @return the new CompletableFuture
*/
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}

/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the given executor after it runs the given
* action.
*
* @param runnable the action to run before completing the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}

/**
* Returns a new CompletableFuture that is already completed with
* the given value.
*
* @param value the value
* @param <U> the type of the value
* @return the completed CompletableFuture
*/
public static <U> CompletableFuture<U> completedFuture(U value) {
return new CompletableFuture<U>((value == null) ? NIL : value);
}

CompletableFuture完成操作

在CompletableFuture执行完成或者出现异常的时候,可以执行它完成特定的动作。按照有返回值和没有范围值可以分为两类:

没有返回值

主要是以下三个方法,外带一个异常处理的exceptionally方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Returns a new CompletableFuture that is completed when this
* CompletableFuture completes, with the result of the given
* function of the exception triggering this CompletableFuture's
* completion when it completes exceptionally; otherwise, if this
* CompletableFuture completes normally, then the returned
* CompletableFuture also completes normally with the same value.
* Note: More flexible versions of this functionality are
* available using methods {@code whenComplete} and {@code handle}.
*
* @param fn the function to use to compute the value of the
* returned CompletableFuture if this CompletableFuture completed
* exceptionally
* @return the new CompletableFuture
*/
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}


public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}

public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}

public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}

whenComplete* 这三个方法都是异曲同工,区别就在于是否支持异步,是否支持指定线程池。可以看看下面这个例子,如何使用whenComplete方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class CompletableFutureTest {

private static int sum() {
System.out.println("Begin to sum");

long time = System.currentTimeMillis();

int sum = 0;
for (int i = 1;i <= 100;i++) {
sum += i;
}

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("End to sum cost: " + (System.currentTimeMillis() - time));
return sum;
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(CompletableFutureTest::sum);
Future<Integer> f = future.whenComplete((v, e) -> {
System.out.println("Result is " + v);
System.out.println("Error is " + e);
});
System.out.println(f.get());
}

运行结果:

exceptionally方法则是在方法调用出现异常之后使用,参考下面示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static int divideByZero() {
System.out.println("Begin to sum");

long time = System.currentTimeMillis();

try {
int result = 1 / 0;
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("End to sum cost: " + (System.currentTimeMillis() - time));
return -2;
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(CompletableFutureTest::divideByZero).exceptionally(e -> {
System.out.println(e.getMessage());
return -1;
});
Future<Integer> f = future.whenComplete((v, e) -> {
System.out.println("Value is " + v);
System.out.println("Error is " + e);
});
System.out.println(f.get());
}

构造一个除以0的异常,然后再创建CompleteFuture的时候,指定异常处理方法,demo中仅仅打印了异常信息。

我们从上述的结果中可以看到,虽然执行的时候出现了异常,但是经过exceptionally的处理,whenComplete方法还是正常成功了,只不过返回的值,是exceptionally方法中返回的。

那么把whenComplete放在exceptionally前面会怎么样呢,一起来看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static int divideByZero() {
System.out.println("Begin to sum");

long time = System.currentTimeMillis();

try {
int result = 1 / 0;
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("End to sum cost: " + (System.currentTimeMillis() - time));
return -2;
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(CompletableFutureTest::divideByZero).whenComplete((v, e) -> {
System.out.println("Value is " + v);
System.out.println("Error is " + e);
}).exceptionally(e -> {
System.out.println(e.getMessage());
return -1;
});
System.out.println(future.get());
}

可以看到,whenComplete方法还是执行了。但是因为中间出了异常,已经无法获取到值了,不过错误信息和上面的一个不一样,不为null了。

有返回值

handle*方法作用和whenComplete方法类似,区别就是他们可以有返回值,而正是因为他们有返回值,他们可以在方法里面实现exceptionally的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}

public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(asyncPool, fn);
}

public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}

将上面的whenComplete例子稍微改动下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private static int sum() {
System.out.println("Begin to sum");

long time = System.currentTimeMillis();

int sum = 0;
for (int i = 1;i <= 100;i++) {
sum += i;
}

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("End to sum cost: " + (System.currentTimeMillis() - time));
return sum;
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(CompletableFutureTest::sum).handle((v, e) -> {
System.out.println("Value before multiply 2 is " + v);
System.out.println("Error is " + e);
return v * 2;
}).exceptionally(e -> {
System.out.println(e.getMessage());
return -1;
});
System.out.println(future.get());
}

handle*和whenComplete*还有个不同点就是,可以在handle*里面处理异常。看下下面这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private static int divideByZero() {
System.out.println("Begin to sum");

long time = System.currentTimeMillis();

try {
int result = 1 / 0;
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("End to sum cost: " + (System.currentTimeMillis() - time));
return -2;
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(CompletableFutureTest::divideByZero).handle((v, e) -> {
System.out.println("Value before multiply 2 is " + v);
if (null != e) {
System.out.println("Error is " + e);
}
return 1010;
}).exceptionally(e -> {
System.out.println(e.getMessage());
return -1;
});
System.out.println(future.get());
}

从上图的运行结果可以看出,handle在处理了异常之后,exceptionally就不会再去处理异常了,future.get()也是返回的默认的1010.

CompletableFuture组合操作

thenApply

当前任务正常完成以后执行,当前任务的执行的结果会作为下一任务的输入参数,有返回值

1
2
3
4
5
6
7
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(CompletableFutureTest::sum).thenApply(sum -> {
System.out.println("Get sum is " + sum);
return sum * 2;
});
System.out.println(future.get());
}

可以看到,thenApply将前一个CompletableFuture的结果乘以2,返回了。

thenAccept

当前的任务正常完成以后执行,当前任务的执行结果可以作为下一任务的输入参数,无返回值

1
2
3
4
5
6
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(CompletableFutureTest::sum).thenAccept(sum -> {
System.out.println("Get sum is " + sum);
});
System.out.println(future.get());
}

可以看到,thenAccept可以把前一个CompletableFuture的结果作为参数打印出来。

thenRun

不关心上一步的计算结果,执行下一个操作.并且不管上一步有没有返回值,它都没有返回值。

1
2
3
4
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(CompletableFutureTest::sum).thenRun(CompletableFutureTest::sum);
System.out.println(future.get());
}

thenCombine

结合两个任务的返回值进行转化之后返回

1
2
3
4
5
6
7
8
9
10
11
12
13
private static int multiply() {
int multiply = 1;
for (int i = 1;i <= 10;i++) {
multiply *= i;
}
return multiply;
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> multiplyFuture = CompletableFuture.supplyAsync(CompletableFutureTest::multiply);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(CompletableFutureTest::sum).thenCombine(multiplyFuture, (a, b) -> a + b);
System.out.println(future.get());
}

thenAcceptBoth

结合两个任务的返回值,不需要返回,没有返回值

1
2
3
4
5
6
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> multiplyFuture = CompletableFuture.supplyAsync(CompletableFutureTest::multiply);
//CompletableFuture<Integer> future = CompletableFuture.supplyAsync(CompletableFutureTest::sum).thenCombine(multiplyFuture, (a, b) -> a + b);
CompletableFuture<Void> emptyFuture = CompletableFuture.supplyAsync(CompletableFutureTest::multiply).thenAcceptBoth(multiplyFuture, (a, b) -> System.out.println("Multiply from 1 to 10 is " + a));
//System.out.println(future.get());
}

runAfterBoth

结合两个任务,不关心返回值,也不返回任何值

1
2
3
4
5
6
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> multiplyFuture = CompletableFuture.supplyAsync(CompletableFutureTest::multiply);
//CompletableFuture<Integer> future = CompletableFuture.supplyAsync(CompletableFutureTest::sum).thenCombine(multiplyFuture, (a, b) -> a + b);
CompletableFuture<Void> emptyFuture = CompletableFuture.supplyAsync(CompletableFutureTest::multiply).runAfterBoth(multiplyFuture, () -> System.out.println("do nothing use runAfterBoth"));
//System.out.println(future.get());
}

thenCompose

功能和thenApply差不多

thenApply是把泛型变量从T转为U,相当于stream中的map

thenCompose用来连接两个CompletableFuture,想到与Stream中的flatMap

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100;
});
CompletableFuture<String> f = future.thenCompose( i -> {
return CompletableFuture.supplyAsync(() -> {
return (i * 10) + "";
});
});

System.out.println(f.get()); //1000
}

applyToEither

执行两个CompletableFuture的结果,那个先返回就使用哪个的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private static int sumWait3() {
System.out.println("Begin to sum");

long time = System.currentTimeMillis();

int sum = 0;
for (int i = 1;i <= 100;i++) {
sum += i;
}

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("End to sum cost: " + (System.currentTimeMillis() - time));
return sum;
}

private static int sumWait5() {
System.out.println("Begin to sum");

long time = System.currentTimeMillis();

int sum = 0;
for (int i = 1;i <= 200;i++) {
sum += i;
}

try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("End to sum cost: " + (System.currentTimeMillis() - time));
return sum;
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> to100 = CompletableFuture.supplyAsync(CompletableFutureTest::sumWait3);
CompletableFuture<Integer> to200 = CompletableFuture.supplyAsync(CompletableFutureTest::sumWait5);
CompletableFuture<Integer> future = to100.applyToEither(to200, a -> {System.out.println("result is " + a);return a;});
System.out.println(future.get());
}

从上图可以看出,future返回了最先完成计算的sumWait3的结果。

acceptEither

acceptEither功能和applyToEither,只不过它们没有返回值。

1
2
3
4
5
6
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> to100 = CompletableFuture.supplyAsync(CompletableFutureTest::sumWait3);
CompletableFuture<Integer> to200 = CompletableFuture.supplyAsync(CompletableFutureTest::sumWait5);
CompletableFuture<Void> future = to100.acceptEither(to200, a -> {System.out.println("result is " + a);});
System.out.println(future.get());
}

runAfterEither

runAfterEither功能也类似,不过它不接受其他CompletableFuture的返回值,也没有返回值。

1
2
3
4
5
6
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> to100 = CompletableFuture.supplyAsync(CompletableFutureTest::sumWait3);
CompletableFuture<Integer> to200 = CompletableFuture.supplyAsync(CompletableFutureTest::sumWait5);
CompletableFuture<Void> future = to100.runAfterEither(to200, () -> {System.out.println("result is 123");});
System.out.println(future.get());
}

CompletableFuture结果获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* Waits if necessary for this future to complete, and then
* returns its result.
*
* @return the result value
* @throws CancellationException if this future was cancelled
* @throws ExecutionException if this future completed exceptionally
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}

/**
* Waits if necessary for at most the given time for this future
* to complete, and then returns its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the result value
* @throws CancellationException if this future was cancelled
* @throws ExecutionException if this future completed exceptionally
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Object r;
long nanos = unit.toNanos(timeout);
return reportGet((r = result) == null ? timedGet(nanos) : r);
}

/**
* Returns the result value when complete, or throws an
* (unchecked) exception if completed exceptionally. To better
* conform with the use of common functional forms, if a
* computation involved in the completion of this
* CompletableFuture threw an exception, this method throws an
* (unchecked) {@link CompletionException} with the underlying
* exception as its cause.
*
* @return the result value
* @throws CancellationException if the computation was cancelled
* @throws CompletionException if this future completed
* exceptionally or a completion computation threw an exception
*/
public T join() {
Object r;
return reportJoin((r = result) == null ? waitingGet(false) : r);
}

/**
* Returns the result value (or throws any encountered exception)
* if completed, else returns the given valueIfAbsent.
*
* @param valueIfAbsent the value to return if not completed
* @return the result value, if completed, else the given valueIfAbsent
* @throws CancellationException if the computation was cancelled
* @throws CompletionException if this future completed
* exceptionally or a completion computation threw an exception
*/
public T getNow(T valueIfAbsent) {
Object r;
return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}

get方法和之前的Future.get一致

getNow方法,表示立即获取结果,如果此时计算已经完成,返回结果或者异常,否则返回给定的值valueIfAbsent

join返回计算的结果或者抛出一个unchecked异常(CompletionException),它和get对抛出的异常的处理有些细微的区别

1
2
3
4
5
6
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 1/0;
return 100;
});
//future.join();
future.get();

下面是执行future.get方法返回的结果

下面是执行future.join方法返回的结果

join返回一个unchecked异常(CompletionException),而get返回一个具体的异常

CompletableFuture 辅助方法

allOf

allOf方法是当所有的CompletableFuture都执行完后,执行计算

1
2
3
4
5
6
7
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> to100 = CompletableFuture.supplyAsync(CompletableFutureTest::sumWait3);
CompletableFuture<Integer> to200 = CompletableFuture.supplyAsync(CompletableFutureTest::sumWait5);
CompletableFuture<Void> future = CompletableFuture.allOf(to100, to200);
System.out.println(future.get());

}

运行就能发现,future会等to100和to200全部完成之后才会结束运行

anyOf

anyOf方法是当任意一个的CompletableFuture都执行完后,执行计算

运行时候能看到,在to100运行完成之后,整个CompletableFuture就完成计算,结束运行了。

参考资料

Java CompletableFuture 详解

0%