资讯专栏INFORMATION COLUMN

Netty AUTO_READ读数据流程分析

fevin / 686人阅读

摘要:用这种方式很好的规避了多线程所带来的问题,很值得我们借鉴那么这个怎么来的呢看一下的方法如果为,就返回。

Netty channelRegisteredChannelActive---源码分析
经过下面的分析我们可以了解netty读数据的一个过程,以及为什么channelActive方法、channelReadComplete方法会回调ChannelOutboundHandler的read方法。

1.read()方法追溯

上文说到在HeadContext的channelActive方法中会调用readIfIsAutoRead();该方法同样会在HeadContext的channelReadComplete(xxx)中调用。 readIfIsAutoRead();源码如下:

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
       channel.read();
    }
}

channel.config().isAutoRead()可以通过ChannelOption.AUTO_READ设置。如果设置为false,那么channel便不会主动读数据,除非显示的调用ChannelHandlerContext的read()

AbstractChannel的read()如下

    @Override
    public Channel read() {
        pipeline.read();
        return this;
    }

在read()方法中调用了pipeline的read()方法

DefaultChannelPipeline的read()方法

    @Override
    public final ChannelPipeline read() {
        tail.read();
        return this;
    }
2.TailContext

那么接下来的重点就是DefaultChannelPipeline的tail及tail.read()方法了。先看一下tail对应的TailContext类,TailContext是DefaultChannelPipeline的内部类

DefaultChannelPipeline

   final AbstractChannelHandlerContext head;
   final AbstractChannelHandlerContext tail;

   ...省略代码...

   protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

TailContext

    // A special catch-all handler that handles both bytes and messages.
    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, true, false);
            setAddComplete();
        }

        @Override
        public ChannelHandler handler() {
            return this;
        }
     ...省略代码...   

TailContext继承自AbstractChannelHandlerContext,同时实现了ChannelInboundHandler,也是多重身份。
TailContext的read()方法是继承自AbstractChannelHandlerContext,TailContext没有重写。

AbstractChannleHandlerContext的read()如下:

    @Override
    public ChannelHandlerContext read() {
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeRead();
        } else {
            Runnable task = next.invokeReadTask;
            if (task == null) {
                next.invokeReadTask = task = new Runnable() {
                    @Override
                    public void run() {
                        next.invokeRead();
                    }
                };
            }
            executor.execute(task);
        }

        return this;
    }
    
    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

其中findContextOutbound()是找到下一个Outbound的ChannelHandlerContext。那么由tail.read()所代表的含义便是从pipeline中的尾部的最后一个ChannelInboundHandler开始往前查找是Outbound的HandlerContext.
然后该HandlerContext的invokeRead()方法被调用。

3.简单讲解executor.inEventLoop()

以下分析和read过程没多大关系也可以跳过
AbstractChannleHandlerContext的read()方法中的

       if (executor.inEventLoop()) {
            next.invokeRead();
        } else {
            Runnable task = next.invokeReadTask;
            if (task == null) {
                next.invokeReadTask = task = new Runnable() {
                    @Override
                    public void run() {
                        next.invokeRead();
                    }
                };
            }
            executor.execute(task);
        }

AbstractEventExecutor的inEventLoop()

    @Override
    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }

上面代码的含义是如果调用ChannelHandlerContext read() 所在的线程和executor是同一个线程,那么直接执行AbstractChannelHandlerContext的invokeRead()方法,否则封装成任务,放到executor的任务队列,去等待执行。 这种类似的代码在netty中很常见,这是netty中不用考虑多线程问题的原因。netty用这种方式很好的规避了多线程所带来的问题,很值得我们借鉴

那么这个executor怎么来的呢?看一下AbstractChannelHandlerContext的executor()方法

    @Override
    public EventExecutor executor() {
        if (executor == null) {
            return channel().eventLoop();
        } else {
            return executor;
        }
    }

如果executor 为null,就返回channel().eventLoop()。这里channel().eventLoop()就是每个channel所对应的EventLoop,专门用来处理IO事件,因此不能被阻塞,不能执行耗时任务,该eventLoop会在channel创建时会和channel绑定,ChannelInboundHandler的channelRegistered()也就会被回调。我们创建ServerBootstrap是会指定一个WokerGroup例如NioEventLoopGroup,那么这个eventLoop便会是其中的一员。

那如果executor不为null,executor是怎么来的呢?

AbstractChannelHandlerContext的构造方法

   AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

executor是通过构造方法传进来的。pipeline在添加handler时可以指定EventExecutorGroup(可以查看ChannelPipeline接口的API),便是这么传进来的,具体的分析过程此处略去(可查看netty 耗时任务如何处理去查看具体分析过程),因为不是此篇文章的重点。
这样我们就能处理耗时任务,而不阻塞IO线程了。

4.ChannelHandlerContext的read()在pipeline的传递

第2小节分析到AbstractChannelHandlerContext的invokeRead()方法会被调用,那么invokeRead()实现了什么功能?

   private void invokeRead() {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).read(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            read();
        }
    }

该方法所表达的含义很简单就是回调ChannelOutboundHandler的read(xxx)方法。如果我们的自定义的ChannelOutboundHandler继承自ChannelOutboundHandlerAdapter,并且没有重写该方法,或者在重写的方法中调用了super.read(ctx); 那就会重复调用ChannelHandlerContext的read(),即AbstractChannelHandlerContext的read()方法。这样read(xxx)回调便会在ChannelHandlerContext的作用下从pipleline的ChannelOutboundHandler中的尾部传递到头部,直到DefaultChannelPipeline的DefaultChannelPipeline的HeadContext.

HeadContext的read(xxx)方法如下,HeadContext本身也是ChannelOutboundHandler

        @Override
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

以NioChannel为例,unsafe.beginRead();最终会调用到AbstractNioChannel的doBeginRead()方法,其对应的源码如下:

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

该方法里就是Java Nio的相关操作,SelectionKey的性趣集中添加OP_READ,最终实现读数据。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/69143.html

相关文章

  • Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (客户端)

    摘要:目录源码分析之番外篇的前生今世的前生今世之一简介的前生今世之二小结的前生今世之三详解的前生今世之四详解源码分析之零磨刀不误砍柴工源码分析环境搭建源码分析之一揭开神秘的红盖头源码分析之一揭开神秘的红盖头客户端源码分析之一揭开神秘的红盖头服务器 目录 Netty 源码分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 简介 Java NIO 的前生今世 ...

    zhaot 评论0 收藏0
  • Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (服务器端)

    摘要:目录源码分析之番外篇的前生今世的前生今世之一简介的前生今世之二小结的前生今世之三详解的前生今世之四详解源码分析之零磨刀不误砍柴工源码分析环境搭建源码分析之一揭开神秘的红盖头源码分析之一揭开神秘的红盖头客户端源码分析之一揭开神秘的红盖头服务器 目录 Netty 源码分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 简介 Java NIO 的前生今世 ...

    张金宝 评论0 收藏0
  • Netty4.x 源码实战系列(二):服务端bind流程详解

    摘要:对于,目前大家只知道是个线程组,其内部到底如何实现的,它的作用到底是什么,大家也都不太清楚,由于篇幅原因,这里不作详细介绍,后面会有文章作专门详解。 在上一篇《ServerBootstrap 与 Bootstrap 初探》中,我们已经初步的了解了ServerBootstrap是netty进行服务端开发的引导类。 且在上一篇的服务端示例中,我们也看到了,在使用netty进行网络编程时,我...

    laoLiueizo 评论0 收藏0
  • Netty channelRegisteredChannelActive---源码分析

    摘要:背景最近发现的回调方法,在连接创建成功和读取数据后都会被回调。那我也尝试着从源码找到答案吧。回调流程分析的回调流程和流程没有什么区别,可参考上文分析。但是在的方法中会调用这个是读数据的关键读数据分析读数据分析 背景 最近发现ChannelOutboundHandlerAdapter的read()回调方法,在连接创建成功和读取数据后都会被回调。因此就产生了疑问为什么建立连接和读取数据后r...

    wanghui 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<