摘要:今天是小明女朋友的生日,小明想给她一个惊喜,于是想到了订一个蛋糕给她,所以小明打电话给蛋糕店预定,店员回复他说好的,我们知道了,制作好了会通知你的。于是小明就开开心心的打游戏去了。值检查,整个设计中均没有对对象做的检查,容易引起。
Netty中的异步调用
如果大家观察仔细,会发现我们之前所写的代码都是串行执行的,这是什么意思?就是我们看到代码是什么顺序,最后程序就是按什么顺序执行的。
但是Netty作为一个高性能网络框架,他的调用很多都是异步的,这样,就可以不等上一步做完,继续行进下一步,达到多任务并行的作用。
实现概述Netty是怎么实现他的异步调用呢,大致总结了下由以下几个核心部分
组成:
异步执行(executor)
异步结果(future and promise)
Listener
同步接口
首先,既然是异步调用,肯定要有异步执行,同学们这里肯定想到的是使用线程,没错,他的底层确实也是线程,只不过netty自身封装成了executor,增强了线程的调度。
其次,是要能获取到这次执行的结果,有的同学可能会说使用callable,没错这确实是一种解决方案,但是netty并没有使用这种,而是使用了一种更为巧妙的设计(也就是通过promise对象来传递执行的结果)来完成这种操作,下面我们会详细说明。
最后就是promise对象提供的各种接口,比如Listener:可以监听执行的完成。或者是同步接口:保证异步执行的方法顺序也是同步的。这篇文章中,我们主要就讲这两,三个,其他的各位童鞋可以自己去看源码。
Executor实现Netty中每个Channel都有一个eventloop对象,实现还蛮复杂的,在这里不是重点,所以我们先实现一个,具有异步调用功能的exector。
自定义executor很简单,只要实现Executor接口就行
public class MyNettyExecutor implements Executor { private ThreadFactory factory; public MyNettyExecutor(ThreadFactory factory) { this.factory = factory; } public void execute(Runnable command) { factory.newThread(command).start(); } }
然后在需要使用的时候,实例化这个类,这里为了增强使用,我们在类内部提供一个静态初始化方法,并提供最简单factory实现。
public static Executor newExecutor(){ return new MyNettyExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r); } }); }Promise详解 对furture/promise的理解
我对future的认识最开始源于Java的FutureTask框架,简单来说,FutureTask是Future接口的一种实现,Future则是异步执行的结果。
而promise,从接口注释上来看,是一种可修改的Future
/** * Special {@link Future} which is writable. */
那么现在来看,一个异步结果的程序主要有下面几步
生成promise对象
具体调用的地方传入promise参数
异步调用完成后,设置promise为完成
返回future对象
其中,第三步是发生在异步调用里的,所以我们看到的顺序其实就是1->2>4,让我们来画一张图。
这其实可以用一个现实中的例子来讲述。
今天是小明女朋友的生日,小明想给她一个惊喜,于是想到了订一个蛋糕给她,所以小明打电话给蛋糕店预定,店员回复他说:好的,我们知道了,制作好了会通知你的。于是小明就开开心心的打游戏去了。
在上面的例子中,预定蛋糕就是一个异步过程,我只要通知需要做这件事的人(execute),并拿到回复(Future),然后就可以做其他事情了。然后过一段时间打电话询问蛋糕做好没(isDone),如果没做好,那就请他做好的时候通知我(listener)
所以现在我们有了异步执行,还需要什么呢?
Future和Promise的定义接口
Promise实现
然后,我们理一下需要哪些接口
isDone 判断任务是否完成
addListener
trySuccess 设置任务完成并通知所有listener
sync 同步方法,等待任务完成
定义首先定义接口
/*listener接口,提供complete方法**/ public interface MyFutureListenerisDone> extends EventListener { void operationComplete(F future); } /*Future接口**/ public interface MyFuture { boolean isDone(); MyFuture sync() throws InterruptedException ; MyFuture addListener(MyFutureListener extends MyFuture super V>> listener); } /*promise接口**/ public interface MyPromise extends MyFuture { boolean trySuccess(); @Override MyPromise addListener(MyFutureListener extends MyFuture super V>> listener); }
我们假设只有完成和未完成两个状态,Promise内维护着这个状态值(初始为null),那么判断是否完成只需要判断这个值不为空就行了。
private volatile Object result = null; @Override public boolean isDone() { return result != null; }trySucess
那么最简单的success实现就是给这个对象赋值
@Override public boolean trySuccess() { result = new Object(); return true; }
当然,这里很不严谨,我们后面再说。
Listener接口实现上面我们定义了listener接口,这里要实现addListener方法
private List>> listeners; @Override public MyPromise addListener(MyFutureListener extends MyFuture super Void>> listener) { synchronized (this) { if(listeners == null){ listeners = new ArrayList >>(); listeners.add(listener); }else { listeners.add(listener); } } if (isDone()){ for (MyFutureListener f: listeners ) { f.operationComplete(this); } } return this; }
然后完善下success方法,成功的时候调用每一个listener的complete方法。
@Override public boolean trySuccess() { result = new Object(); for (MyFutureListener f: listeners ) { f.operationComplete(this); } return true; }同步接口实现
同步也很简单,就是先判断任务是否完成,没有完成就wait一下。注意,wait之前我们要保持同步,引入synchronized原语。
@Override public MyFuturesync() throws InterruptedException { if (isDone()){ return this; } synchronized (this){ while (!isDone()) { waiters++; try { wait(); }finally { waiters--; } } } return this; }
同理,需要有地方去唤醒它,我们继续完善success方法,最终我们的trySuccess方法如下
private synchronized void checkNotify(){ if (waiters > 0){ notifyAll(); } } @Override public boolean trySuccess() { result = new Object(); checkNotify(); for (MyFutureListener f: listeners ) { f.operationComplete(this); } return true; }Demo
轮子造好了,是时候写个demo测试一下
public class MyExecutorDemo { public static void main(String[] args) { MyFuture future = asyncHello().addListener((MyFutureListener警告>) future1 -> System.out.println("监听到完成")); if (future.isDone()){ System.out.println("异步执行完成"); }else{ try { future.sync(); } catch (InterruptedException e) { e.printStackTrace(); } } } static MyFuture asyncHello(){ Executor executor = MyNettyExecutor.newExecutor(); final DefaultPromise promise = new DefaultPromise(); executor.execute(() -> { System.out.println("Hello Async"); try { //模拟一些操作 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } promise.trySuccess(); }); return promise; } }
不可用于生产,这个Future/promise的设计仅仅为了说明异步执行和结果,距离netty中的异步框架还缺少很多。
NULL值检查,整个设计中均没有对对象做NULL的检查,容易引起NullPointException。
异常处理缺失,对可能失败的地方做异常处理(这也是是否能用于生产的合格检验)
非完全异步,listener的通知没有使用异步
待补充(以我现在的水平,暂时想不到)
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/68619.html
摘要:我想这很好的解释了中,仅仅一个都这么复杂,在单线程或者说串行的程序中,编程往往是很简单的,说白了就是调用,调用,调用然后返回。 Netty源码分析(三) 前提概要 这次停更很久了,原因是中途迷茫了一段时间,不过最近调整过来了。不过有点要说下,前几天和业内某个大佬聊天,收获很多,所以这篇博文和之前也会不太一样,我们会先从如果是我自己去实现这个功能需要怎么做开始,然后去看netty源码,与...
摘要:下面无耻的贴点源码。启动类我们也学,把启动类抽象成两层,方便以后写客户端。别着急,我们慢慢来,下一篇我们会了解以及他的成员,然后,完善我们的程序,增加其接收数据的能力。文章的源码我会同步更新到我的上,欢迎大家,哈哈。 废话两句 这次更新拖了很长时间,第一是自己生病了,第二是因为最开始这篇想写的很大,然后构思了很久,发现不太合适把很多东西写在一起,所以做了点拆分,准备国庆前完成这篇博客。...
摘要:一些想法这个系列想开很久了,自己使用也有一段时间了,利用也编写了一个简单的框架,并运用到工作中了,感觉还不错,趁着这段时间工作不是很忙,来分析一波源码,提升下技术硬实力。 一些想法 这个系列想开很久了,自己使用netty也有一段时间了,利用netty也编写了一个简单的框架,并运用到工作中了,感觉还不错,趁着这段时间工作不是很忙,来分析一波源码,提升下技术硬实力。 结构 这里先看下net...
摘要:在上一篇源码实战系列三全剖析中,我们详细分析了的初始化过程,并得出了如下结论在中,每一个都有一个对象,并且其内部本质上就是一个双向链表本篇我们将深入源码内部,对其一探究竟,给大家一个全方位解析。 在上一篇《Netty4.x 源码实战系列(三):NioServerSocketChannel全剖析》中,我们详细分析了NioServerSocketChannel的初始化过程,并得出了如下结论...
摘要:对于,目前大家只知道是个线程组,其内部到底如何实现的,它的作用到底是什么,大家也都不太清楚,由于篇幅原因,这里不作详细介绍,后面会有文章作专门详解。 在上一篇《ServerBootstrap 与 Bootstrap 初探》中,我们已经初步的了解了ServerBootstrap是netty进行服务端开发的引导类。 且在上一篇的服务端示例中,我们也看到了,在使用netty进行网络编程时,我...
阅读 2528·2021-11-23 09:51
阅读 3334·2021-11-22 15:22
阅读 1851·2021-11-18 13:22
阅读 2195·2021-09-24 09:48
阅读 1285·2019-08-29 13:58
阅读 1276·2019-08-26 13:39
阅读 2397·2019-08-26 10:48
阅读 3016·2019-08-26 10:21