摘要:并行流与目前,我们对集合进行计算有两种方式并行流而更加的灵活,我们可以配置线程池的大小确保整体的计算不会因为等待而发生阻塞。
【回顾Future接口
Future接口时java5引入的,设计初衷是对将来某个时刻会发生的结果建模。它建模了一种异步计算,返回了一个执行预算结果的引用。比如,你去干洗店洗衣服,店员会告诉你什么时候可以来取衣服,而不是让你一直在干洗店等待。要使用Future只需要将耗时操作封装在一个Callable对象中,再将其提交给ExecutorService就可以了。
ExecutorService executor = Executors.newFixedThreadPool(10); Futurefuture = executor.submit(new Callable () { @Override public Double call() throws Exception { return doSomeLongComputation(); } }); doSomethingElse(); try { //最多等待1秒 Double result = future.get(1,TimeUnit.SECONDS); } catch (InterruptedException e) { //当前线程等待过程中被打断 e.printStackTrace(); } catch (ExecutionException e) { //计算时出现异常 e.printStackTrace(); } catch (TimeoutException e) { //完成计算前就超时 e.printStackTrace(); }
但是Future依然有一些局限性:
无法将两个异步计算的结果合并为一个。
等待Future集合中所有任务完成。
等待Future集合中最快任务完成(选择最优的执行方案)。
通过编程的方式完成一个Future任务的执行(手工设定异步结果处理)。
应对Future的完成事件,当Future的完成事件发生时会收到通知,并可以使用Future的结果进行下一步操作,不只是简单的阻塞等待。
而CompletableFuture类实现了Future接口,可以将上述的问题全部解决。CompletableFuture与Stream的设计都遵循了类似的设计模式:使用Lambda表达式以及流水线的思想,从这个角度可以说CompletableFuture与Future的关系类似于Stream与Collection的关系。
【构建一个异步应用最佳价格查询器:查询多个线上商店对同一商品的价格。
首先构建商店对象:
package BestPriceFinder; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; public class Shop { private String name; public Shop(String name){ this.name = name; } public String getName(){ return name; } /** * 异步api:使用创建CompletableFuture类提供的工厂方法与getPriceAsync()效果完全一致 * 可以更轻易的完成这个流程,并且不用担心实现细节 * @param product * @return */ public FuturegetPriceAsyncByFactory(String product){ return CompletableFuture.supplyAsync(() -> calculatePrice(product)); } /** * 异步api: * @param product * @return */ public Future getPriceAsync(String product){ //创建CompletableFuture对象,它将包含计算结果 CompletableFuture futurePrice = new CompletableFuture<>(); //在新线程中异步计算结果 new Thread(() -> { try { double price = calculatePrice(product); //需要长时间计算的任务结束时,设置future的返回值 futurePrice.complete(price); }catch (Exception e){ //如这里没有使用completeExceptionally,线程不会结束,调用方会永远的执行下去 futurePrice.completeExceptionally(e); } }).start(); //无需等待计算结果,直接返回future对象 return futurePrice; } /** * 同步api: * 每个商店都需要提供的查询api:根据名称返回价格; * 模拟查询数据库等一些耗时操作:使用delay()模拟这些耗时操作。 * @param product * @return */ public double getPrice(String product){ return calculatePrice(product); } private double calculatePrice(String product){ delay(); return random.nextDouble() * product.charAt(0) + product.charAt(1); } private Random random = new Random(); /** * 模拟耗时操作:延迟一秒 */ private static void delay(){ try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
下面我们针对Shop.java提供的同步方法与异步方法来进行测试:
package BestPriceFinder; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.stream.Collectors; /** * 最佳价格查询器 */ public class BestFinder { Listshops = Arrays.asList( new Shop("A"), new Shop("B"), new Shop("C"), new Shop("D"), new Shop("E"), new Shop("F"), new Shop("G"), new Shop("H"), new Shop("I"), new Shop("J") ); /** * 顺序查询 */ public List findPrices(String product){ return shops.stream() .map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product))) .collect(Collectors.toList()); } /** * 并行流查询 */ public List findPricesParallel(String product){ return shops.parallelStream() .map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product))) .collect(Collectors.toList()); } /** * 异步查询 * 相比并行流的话CompletableFuture更有优势:可以对执行器配置,设置线程池大小 */ @SuppressWarnings("all") private final Executor myExecutor = Executors.newFixedThreadPool(Math.min(shops.size(), 800), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); //使用守护线程保证不会阻止程序的关停 t.setDaemon(true); return t; } }); @SuppressWarnings("all") public List findPricesAsync(String product){ List > priceFuctures = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)),myExecutor)) .collect(Collectors.toList()); /** 这里需要使用新的stream来等待所有的子线程执行完, * 因为:如果在一个stream中使用两个map: * List > priceFuctures = shops.parallelStream() * .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)))) * .map(c -> c.join()).collect(Collectors.toList()) * .collect(Collectors.toList()); * 考虑到流操作之间的延迟特性。如果你在单一的流水线中处理流,发向不同商家的请求只能以同步顺序的方式执行才会成功。因此每个创建CompletableFuture * 对象只能在前一个操作结束之后执行查询商家动作。 */ return priceFuctures.stream().map(c -> c.join()).collect(Collectors.toList()); } }
@Test public void findPrices(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPrices("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); } @Test public void findPricesParallel(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPrices("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); } @Test public void findPricesAsync(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPricesAsync("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); }
同步api测试结果:毫无疑问是10秒之上
并行流获取同步api测试结果:也是10秒之上,但是并行流不是很高效吗?怎么会如此凄惨呢?因为这与并行流可以调用的系统核数相关,我的计算机是8核,最多8个线程同时运行。而商店有10个,也就是说,我们的两个线程会一直等待前面的某一个线程释放出空闲才能继续运行。
异步获取api测试结果:一秒左右
为何差距如此大呢?
明智的选择是创建了一个配有线程池的执行器,线程池中线程的数目取决于你的应用需要处理的负担,但是你该如何选择合适的线程数目呢?
《Java并发编程实战》中给出如下公式:
Number = NCpu * Ucpu * ( 1 + W/C) Number : 线程数量 NCpu : 处理器核数 UCpu : 期望cpu利用率 W/C : 等待时间与计算时间比
我们这里:99%d的时间是等待商店响应 W/C = 99 ,cpu利用率期望 100% ,NCpu = 9,推断出 number = 800。但是为了避免过多的线程搞死计算机,我们选择商店数与计算值中较小的一个。
【并行流与CompletableFuture目前,我们对集合进行计算有两种方式:1.并行流 2.CompletableFuture;而CompletableFuture更加的灵活,我们可以配置线程池的大小确保整体的计算不会因为等待IO而发生阻塞。
书上给出的建议如下:
如果是计算密集型的操作并且没有IO推荐stream接口,因为实现简单效率也高,如果所有的线程都是计算密集型的也就没有必要创建比核数更多的线程。
反之,如果任务涉及到IO,网络等操作:CompletableFuture灵活性更好,因为大部分线程处于等待状态,需要让他们更加忙碌,并且再逻辑中加入异常处理可以更有效的监控是什么原因触发了等待。
现在我们知道了如何用CompletableFuture提供异步的api,后面的文章会学习如何利用CompletableFuture高效的操作同步api。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/68219.html
摘要:相比与其他操作系统包括其他类系统有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。因为多线程竞争锁时会引起上下文切换。减少线程的使用。很多编程语言中都有协程。所以如何避免死锁的产生,在我们使用并发编程时至关重要。 系列文章传送门: Java多线程学习(一)Java多线程入门 Java多线程学习(二)synchronized关键字(1) java多线程学习(二)syn...
摘要:因为多线程竞争锁时会引起上下文切换。减少线程的使用。举个例子如果说服务器的带宽只有,某个资源的下载速度是,系统启动个线程下载该资源并不会导致下载速度编程,所以在并发编程时,需要考虑这些资源的限制。 最近私下做一项目,一bug几日未解决,总惶恐。一日顿悟,bug不可怕,怕的是项目不存在bug,与其惧怕,何不与其刚正面。 系列文章传送门: Java多线程学习(一)Java多线程入门 Jav...
摘要:学习编程的本最佳书籍这些书涵盖了各个领域,包括核心基础知识,集合框架,多线程和并发,内部和性能调优,设计模式等。擅长解释错误及错误的原因以及如何解决简而言之,这是学习中并发和多线程的最佳书籍之一。 showImg(https://segmentfault.com/img/remote/1460000018913016); 来源 | 愿码(ChainDesk.CN)内容编辑 愿码Slo...
摘要:表示的是两个,当其中任意一个计算完并发编程之是线程安全并且高效的,在并发编程中经常可见它的使用,在开始分析它的高并发实现机制前,先讲讲废话,看看它是如何被引入的。电商秒杀和抢购,是两个比较典型的互联网高并发场景。 干货:深度剖析分布式搜索引擎设计 分布式,高可用,和机器学习一样,最近几年被提及得最多的名词,听名字多牛逼,来,我们一步一步来击破前两个名词,今天我们首先来说说分布式。 探究...
阅读 2814·2019-08-30 15:55
阅读 2860·2019-08-30 15:53
阅读 2298·2019-08-26 13:47
阅读 2560·2019-08-26 13:43
阅读 3159·2019-08-26 13:33
阅读 2807·2019-08-26 11:53
阅读 1799·2019-08-23 18:35
阅读 803·2019-08-23 17:16