资讯专栏INFORMATION COLUMN

Netty channelRegisteredChannelActive---源码分析

wanghui / 660人阅读

摘要:背景最近发现的回调方法,在连接创建成功和读取数据后都会被回调。那我也尝试着从源码找到答案吧。回调流程分析的回调流程和流程没有什么区别,可参考上文分析。但是在的方法中会调用这个是读数据的关键读数据分析读数据分析

背景

最近发现ChannelOutboundHandlerAdapter的read()回调方法,在连接创建成功和读取数据后都会被回调。因此就产生了疑问“为什么建立连接和读取数据后read()方法会被调用呢?” 从网上搜索到一片文章https://my.oschina.net/lifany... 可以看出一些端倪,但是具体流程和一些疑问还是没有解开。
那我也尝试着从源码找到答案吧。

Demo演示

我们先写个小Demo,其中Test1OutboundHandlerAdapter是一个ChannelOutboundHandlerAdapter,里面的read()添加一行打印。 Test1HandlerAdapter 是一个ChannelInboundHandlerAdapter 里面的channelActive(xxx)、
channelRead(xxx)、channelReadComplete(xxx)添加打印。由于很简单,下面只贴部分代码

Test1OutboundHandlerAdapter.java

@Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        super.read(ctx);
        System.out.println("Test1OutboundHandlerAdapter------------->read");
    }

Test1HandlerAdapter.java

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        System.out.println("Test1HandlerAdapter-------------->channelRegistered");
    }
   
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        System.out.println("Test1HandlerAdapter-------------->channelActive");
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Test1HandlerAdapter-------------->channelRead");
        ctx.writeAndFlush(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }    

然后我们建立连接,随便发一下数据,服务器收到数据,打印如下:

Test1HandlerAdapter-------------->handlerAdded
Test1HandlerAdapter-------------->channelRegistered
Test1HandlerAdapter-------------->channelActive
Test1OutboundHandlerAdapter------------->read

Test1HandlerAdapter-------------->channelRead
Test1HandlerAdapter-------------->channelReadComplete
Test1OutboundHandlerAdapter------------->read

如果把Test1OutboundHandlerAdapter的read(xxx)回调方法注释掉,会发现服务器无法接收数据了。

源码分析 1.channelRegistered回调流程分析

可以定位到在AbstractChannelHandlerContext invokeChannelRegistered()方法调用了channelRegistered(xxx)方法,然后再查找会发现是
AbstractChannelHandlerContext的fireChannelRegistered()----->
invokeChannelRegistered(final AbstractChannelHandlerContext next)----->invokeChannelRegistered()

AbstractChannelHandlerContext

@Override
    public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(findContextInbound());
        return this;
    }

    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                /**ChannelInboundHandler的register(xxx)在这里被调用*/
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }

顺藤fireChannelRegistered()摸瓜,最终定位到AbstractChannel内部类AbstractUnsafe的
register(EventLoop eventLoop, final ChannelPromise promise)----->register0(ChannelPromise promise)

AbstractUnsafe

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

在继续找的会找到在EventLooop层面的调用了,我们可以先不用管了。在register0的方法中又调用了pipeline.fireChannelRegistered()和pipeline.fireChannelActive();,这正是我们要找的,也符合打印顺序先channelRegistered后channelActive。为了验证我们可以加上断点调试,就是这儿了。

至此我们可以总结一下:
channelRegistered流程:

说明

DefaultChannelPipeline 的fireChannelRegistered()

    @Override
    public final ChannelPipeline fireChannelUnregistered() {
        AbstractChannelHandlerContext.invokeChannelUnregistered(head);
        return this;
    }

AbstractChannelHandlerContext.invokeChannelUnregistered(head);传递的参数是DefaultChannelPipeline的head,这样保证了register事件沿着pipeline从头流向尾,其对应DefaultChannelPipeline内部类HeadContext。 HeadContext多重身份即是ChannelHandlerContext又是ChannelInboundHandler和ChannelOutboundhandler

DefaultChannelPipeline

 final AbstractChannelHandlerContext head;
 final AbstractChannelHandlerContext tail;
 
...省略...

protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

DefaultChannelPipeline的内部类HeadContext

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }

        @Override
        public ChannelHandler handler() {
            return this;
        }
省略后边的代码

2.上图黄色的部分都是调用的HeadContext中的方法
static void invokeChannelRegistered(final AbstractChannelHandlerContext next)接收的参数是DefaultChannelPipeline传递的head即HeadContext,那么也就是head.invokeChannelRegistered()
invokeChannelRegistered()方法中会调用
((ChannelInboundHandler) handler()).channelRegistered(this);
HeadContext类中该方法返回的就是自己(可查看上面的代码),因为HeadContext本身也是ChannelInboundHandler。 同时又将自己作为参数,调用自己的channelRegistered方法

3.HeadContext的ChannelRegister方法中调用AbstractChannelHandlerContext的fireChannelRegistered();
(还是调用的自己)该方法中调用了invokeChannelRegistered(findContextInbound()); findContextInbound()所实现的功能就是查找到下一个ChanelInboundHandler即HeadContext(本身是ChannelInboundHandler)下一个ChanelInboundHandler
上面的步骤不断重复,自此registered事件可以沿着pipeline在不同的InboundHandler里流动了。

2.channelActive回调流程分析

channelActive的回调流程和channelRegister流程没有什么区别,可参考上文分析。 但是在HeadContext的channelActive方法中会调用readIfIsAutoRead(); 这个是读数据的关键

3.netty读数据分析

读数据分析

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/69148.html

相关文章

  • Netty AUTO_READ读数据流程分析

    摘要:用这种方式很好的规避了多线程所带来的问题,很值得我们借鉴那么这个怎么来的呢看一下的方法如果为,就返回。 Netty channelRegisteredChannelActive---源码分析经过下面的分析我们可以了解netty读数据的一个过程,以及为什么channelActive方法、channelReadComplete方法会回调ChannelOutboundHandler的read...

    fevin 评论0 收藏0
  • 源码之下无秘密 ── 做最好的 Netty 源码分析教程

    摘要:背景在工作中虽然我经常使用到库但是很多时候对的一些概念还是处于知其然不知其所以然的状态因此就萌生了学习源码的想法刚开始看源码的时候自然是比较痛苦的主要原因有两个第一网上没有找到让我满意的详尽的源码分析的教程第二我也是第一次系统地学习这么大代 背景 在工作中, 虽然我经常使用到 Netty 库, 但是很多时候对 Netty 的一些概念还是处于知其然, 不知其所以然的状态, 因此就萌生了学...

    shenhualong 评论0 收藏0
  • Netty 源码分析之 零 磨刀不误砍柴工 源码分析环境搭建

    摘要:目录此文章属于源码之下无秘密做最好的源码分析教程系列文章之一代码下载首先到的仓库中点击右边绿色的按钮拷贝地址然后在终端中输入如下命令克隆工程工程源码较大加上国内网络问题下载源码可能会比较耗时当有如下输出时表示克隆成功了如果有朋友实在下载太 目录 此文章属于 源码之下无秘密 ── 做最好的 Netty 源码分析教程 系列文章之一. 代码下载 首先到 Netty 的 Github 仓库 中...

    freewolf 评论0 收藏0
  • Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (客户端)

    摘要:目录源码分析之番外篇的前生今世的前生今世之一简介的前生今世之二小结的前生今世之三详解的前生今世之四详解源码分析之零磨刀不误砍柴工源码分析环境搭建源码分析之一揭开神秘的红盖头源码分析之一揭开神秘的红盖头客户端源码分析之一揭开神秘的红盖头服务器 目录 Netty 源码分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 简介 Java NIO 的前生今世 ...

    zhaot 评论0 收藏0
  • Netty 源码分析之 三 我就是大名鼎鼎的 EventLoop(一)

    摘要:目录源码之下无秘密做最好的源码分析教程源码分析之番外篇的前生今世的前生今世之一简介的前生今世之二小结的前生今世之三详解的前生今世之四详解源码分析之零磨刀不误砍柴工源码分析环境搭建源码分析之一揭开神秘的红盖头源码分析之一揭开神秘的红盖头客户端 目录 源码之下无秘密 ── 做最好的 Netty 源码分析教程 Netty 源码分析之 番外篇 Java NIO 的前生今世 Java NI...

    livem 评论0 收藏0

发表评论

0条评论

wanghui

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<