摘要:这个例子想要说明两个事情中以为结尾的方法将会异步执行默认情况下即指没有传入的情况下,异步执行会使用实现,该线程池使用一个后台线程来执行任务。这个例子展示了如何使用一个固定大小的线程池来实现大写操作。
前言
这篇博客回顾JAVA8的CompletionStageAPI以及其在JAVA库中的标准实现CompletableFuture。将会通过几个例子来展示API的各种行为。
因为CompletableFuture是CompletionInterface接口的实现,所以我们首先要了解该接口的契约。它代表某个同步或异步计算的一个阶段。你可以把它理解为是一个为了产生有价值最终结果的计算的流水线上的一个单元。这意味着多个ComletionStage指令可以链接起来从而一个阶段的完成可以触发下一个阶段的执行。
除了实现了CompletionStage接口,Completion还继承了Future,这个接口用于实现一个未开始的异步事件。因为能够显式的完成Future,所以取名为CompletableFuture。
1.新建一个完成的CompletableFuture这个简单的示例中创建了一个已经完成的预先设置好结果的CompletableFuture。通常作为计算的起点阶段。
static void completedFutureExample() { CompletableFuture cf = CompletableFuture.completedFuture("message"); assertTrue(cf.isDone()); assertEquals("message", cf.getNow(null)); }
getNow方法会返回完成后的结果(这里就是message),如果还未完成,则返回传入的默认值null。
2.运行一个简单的异步stage下面的例子解释了如何创建一个异步运行Runnable的stage。
static void runAsyncExample() { CompletableFuture cf = CompletableFuture.runAsync(() -> { assertTrue(Thread.currentThread().isDaemon()); randomSleep(); }); assertFalse(cf.isDone()); sleepEnough(); assertTrue(cf.isDone()); }
这个例子想要说明两个事情:
CompletableFuture中以Async为结尾的方法将会异步执行
默认情况下(即指没有传入Executor的情况下),异步执行会使用ForkJoinPool实现,该线程池使用一个后台线程来执行Runnable任务。注意这只是特定于CompletableFuture实现,其它的CompletableStage实现可以重写该默认行为。
3.将方法作用于前一个Stage下面的例子引用了第一个例子中已经完成的CompletableFuture,它将引用生成的字符串结果并将该字符串大写。
static void thenApplyExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApply(s -> { assertFalse(Thread.currentThread().isDaemon()); return s.toUpperCase(); }); assertEquals("MESSAGE", cf.getNow(null)); }
这里的关键词是thenApply:
then是指在当前阶段正常执行完成后(正常执行是指没有抛出异常)进行的操作。在本例中,当前阶段已经完成并得到值message。
Apply是指将一个Function作用于之前阶段得出的结果
Function是阻塞的,这意味着只有当大写操作执行完成之后才会执行getNow()方法。
4.异步的的将方法作用于前一个Stage通过在方法后面添加Async后缀,该CompletableFuture链将会异步执行(使用ForkJoinPool.commonPool())
static void thenApplyAsyncExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { assertTrue(Thread.currentThread().isDaemon()); randomSleep(); return s.toUpperCase(); }); assertNull(cf.getNow(null)); assertEquals("MESSAGE", cf.join()); }使用一个自定义的Executor来异步执行该方法
异步方法的一个好处是可以提供一个Executor来执行CompletableStage。这个例子展示了如何使用一个固定大小的线程池来实现大写操作。
static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() { int count = 1; @Override public Thread newThread(Runnable runnable) { return new Thread(runnable, "custom-executor-" + count++); } }); static void thenApplyAsyncWithExecutorExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { assertTrue(Thread.currentThread().getName().startsWith("custom-executor-")); assertFalse(Thread.currentThread().isDaemon()); randomSleep(); return s.toUpperCase(); }, executor); assertNull(cf.getNow(null)); assertEquals("MESSAGE", cf.join()); }6.消费(Consume)前一个Stage的结果
如果下一个Stage接收了当前Stage的结果但是在计算中无需返回值(比如其返回值为void),那么它将使用方法thenAccept并传入一个Consumer接口。
static void thenAcceptExample() { StringBuilder result = new StringBuilder(); CompletableFuture.completedFuture("thenAccept message") .thenAccept(s -> result.append(s)); assertTrue("Result was empty", result.length() > 0); }
Consumer将会同步执行,所以我们无需在返回的CompletableFuture上执行join操作。
7.异步执行Comsume同样,使用Asyn后缀实现:
static void thenAcceptAsyncExample() { StringBuilder result = new StringBuilder(); CompletableFuture cf = CompletableFuture.completedFuture("thenAcceptAsync message") .thenAcceptAsync(s -> result.append(s)); cf.join(); assertTrue("Result was empty", result.length() > 0); }8.计算出现异常时
我们现在来模拟一个出现异常的场景。为了简洁性,我们还是将一个字符串大写,但是我们会模拟延时进行该操作。我们会使用thenApplyAsyn(Function, Executor),第一个参数是大写转化方法,第二个参数是一个延时executor,它会延时一秒钟再将操作提交给ForkJoinPool。
static void completeExceptionallyExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); CompletableFuture exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; }); cf.completeExceptionally(new RuntimeException("completed exceptionally")); assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally()); try { cf.join(); fail("Should have thrown an exception"); } catch(CompletionException ex) { // just for testing assertEquals("completed exceptionally", ex.getCause().getMessage()); } assertEquals("message upon cancel", exceptionHandler.join()); }
首先,我们新建了一个已经完成并带有返回值message的CompletableFuture对象。然后我们调用thenApplyAsync方法,该方法会返回一个新的CompletableFuture。这个方法用异步的方式执行大写操作。这里还展示了如何使用delayedExecutor(timeout, timeUnit)方法来延时异步操作。
然后我们创建了一个handler stage,exceptionHandler,这个阶段会处理一切异常并返回另一个消息message upon cancel。
最后,我们显式的完成第二个阶段并抛出异常,它会导致进行大写操作的阶段抛出CompletionException。它还会触发handler阶段。
API补充:9.取消计算
CompletableFuture handle(BiFunction super T,Throwable,? extends U> fn)
返回一个新的CompletionStage,无论之前的Stage是否正常运行完毕。传入的参数包括上一个阶段的结果和抛出异常。
和计算时异常处理很相似,我们可以通过Future接口中的cancel(boolean mayInterruptIfRunning)来取消计算。
static void cancelExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message"); assertTrue("Was not canceled", cf.cancel(true)); assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally()); assertEquals("canceled message", cf2.join()); }
API补充10.将Function作用于两个已完成Stage的结果之一
public CompletableFutureexceptionally(Function fn)
返回一个新的CompletableFuture,如果出现异常,则为该方法中执行的结果,否则就是正常执行的结果。
下面的例子创建了一个CompletableFuture对象并将Function作用于已完成的两个Stage中的任意一个(没有保证哪一个将会传递给Function)。这两个阶段分别如下:一个将字符串大写,另一个小写。
static void applyToEitherExample() { String original = "Message"; CompletableFuture cf1 = CompletableFuture.completedFuture(original) .thenApplyAsync(s -> delayedUpperCase(s)); CompletableFuture cf2 = cf1.applyToEither( CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), s -> s + " from applyToEither"); assertTrue(cf2.join().endsWith(" from applyToEither")); }
public CompletableFuture applyToEitherAsync(CompletionStage extends T> other,Function super T,U> fn)11.消费两个阶段的任意一个结果
返回一个全新的CompletableFuture,包含着this或是other操作完成之后,在二者中的任意一个执行fn
和前一个例子类似,将Function替换为Consumer
static void acceptEitherExample() { String original = "Message"; StringBuilder result = new StringBuilder(); CompletableFuture cf = CompletableFuture.completedFuture(original) .thenApplyAsync(s -> delayedUpperCase(s)) .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), s -> result.append(s).append("acceptEither")); cf.join(); assertTrue("Result was empty", result.toString().endsWith("acceptEither")); }12.在两个阶段都完成后运行Runnable
注意这里的两个Stage都是同步运行的,第一个stage将字符串转化为大写之后,第二个stage将其转化为小写。
static void runAfterBothExample() { String original = "Message"; StringBuilder result = new StringBuilder(); CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth( CompletableFuture.completedFuture(original).thenApply(String::toLowerCase), () -> result.append("done")); assertTrue("Result was empty", result.length() > 0); }13.用Biconsumer接收两个stage的结果
Biconsumer支持同时对两个Stage的结果进行操作。
static void thenAcceptBothExample() { String original = "Message"; StringBuilder result = new StringBuilder(); CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth( CompletableFuture.completedFuture(original).thenApply(String::toLowerCase), (s1, s2) -> result.append(s1 + s2)); assertEquals("MESSAGEmessage", result.toString()); }14.将Bifunction同时作用于两个阶段的结果
如果CompletableFuture想要合并两个阶段的结果并且返回值,我们可以使用方法thenCombine。这里的计算流都是同步的,所以最后的getNow()方法会获得最终结果,即大写操作和小写操作的结果的拼接。
static void thenCombineExample() { String original = "Message"; CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s)) .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)), (s1, s2) -> s1 + s2); assertEquals("MESSAGEmessage", cf.getNow(null)); }15.异步将Bifunction同时作用于两个阶段的结果
和之前的例子类似,只是这里用了不同的方法:即两个阶段的操作都是异步的。那么thenCombine也会异步执行,及时它没有Async后缀。
static void thenCombineAsyncExample() { String original = "Message"; CompletableFuture cf = CompletableFuture.completedFuture(original) .thenApplyAsync(s -> delayedUpperCase(s)) .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), (s1, s2) -> s1 + s2); assertEquals("MESSAGEmessage", cf.join()); }16.Compose CompletableFuture
我们可以使用thenCompose来完成前两个例子中的操作。
static void thenComposeExample() { String original = "Message"; CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s)) .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)) .thenApply(s -> upper + s)); assertEquals("MESSAGEmessage", cf.join()); }17.当多个阶段中有有何一个完成,即新建一个完成阶段
static void anyOfExample() { StringBuilder result = new StringBuilder(); List messages = Arrays.asList("a", "b", "c"); List18.当所有的阶段完成,新建一个完成阶段futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> { if(th == null) { assertTrue(isUpperCase((String) res)); result.append(res); } }); assertTrue("Result was empty", result.length() > 0); }
static void allOfExample() { StringBuilder result = new StringBuilder(); List messages = Arrays.asList("a", "b", "c"); List19.当所有阶段完成以后,新建一个异步完成阶段futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) .whenComplete((v, th) -> { futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null)))); result.append("done"); }); assertTrue("Result was empty", result.length() > 0); }
static void allOfAsyncExample() { StringBuilder result = new StringBuilder(); List messages = Arrays.asList("a", "b", "c"); List20.真实场景futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) .whenComplete((v, th) -> { futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null)))); result.append("done"); }); allOf.join(); assertTrue("Result was empty", result.length() > 0); }
下面展示了一个实践CompletableFuture的场景:
先通过调用cars()方法异步获得Car列表。它将会返回一个CompletionStage>
。cars()方法应当使用一个远程的REST端点来实现。
我们将该Stage和另一个Stage组合,另一个Stage会通过调用rating(manufactureId)来异步获取每辆车的评分。
当所有的Car对象都填入评分后,我们调用allOf()来进入最终Stage,它将在这两个阶段完成后执行
在最终Stage上使用whenComplete(),打印出车辆的评分。
cars().thenCompose(cars -> { List参考资料updatedCars = cars.stream() .map(car -> rating(car.manufacturerId).thenApply(r -> { car.setRating(r); return car; })).collect(Collectors.toList()); CompletableFuture done = CompletableFuture .allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()])); return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture) .map(CompletableFuture::join).collect(Collectors.toList())); }).whenComplete((cars, th) -> { if (th == null) { cars.forEach(System.out::println); } else { throw new RuntimeException(th); } }).toCompletableFuture().join();
Java CompletableFuture 详解
Guide To CompletableFuture
想要了解更多开发技术,面试教程以及互联网公司内推,欢迎关注我的微信公众号!将会不定期的发放福利哦~
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/68632.html
摘要:在此基础上又向前迈进了一步局部变量类型推断允许开发人员跳过局部变量的类型声明局部变量是指在方法定义,初始化块,循环和其它的如代码块,会推断该局部变量的类型。 前言 之前面试的时候问了我是否了解JDK10的变化,一时回答不出来,所以只回答了JDK8中的函数式编程和流编程。今天看到这篇讲JAVA10的文章,顺便了解一下。 正文 JAVA10的所有新特性请参考这里。在所有的JEP中,JEP-...
摘要:有可能一个线程中的动作相对于另一个线程出现乱序。当实际输出取决于线程交错的结果时,这种情况被称为竞争条件。这里的问题在于代码块不是原子性的,而且实例的变化对别的线程不可见。这种不能同时在多个线程上执行的部分被称为关键部分。 为什么要额外写一篇文章来研究volatile呢?是因为这可能是并发中最令人困惑以及最被误解的结构。我看过不少解释volatile的博客,但是大多数要么不完整,要么难...
摘要:前言上一篇文章请参考猫头鹰的深夜翻译核心并发一安全发布发布一个对象是指该对象的引用对当前的域之外也可见比如,从方法中获取一个引用。任务的功能性接口表示一个没有返回值的任务表示一个包含返回值的计算。 前言 上一篇文章请参考猫头鹰的深夜翻译:核心JAVA并发(一) 安全发布 发布一个对象是指该对象的引用对当前的域之外也可见(比如,从getter方法中获取一个引用)。要确保一个对象被安全的发...
摘要:本文简介类概览类构造器总结类构造方法类使用举例类概览是一个实现了接口,并且键为型的哈希表。中的条目不再被正常使用时,会被自动删除。它的键值均支持。和绝大多数的集合类一样,这个类不是同步的。 本文简介 WeakHashMap类概览 WeakHashMap类构造器总结 WeakHashMap类构造方法 WeakHasjMap类使用举例 1. WeakHashMap类概览 Wea...
摘要:什么是为执行字节码提供一个运行环境。它的实现主要包含三个部分,描述实现规格的文档,具体实现和满足要求的计算机程序以及实例具体执行字节码。该类先被转化为一组字节码并放入文件中。字节码校验器通过字节码校验器检查格式并找出非法代码。 什么是Java Development Kit (JDK)? JDK通常用来开发Java应用和插件。基本上可以认为是一个软件开发环境。JDK包含Java Run...
阅读 3232·2021-11-18 10:02
阅读 3426·2021-10-11 10:58
阅读 3360·2021-09-24 09:47
阅读 1098·2021-09-22 15:21
阅读 3862·2021-09-10 11:10
阅读 3256·2021-09-03 10:28
阅读 1727·2019-08-30 15:45
阅读 2100·2019-08-30 14:22