Java使用CompletableFuture
Java提供了Future接口用来描述异步计算的结果。它可以使用isDone方法检查计算是否完成,可以使用cancel方法停止任务的执行,也可以用get方法阻塞住调用线程,直到计算完成。
但是它也存在一些问题,比如获取计算结果,只能通过阻塞或者轮询的方式获取计算的结果。阻塞的方式显然和异步编程的初衷相违背,轮询的方式又会耗费不必要的CPU资源,而且也不能即使地得到计算结果。
因此,JAVA 8 提供了一个包含50个方法左右的类,CompletableFuture,提供了非常强大的Future的扩展功能,它可以帮助简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
它可能代表一个明确完成的Future,也有可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。
CompletionStage代表意不计算过程中的某一阶段,一个阶段完成以后可能会触发另外一个阶段,一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如stage.thenApply(x -> square(x)).thenAccept(x -> System.out.println(x)).thenRun(() -> System.out.println()))
创建CompletableFuture对象
CompletableFuture提供了5个公共方法可以获取到CompletableFuture对象。
它们可以分为三大类:
- 以Async结尾并且方法签名中没有executor参数的方法:如果是supplyAsync方法,它是支持方法返回值的,runAsync方法则不支持方法返回值。它们默认的线程池是ForkJoinPool.CommonPool
- 以Async结尾并且方法签名中有executor参数的方法:如果是supplyAsync方法,它是支持方法返回值的,runAsync方法则不支持方法返回值。使用调用方指定的线程池
- 不以Async结尾,参数中传入的是值的方法:返回一个已经计算好的completableFuture。
1 | /** |
CompletableFuture完成操作
在CompletableFuture执行完成或者出现异常的时候,可以执行它完成特定的动作。按照有返回值和没有范围值可以分为两类:
没有返回值
主要是以下三个方法,外带一个异常处理的exceptionally方法
1 | /** |
whenComplete* 这三个方法都是异曲同工,区别就在于是否支持异步,是否支持指定线程池。可以看看下面这个例子,如何使用whenComplete方法:
1 | public class CompletableFutureTest { |
运行结果:
exceptionally方法则是在方法调用出现异常之后使用,参考下面示例:
1 | private static int divideByZero() { |
构造一个除以0的异常,然后再创建CompleteFuture的时候,指定异常处理方法,demo中仅仅打印了异常信息。
我们从上述的结果中可以看到,虽然执行的时候出现了异常,但是经过exceptionally的处理,whenComplete方法还是正常成功了,只不过返回的值,是exceptionally方法中返回的。
那么把whenComplete放在exceptionally前面会怎么样呢,一起来看一下:
1 | private static int divideByZero() { |
可以看到,whenComplete方法还是执行了。但是因为中间出了异常,已经无法获取到值了,不过错误信息和上面的一个不一样,不为null了。
有返回值
handle*方法作用和whenComplete方法类似,区别就是他们可以有返回值,而正是因为他们有返回值,他们可以在方法里面实现exceptionally的功能。
1 | public <U> CompletableFuture<U> handle( |
将上面的whenComplete例子稍微改动下:
1 | private static int sum() { |
handle*和whenComplete*还有个不同点就是,可以在handle*里面处理异常。看下下面这个例子:
1 | private static int divideByZero() { |
从上图的运行结果可以看出,handle在处理了异常之后,exceptionally就不会再去处理异常了,future.get()也是返回的默认的1010.
CompletableFuture组合操作
thenApply
当前任务正常完成以后执行,当前任务的执行的结果会作为下一任务的输入参数,有返回值
1 | public static void main(String[] args) throws InterruptedException, ExecutionException { |
可以看到,thenApply将前一个CompletableFuture的结果乘以2,返回了。
thenAccept
当前的任务正常完成以后执行,当前任务的执行结果可以作为下一任务的输入参数,无返回值。
1 | public static void main(String[] args) throws InterruptedException, ExecutionException { |
可以看到,thenAccept可以把前一个CompletableFuture的结果作为参数打印出来。
thenRun
不关心上一步的计算结果,执行下一个操作.并且不管上一步有没有返回值,它都没有返回值。
1 | public static void main(String[] args) throws InterruptedException, ExecutionException { |
thenCombine
结合两个任务的返回值进行转化之后返回
1 | private static int multiply() { |
thenAcceptBoth
结合两个任务的返回值,不需要返回,没有返回值
1 | public static void main(String[] args) throws InterruptedException, ExecutionException { |
runAfterBoth
结合两个任务,不关心返回值,也不返回任何值
1 | public static void main(String[] args) throws InterruptedException, ExecutionException { |
thenCompose
功能和thenApply差不多
thenApply是把泛型变量从T转为U,相当于stream中的map
thenCompose用来连接两个CompletableFuture,想到与Stream中的flatMap
1 | public static void main(String[] args) throws InterruptedException, ExecutionException { |
applyToEither
执行两个CompletableFuture的结果,那个先返回就使用哪个的结果
1 | private static int sumWait3() { |
从上图可以看出,future返回了最先完成计算的sumWait3的结果。
acceptEither
acceptEither功能和applyToEither,只不过它们没有返回值。
1 | public static void main(String[] args) throws InterruptedException, ExecutionException { |
runAfterEither
runAfterEither功能也类似,不过它不接受其他CompletableFuture的返回值,也没有返回值。
1 | public static void main(String[] args) throws InterruptedException, ExecutionException { |
CompletableFuture结果获取
1 | /** |
get方法和之前的Future.get一致
getNow方法,表示立即获取结果,如果此时计算已经完成,返回结果或者异常,否则返回给定的值valueIfAbsent
join返回计算的结果或者抛出一个unchecked异常(CompletionException),它和get
对抛出的异常的处理有些细微的区别
1 | CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { |
下面是执行future.get方法返回的结果
下面是执行future.join方法返回的结果
join返回一个unchecked异常(CompletionException),而get返回一个具体的异常
CompletableFuture 辅助方法
allOf
allOf方法是当所有的CompletableFuture都执行完后,执行计算
1 | public static void main(String[] args) throws InterruptedException, ExecutionException { |
运行就能发现,future会等to100和to200全部完成之后才会结束运行
anyOf
anyOf方法是当任意一个的CompletableFuture都执行完后,执行计算
运行时候能看到,在to100运行完成之后,整个CompletableFuture就完成计算,结束运行了。