摘要:定时器例子之前通过调用定时器,需要传一个回调,然后所有的代码逻辑都包在里面。这里定时器会阻塞在这一行,直到一秒后才会执行下面的一行。
之前介绍过quasar,如果你希望在vert.x项目里使用coroutine的话,建议使用vertx-sync。本篇将介绍vertx-sync。
本来打算另起一篇,写其他方面的东西,但是最近比较忙,就先写一篇实践方面的文章。
vertx-sync是什么上一篇我们已经讲了 Fiber 相关的知识,想必大家对Java实现类似Golang的coroutine已经有印象了,既然Java世界里有第三方提供了这么好的库,那我们就看看怎么跟 vert.x 结合起来使用。
vert.x官方为了解决异步代码编写的困难,使之更加同步化对开发人员更友好,便基于quasar包装了一个的同步库,vertx-sync,该库的作者同样也是vert.x的原作者,所以完成度还是很高的。
vertx-sync 对外只是暴露了几个简单的静态API,来完成对vert.x体系内一系列的操作包装,其实主要也就是三静态API而已。
Sync.fiberHandler 如果你希望你的handler里有一些逻辑需要在Fiber里运行,则你的handler必须用这个方法包一下。
Sync.awaitEvent 从Handler里返回一个事件结果(同步的),且 不会阻塞EventLoop
Sync.awaitResult 从Handler里返回一个异步事件结果(同步的),且 不会阻塞EventLoop
直接看介绍可能有点不直观,下面跑几个例子。
使用vertx-sync之前介绍过quasar,如果你希望在项目里使用coroutine的话,需要在JVM里设置一个参数,用于应用启动前修改字节码(注入一些中断方法),从而可以达到协程的目的。
具体方法也很简单。
-javaagent:/path/to/the/quasar-core-0.7.5-jdk8.jar
如果是基于Maven跑单元测试,那只需要引用quasar instrument的插件就可以里
com.vlkan quasar-maven-plugin 0.7.3 true true true instrument co.paralleluniverse quasar-core 0.7.5
上面是一些非常必要的准备工作,否则你无法使用quasar以及vertx-sync。
之前通过vert.x调用定时器,需要传一个回调handler,然后所有的代码逻辑都包在里面。
vertx.setTimer(1000L, h -> { System.out.println("time"s up"); });
现在我们来重新塑造一下三观。
awaitEvent(h -> vertx.setTimer(1000L, h)); System.out.println("time"s up");
这里定时器会阻塞在awaitEvent这一行,直到一秒后才会执行下面的一行。有点类似执行 Thread.sleep(1000L),但是并不会阻塞 EventLoop 因为quasar会在EventLoop基础之上再开启一个fiber。
下面我看个稍微复杂点的例子。
我们先用传统的回调方式使用vert.x的HttpClient API。
HttpClientRequest httpClientRequest = vertx.createHttpClient().get("leapcloud.cn"); httpClientRequest.handler(response -> { response.handler(responseBody -> { System.out.println(responseBody.toString()); }); }).end();
这里有两层回调嵌套,一层是得到Http的Response,另一层是从Response里得到详细的body。因为有lambda表达式才使得Java现在看起来并不是那么恶心。但是如果我们需要根据body的内容进一步做判断去继续请求其他页面,则嵌套会变的非常的深。下面尝试改造成sync方式看看。
HttpClientRequest httpClientRequest = vertx.createHttpClient().get("leapcloud.cn"); HttpClientResponse response = awaitEvent(Sync::fiberHandler); Buffer body = awaitEvent(response::handler); System.out.println(body.toString());
额,是不是感觉看着很舒服,无嵌套,直接顺序下来,非常的直观,加上Java8特有的方法引用,会让代码更精简。
写过vert.x同学肯定知道其vertx-jdbc-client为了使其兼容异步开发模型,将JDBC的底层线程池用异步方式包装了一下,也就是说JDBC层还是通过线程池去访问数据库的,但是是通过vert.x的context做了层封装,使其可以将结果放到对应的 EventLoop 里,这样比较符合vert.x的开发风格。但是带来的弊端就是嵌套太深。
final JDBCClient client = JDBCClient.createShared(vertx, new JsonObject() .put("url", "jdbc:hsqldb:mem:test?shutdown=true") .put("driver_class", "org.hsqldb.jdbcDriver") .put("max_pool_size", 30)); client.getConnection(conn -> { if (conn.failed()) { System.err.println(conn.cause().getMessage()); return; } final SQLConnection connection = conn.result(); connection.execute("create table test(id int primary key, name varchar(255))", res -> { if (res.failed()) { throw new RuntimeException(res.cause()); } // insert some test data connection.execute("insert into test values(1, "Hello")", insert -> { // query some data connection.query("select * from test", rs -> { for (JsonArray line : rs.result().getResults()) { System.out.println(line.encode()); } // and close the connection connection.close(done -> { if (done.failed()) { throw new RuntimeException(done.cause()); } }); }); }); }); });
上面代码可以是不是有点恶心呢?尝试改造一下吧。
final JDBCClient client = JDBCClient.createShared(vertx, new JsonObject() .put("url", "jdbc:hsqldb:mem:test?shutdown=true") .put("driver_class", "org.hsqldb.jdbcDriver") .put("max_pool_size", 30)); try (SQLConnection conn = awaitResult(jdbc::getConnection)) { awaitResult(h -> conn.execute("create table test(id int primary key, name varchar(255))", h)); awaitResult(h -> conn.execute("insert into test values(1, "Hello")", h)); ResultSet query = awaitResult(h -> conn.query("select * from test", h)); for (JsonArray line : query.result.getResults()) { System.out.println(line.encode()); } AsyncResult done = awaitResult(h -> conn.close(h)); if (done.failed()) { throw new RuntimeException(done.cause()) } } catch (Exception e) { e.printStackTrace(); }
除了一个try catch,其他都没有嵌套,整体逻辑的可读性非常高,完全是线性的。
你也许会发现我们似乎一直都没有用到 fiberHandler 这个静态方法,上面虽然写了定义,可能大家还是不能够理解,这里结合场景也许能更好理解。
我们尝试实现一个操作很耗时的逻辑然后包到fiber里,避免 EventLoop 被阻塞。这里你也许会很好奇,既然 Fiber 这么廉价开启10万8万的无所谓啊,恩,这里再提一下quasar的重点部分: fiber可以很廉价的被创造出来,但是他本质上还是跑在一个线程上面,如果其中一个fiber执行了非常耗时的操作,则后面的fiber会一直等待,从而造成整个线程阻塞。 也就是说一个fiber不能执行非常耗时的操作,比如计算100万以内的素数之和,对于这种操作,我们可以通过直接将逻辑放到vert.x的worker线程里多带带去跑,然后通过fiber包装一下就可以了。
AsyncResultresult = awaitResult(fiberHandler(h -> vertx.executeBlocking((Handler >) event -> { //求百万以内素数之和,这里的逻辑会在vert.x的worker线程里跑。随便耗时多久,都不会阻塞EventLoop long sum = sumOfPrime(1, 000, 000); event.complete(sum); }, h))); //打印结果 System.out.println(result.result());
这里你会注意到 awaitReslt 里用了 fiberHandler ,因为executeBlocking里的 handler 逻辑本身并没有跑在fiber体系下,所以会导致无效,而fiberHandler的作用就是将一段vert.x的handler包到 fiber 里。使之后续的await可以将其结果返回,这里使用awaitResult返回结果。
我们再深入一点看看 fiberHandler 方法里到底干了什么。
@Suspendable public staticHandler fiberHandler(Handler handler) { FiberScheduler scheduler = getContextScheduler(); return p -> new Fiber (scheduler, () -> handler.handle(p)).start(); }
这里获取Fiber的调度器,然后直接new了一个 Fiber ,避免了我们自己对逻辑做Fiber包装。是不是很简单呢。
总结相比较传统的回调Handler,vertx-sync的包装十分优雅,基本可以恢复到同步方法调用级别,很好的减轻了异步回调带来的心智负担。
但是这个毕竟不是JVM级别的实现,所以或多或少还是有点门槛的,比如部署的时候,需要通过设置JVM参数来修改部分字节码,同时还要注意一些需要挂起的方法上面加注释或者强行让其抛出可中断异常。个人建议在一些不重要的工具级项目里使用,非常重要的项目不推荐使用,当然了如果你觉得你的业务只需要依赖vert.x那么强烈你推荐你使用,只要记得打开 BlockingChecker 就好,可以即时的发现潜在的阻塞逻辑。
责编:另外7.24号,我们力谱宿云携手Vert.x中国用户组在太库·上海的赞助下举办了一场关于Vert.x的技术,之后还会有系列活动,有兴趣的同学们可以关注我们的微信公众号:MaxLeap_yidongyanfa,了解更多资讯。
作者往期佳作
次时代Java编程(一) Java里的协程
作者信息
本文系力谱宿云 LeapCloud旗下MaxLeap团队_UX成员:刘小溪 【原创】
力谱宿云首发地址:https://blog.maxleap.cn/archi...
刘小溪,Maxleap的高级开发工程师,喜欢倒腾一些有意思的技术框架,对新的技术以及语言非常有兴趣,以前在shopex担任架构师,目前在Maxleap负责基础架构以及服务框架这块技术,同时也会对Vert.x的社区提供一些开源上的支持。
关于MaxLeap
MaxLeap 是力谱宿云推出,为移动应用开发、运营提供一站式后端云服务, 包括应用开发所需的后端云数据库、云数据源、云代码、云容器、 IM、移动支付、应用内社交、第三方登录、社交分享等基础服务,以及针对应用运营的数据分析、推送营销,用户支持等服务, 覆盖移动应用的研发、运营完整生命周期。
MaxLeap 致力于让移动应用开发运营更快速简单。
官网:https://maxleap.cn/
欢迎扫二维码,关注我们
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/64975.html
摘要:用基于快速实现一个最简单的代理服务器,只需要分钟时间。监控统计客户端的请求情况,请求分布统计请求类型等,以此来优化数据库的使用。 用java8基于vert.x3 快速实现一个最简单的mysql代理服务器,只需要5分钟时间。 showImg(/img/bVz0vh); 什么是mysql 代理? mysql代理是介于client端和mysql服务端中间层服务,如下图所示: showImg(...
摘要:为此我们可以使用来发送一个比如流,会自动为我们实现管道传输数据。因为流的长度不确定,请求将使用即分块传输。方法可以被多次安全的调用,这对于无数据的请求很容易复用配置和。如此在接受响应的回调函数里可以直接得到设置的响应解码体。 本文主要介绍Vert.x 3.4.x 版本新组件Web Client的使用 showImg(https://segmentfault.com/img/bVL8iX...
摘要:为什么我会说它们是一样的简单思考一下我的后端书架后端掘金我的后端书架月前本书架主要针对后端开发与架构。一方案调研版本选择当前主流版本是和应用的后台运行配置后端掘金酱油一篇,整理一下关于后台运行的一些配置方式。 分享 50 个完整的 React Native 项目 - 掘金本文为 Marno 原创,转载必须保留出处! 公众号 aMarno,关注后回复 RN 加入交流群 简书专题《 Rea...
阅读 2987·2021-10-19 11:46
阅读 981·2021-08-03 14:03
阅读 2937·2021-06-11 18:08
阅读 2906·2019-08-29 13:52
阅读 2746·2019-08-29 12:49
阅读 481·2019-08-26 13:56
阅读 925·2019-08-26 13:41
阅读 850·2019-08-26 13:35