您好,欢迎访问一九零五行业门户网

Java8的CompletableFuture的用法介绍(附示例)

本篇文章给大家带来的内容是关于java8的completablefuture的用法介绍(附示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。
作为java 8 concurrency api改进而引入,本文是completablefuture类的功能和用例的介绍。同时在java 9 也有对completablefuture有一些改进,之后再进入讲解。
future计算
future异步计算很难操作,通常我们希望将任何计算逻辑视为一系列步骤。但是在异步计算的情况下,表示为回调的方法往往分散在代码中或者深深地嵌套在彼此内部。但是当我们需要处理其中一个步骤中可能发生的错误时,情况可能会变得更复杂。
futrue接口是java 5中作为异步计算而新增的,但它没有任何方法去进行计算组合或者处理可能出现的错误。
在java 8中,引入了completablefuture类。与future接口一起,它还实现了completionstage接口。此接口定义了可与其他future组合成异步计算契约。
completablefuture同时是一个组合和一个框架,具有大约50种不同的构成,结合,执行异步计算步骤和处理错误。
如此庞大的api可能会令人难以招架,下文将调一些重要的做重点介绍。
使用completablefuture作为future实现
首先,completablefuture类实现future接口,因此你可以将其用作future实现,但需要额外的完成实现逻辑。
例如,你可以使用无构参构造函数创建此类的实例,然后使用complete方法完成。消费者可以使用get方法来阻塞当前线程,直到get()结果。
在下面的示例中,我们有一个创建completablefuture实例的方法,然后在另一个线程中计算并立即返回future。
计算完成后,该方法通过将结果提供给完整方法来完成future:
public future<string> calculateasync() throws interruptedexception {    completablefuture<string> completablefuture       = new completablefuture<>();     executors.newcachedthreadpool().submit(() -> {        thread.sleep(500);        completablefuture.complete(hello);        return null;    });     return completablefuture;}
为了分离计算,我们使用了executor api ,这种创建和完成completablefuture的方法可以与任何并发包(包括原始线程)一起使用。
请注意,该calculateasync方法返回一个future实例。
我们只是调用方法,接收future实例并在我们准备阻塞结果时调用它的get方法。
另请注意,get方法抛出一些已检查的异常,即executionexception(封装计算期间发生的异常)和interruptedexception(表示执行方法的线程被中断的异常):
future<string> completablefuture = calculateasync(); // ...  string result = completablefuture.get();assertequals(hello, result);
如果你已经知道计算的结果,也可以用变成同步的方式来返回结果。
future<string> completablefuture =   completablefuture.completedfuture(hello); // ... string result = completablefuture.get();assertequals(hello, result);
作为在某些场景中,你可能希望取消future任务的执行。
假设我们没有找到结果并决定完全取消异步执行任务。这可以通过future的取消方法完成。此方法mayinterruptifrunning,但在completablefuture的情况下,它没有任何效果,因为中断不用于控制completablefuture的处理。
这是异步方法的修改版本:
public future<string> calculateasyncwithcancellation() throws interruptedexception {    completablefuture<string> completablefuture = new completablefuture<>();     executors.newcachedthreadpool().submit(() -> {        thread.sleep(500);        completablefuture.cancel(false);        return null;    });     return completablefuture;}
当我们使用future.get()方法阻塞结果时,cancel()表示取消执行,它将抛出cancellationexception:
future<string> future = calculateasyncwithcancellation();future.get(); // cancellationexception
api介绍
static方法说明
上面的代码很简单,下面介绍几个 static 方法,它们使用任务来实例化一个 completablefuture 实例。
completablefuture.runasync(runnable runnable);completablefuture.runasync(runnable runnable, executor executor);completablefuture.supplyasync(supplier<u> supplier);completablefuture.supplyasync(supplier<u> supplier, executor executor)
runasync 方法接收的是 runnable 的实例,但是它没有返回值
supplyasync 方法是jdk8函数式接口,无参数,会返回一个结果
这两个方法是 executor 的升级,表示让任务在指定的线程池中执行,不指定的话,通常任务是在 forkjoinpool.commonpool() 线程池中执行的。
supplyasync()使用
静态方法runasync和supplyasync允许我们相应地从runnable和supplier功能类型中创建completablefuture实例。
该runnable的接口是在线程使用旧的接口,它不允许返回值。
supplier接口是一个不具有参数,并返回参数化类型的一个值的单个方法的通用功能接口。
这允许将supplier的实例作为lambda表达式提供,该表达式执行计算并返回结果:
completablefuture<string> future  = completablefuture.supplyasync(() -> hello); // ... assertequals(hello, future.get());
thenrun()使用
在两个任务任务a,任务b中,如果既不需要任务a的值也不想在任务b中引用,那么你可以将runnable lambda 传递给thenrun()方法。在下面的示例中,在调用future.get()方法之后,我们只需在控制台中打印一行:
模板
completablefuture.runasync(() -> {}).thenrun(() -> {}); completablefuture.supplyasync(() -> resulta).thenrun(() -> {});
第一行用的是 thenrun(runnable runnable),任务 a 执行完执行 b,并且 b 不需要 a 的结果。第二行用的是 thenrun(runnable runnable),任务 a 执行完执行 b,会返回resulta,但是 b 不需要 a 的结果。实战
completablefuture<string> completablefuture   = completablefuture.supplyasync(() -> hello); completablefuture<void> future = completablefuture  .thenrun(() -> system.out.println(computation finished.)); future.get();
thenaccept()使用
在两个任务任务a,任务b中,如果你不需要在future中有返回值,则可以用 thenaccept方法接收将计算结果传递给它。最后的future.get()调用返回void类型的实例。
模板
completablefuture.runasync(() -> {}).thenaccept(resulta -> {}); completablefuture.supplyasync(() -> resulta).thenaccept(resulta -> {});
第一行中,runasync不会有返回值,第二个方法thenaccept,接收到的resulta值为null,同时任务b也不会有返回结果
第二行中,supplyasync有返回值,同时任务b不会有返回结果。
实战
completablefuture<string> completablefuture  = completablefuture.supplyasync(() -> hello); completablefuture<void> future = completablefuture  .thenaccept(s -> system.out.println(computation returned:  + s)); future.get();
thenapply()使用
在两个任务任务a,任务b中,任务b想要任务a计算的结果,可以用thenapply方法来接受一个函数实例,用它来处理结果,并返回一个future函数的返回值:
模板
completablefuture.runasync(() -> {}).thenapply(resulta -> resultb);completablefuture.supplyasync(() -> resulta).thenapply(resulta -> resulta +  resultb);
第二行用的是 thenapply(function fn),任务 a 执行完执行 b,b 需要 a 的结果,同时任务 b 有返回值。实战
completablefuture<string> completablefuture  = completablefuture.supplyasync(() -> hello); completablefuture<string> future = completablefuture  .thenapply(s -> s +  world); assertequals(hello world, future.get());
当然,多个任务的情况下,如果任务 b 后面还有任务 c,往下继续调用 .thenxxx() 即可。
thencompose()使用
接下来会有一个很有趣的设计模式;completablefuture api 的最佳场景是能够在一系列计算步骤中组合completablefuture实例。
这种组合结果本身就是completablefuture,允许进一步再续组合。这种方法在函数式语言中无处不在,通常被称为monadic设计模式。
简单说,monad就是一种设计模式,表示将一个运算过程,通过函数拆解成互相连接的多个步骤。你只要提供下一步运算所需的函数,整个运算就会自动进行下去。
在下面的示例中,我们使用thencompose方法按顺序组合两个futures。
请注意,此方法采用返回completablefuture实例的函数。该函数的参数是先前计算步骤的结果。这允许我们在下一个completablefuture的lambda中使用这个值:
completablefuture<string> completablefuture   = completablefuture.supplyasync(() -> hello)    .thencompose(s -> completablefuture.supplyasync(() -> s +  world)); assertequals(hello world, completablefuture.get());
该thencompose方法连同thenapply一样实现了结果的合并计算。但是他们的内部形式是不一样的,它们与java 8中可用的stream和optional类的map和flatmap方法是有着类似的设计思路在里面的。
两个方法都接收一个completablefuture并将其应用于计算结果,但thencompose(flatmap)方法接收一个函数,该函数返回相同类型的另一个completablefuture对象。此功能结构允许将这些类的实例继续进行组合计算。
thencombine()
取两个任务的结果如果要执行两个独立的任务,并对其结果执行某些操作,可以用future的thencombine方法:
模板
completablefuture<string> cfa = completablefuture.supplyasync(() -> resulta);completablefuture<string> cfb = completablefuture.supplyasync(() -> resultb);cfa.thenacceptboth(cfb, (resulta, resultb) -> {});cfa.thencombine(cfb, (resulta, resultb) -> result a + b);
实战
completablefuture<string> completablefuture   = completablefuture.supplyasync(() -> hello)    .thencombine(completablefuture.supplyasync(      () ->  world), (s1, s2) -> s1 + s2)); assertequals(hello world, completablefuture.get());
更简单的情况是,当你想要使用两个future结果时,但不需要将任何结果值进行返回时,可以用thenacceptboth,它表示后续的处理不需要返回值,而 thencombine 表示需要返回值:
completablefuture future = completablefuture.supplyasync(() -> hello)  .thenacceptboth(completablefuture.supplyasync(() ->  world),    (s1, s2) -> system.out.println(s1 + s2));
thenapply()和thencompose()之间的区别
在前面的部分中,我们展示了关于thenapply()和thencompose()的示例。这两个api都是使用的completablefuture调用,但这两个api的使用是不同的。
thenapply()
此方法用于处理先前调用的结果。但是,要记住的一个关键点是返回类型是转换泛型中的类型,是同一个completablefuture。
因此,当我们想要转换completablefuture 调用的结果时,效果是这样的  :
completablefuture<integer> finalresult = compute().thenapply(s-> s + 1);
thencompose()
该thencompose()方法类似于thenapply()在都返回一个新的计算结果。但是,thencompose()使用前一个future作为参数。它会直接使结果变新的future,而不是我们在thenapply()中到的嵌套future,而是用来连接两个completablefuture,是生成一个新的completablefuture:
completablefuture<integer> computeanother(integer i){    return completablefuture.supplyasync(() -> 10 + i);}completablefuture<integer> finalresult = compute().thencompose(this::computeanother);
因此,如果想要继续嵌套链接completablefuture  方法,那么最好使用thencompose()。
并行运行多个任务
当我们需要并行执行多个任务时,我们通常希望等待所有它们执行,然后处理它们的组合结果。
该completablefuture.allof静态方法允许等待所有的完成任务:
api
public static completablefuture<void> allof(completablefuture<?>... cfs){...}
实战
completablefuture<string> future1    = completablefuture.supplyasync(() -> hello);completablefuture<string> future2    = completablefuture.supplyasync(() -> beautiful);completablefuture<string> future3    = completablefuture.supplyasync(() -> world); completablefuture<void> combinedfuture   = completablefuture.allof(future1, future2, future3); // ... combinedfuture.get(); asserttrue(future1.isdone());asserttrue(future2.isdone());asserttrue(future3.isdone());
请注意,completablefuture.allof()的返回类型是completablefuture <void>。这种方法的局限性在于它不会返回所有任务的综合结果。相反,你必须手动从futures获取结果。幸运的是,completablefuture.join()方法和java 8 streams api可以解决:
string combined = stream.of(future1, future2, future3)  .map(completablefuture::join)  .collect(collectors.joining( )); assertequals(hello beautiful world, combined);
completablefuture 提供了 join() 方法,它的功能和 get() 方法是一样的,都是阻塞获取值,它们的区别在于 join() 抛出的是 unchecked exception。这使得它可以在stream.map()方法中用作方法引用。
异常处理
说到这里,我们顺便来说下 completablefuture 的异常处理。这里我们要介绍两个方法:
public completablefuture<t> exceptionally(function<throwable, ? extends t> fn);public <u> completionstage<u> handle(bifunction<? super t, throwable, ? extends u> fn);
看下代码
completablefuture.supplyasync(() -> resulta)    .thenapply(resulta -> resulta +  resultb)    .thenapply(resultb -> resultb +  resultc)    .thenapply(resultc -> resultc +  resultd);
上面的代码中,任务 a、b、c、d 依次执行,如果任务 a 抛出异常(当然上面的代码不会抛出异常),那么后面的任务都得不到执行。如果任务 c 抛出异常,那么任务 d 得不到执行。
那么我们怎么处理异常呢?看下面的代码,我们在任务 a 中抛出异常,并对其进行处理:
completablefuture<string> future = completablefuture.supplyasync(() -> {    throw new runtimeexception();})        .exceptionally(ex -> errorresulta)        .thenapply(resulta -> resulta +  resultb)        .thenapply(resultb -> resultb +  resultc)        .thenapply(resultc -> resultc +  resultd);system.out.println(future.join());
上面的代码中,任务 a 抛出异常,然后通过 .exceptionally() 方法处理了异常,并返回新的结果,这个新的结果将传递给任务 b。所以最终的输出结果是:
errorresulta resultb resultc resultdstring name = null; // ... completablefuture<string> completablefuture    =  completablefuture.supplyasync(() -> {      if (name == null) {          throw new runtimeexception(computation error!);      }      return hello,  + name;  })}).handle((s, t) -> s != null ? s : hello, stranger!); assertequals(hello, stranger!, completablefuture.get());
当然,它们也可以都为 null,因为如果它作用的那个 completablefuture 实例没有返回值的时候,s 就是 null。
async后缀方法
completablefuture类中的api的大多数方法都有两个带有async后缀的附加修饰。这些方法表示用于异步线程。
没有async后缀的方法使用调用线程运行下一个执行线程阶段。不带async方法使用forkjoinpool.commonpool()线程池的fork / join实现运算任务。带有async方法使用传递式的executor任务去运行。
下面附带一个案例,可以看到有thenapplyasync方法。在程序内部,线程被包装到forkjointask实例中。这样可以进一步并行化你的计算并更有效地使用系统资源。
completablefuture<string> completablefuture    = completablefuture.supplyasync(() -> hello); completablefuture<string> future = completablefuture  .thenapplyasync(s -> s +  world); assertequals(hello world, future.get());
jdk 9 completablefuture api
在java 9中,  completablefuture api通过以下更改得到了进一步增强:
新工厂方法增加了支持延迟和超时改进了对子类化的支持。引入了新的实例api:
executor defaultexecutor()completablefuture<u> newincompletefuture()completablefuture<t> copy()completionstage<t> minimalcompletionstage()completablefuture<t> completeasync(supplier<? extends t> supplier, executor executor)completablefuture<t> completeasync(supplier<? extends t> supplier)completablefuture<t> ortimeout(long timeout, timeunit unit)completablefuture<t> completeontimeout(t value, long timeout, timeunit unit)还有一些静态实用方法:
executor delayedexecutor(long delay, timeunit unit, executor executor)executor delayedexecutor(long delay, timeunit unit)<u> completionstage<u> completedstage(u value)<u> completionstage<u> failedstage(throwable ex)<u> completablefuture<u> failedfuture(throwable ex)最后,为了解决超时问题,java 9又引入了两个新功能:
ortimeout()completeontimeout()结论
在本文中,我们描述了completablefuture类的方法和典型用例。
【相关推荐:java视频教程】
以上就是java8的completablefuture的用法介绍(附示例)的详细内容。
其它类似信息

推荐信息