资讯专栏INFORMATION COLUMN

ChannelPipeline 和 ChannelHandler

William_Sang / 3061人阅读

摘要:概念与概念一致用以连接设备文件等的纽带例如将网络的读写客户端发起连接主动关闭连接链路关闭获取通信双方的网络地址等的类型主要有两种非阻塞以及阻塞数据传输类型有两种按事件消息传递以及按字节传递适用方类型也有两种服务器以及客户端还有一些根据传输协

ChannelHandler Channel

Channel 概念与 java.nio.channel 概念一致, 用以连接IO设备 (socket, 文件等) 的纽带. 例如将网络的读、写, 客户端发起连接, 主动关闭连接, 链路关闭, 获取通信双方的网络地址等.

Channel 的 IO 类型主要有两种: 非阻塞IO (NIO) 以及阻塞IO(OIO).

数据传输类型有两种: 按事件消息传递 (Message) 以及按字节传递 (Byte).

适用方类型也有两种: 服务器(ServerSocket) 以及客户端(Socket). 还有一些根据传输协议而制定的的Channel, 如: UDT、SCTP等.

Netty 按照类型逐层设计相应的类. 最底层的为抽象类 AbstractChannel, 再以此根据IO类型、数据传输类型、适用方类型实现. 类图可以一目了然, 如下图所示:

Channel 状态

channelRegistered 状态

/**
 * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
 */
void channelRegistered(ChannelHandlerContext ctx) throws Exception;

从注释里面可以看到是在 Channel 绑定到 Eventloop 上面的时候调用的.

不管是 Server 还是 Client, 绑定到 Eventloop 的时候, 最终都是调用 Abstract.initAndRegister() 这个方法上(Server是在 AbstractBootstrap.doBind() 的时候调用的, Client 是在 Bootstrap.doConnect() 的时候调用的).

initAndRegister() 方法定义如下:

final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    // 把channel绑定到Eventloop对象上面去
    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

继续跟踪下去会定位到 AbstractChannel.AbstractUnsafe.register0() 方法上.

        private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // 做实际的绑定动作。把Channel感兴趣的事件注册到Eventloop.selector上面.具体实现在Abstract.doRegister()方法内
                doRegister();
                neverRegistered = false;
                registered = true;

                // 通过pipeline的传播机制,触发handlerAdded事件
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                
                // 通过pipeline的传播机制,触发channelRegistered事件
                pipeline.fireChannelRegistered();

                // 还没有绑定,所以这里的 isActive() 返回false.
                if (isActive()) {
                    if (firstRegistration) {
                        // 如果当前链路已经激活,则调用channelActive()方法
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

从上面的代码也可以看出, 在调用完 pipeline.fireChannelRegistered() 之后, 紧接着会调用 isActive() 判断当前链路是否激活, 如果激活了则会调用 pipeline.fireChannelActive() 方法.

这个时候, 对于 Client 和 Server 都还没有激活, 所以, 这个时候不管是 Server 还是 Client 都不会调用 pipeline.fireChanenlActive() 方法.

channelActive 状态

从启动器的 bind() 接口开始, 往下调用 doBind() 方法:

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 初始化及注册
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // 调用 doBind0
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        ....
    }
}

doBind 方法又会调用 doBind0() 方法, 在 doBind0() 方法中会通过 EventLoop 去执行 channelbind()任务.

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                // 调用channel.bind接口
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

doBind0() 方法往下会调用到 pipeline.bind(localAddress, promise); 方法, 通过 pipeline 的传播机制, 最终会调用到 AbstractChannel.AbstractUnsafe.bind() 方法, 这个方法主要做两件事情:

调用 doBind(): 调用底层JDK API进行 Channel 的端口绑定.

调用 pipeline.fireChannelActive().

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    
    ....
    
    // wasActive 在绑定成功前为 false
    boolean wasActive = isActive();
    try {
        // 调用doBind()调用JDK底层API进行端口绑定
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    // 完成绑定之后,isActive() 返回true
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // 触发channelActive事件
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}

也就是说当有新客户端连接的时候, 会变成活动状态.

channelInactive 状态

fireChannelnactive() 方法在两个地方会被调用: Channel.close()Channel.disconnect().

在调用前会先确认状态是从 Active--->Inactive.

channelUnregistered 状态

fireChannelUnregistered() 方法是在 ChannelEventloop 中解除注册的时候被调用的. Channel.close() 的时候被触发执行.

ChannelHandler 的生命周期

handlerAdded(): 添加到 ChannelPipeline 时调用.
handlerRemoved(): 从 ChannelPipeline 中移除时调用.
exceptionCaught(): 处理过程中在 ChannelPipeline 中有错误产生时调用.

处理 I/O 事件或截获 I/O 操作, 并将其转发到 ChannelPipeline 中的下一个处理程序. ChannelHandler 本身不提供许多方法, 但通常必须实现其子类型之一:

ChannelInboundHandler: 处理入站数据以及各种状态变化.

ChannelOutboundHandler: 处理出站数据并且允许拦截所有的操作.

ChannelInboundHandler 接口

channelRegistered(): 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用.
channelUnregistered(): 当 Channel 从他的 EventLoop 注销并且无法处理任何 I/O 时被调用.
channelActive(): 当 Channel 处于活动状态时被调用.
channelInactive(): 当 Channel 离开活动状态并且不再连接远程节点时被调用.
channelRead(): 当从 Channel 读取数据时被调用.
channelReadComplete(): 当 Channel 上的一个读操作完成时被调用. 当所有可读字节都从 Channel 中读取之后, 将会调用该回调方法.

ChannelOutboundHandler 接口

出站操作和数据将由 ChannelOutboundHandler 处理. 它的方法将被 Channel ChannelPipeline 以及 ChannelHandlerContext 调用.

ChannelOutboundHandler 的一个强大的功能是可以按需推迟操作或事件, 这使得可以通过一些复杂的方法来处理请求. 例如, 如果到远程节点的写入被暂停, 那么你可以推迟刷新操作并在稍后继续.

connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise): 当请求将 Channel 连接到远程节点时被调用.
disconnect(ChannelHandlerContext ctx, ChannelPromise promise): 当请求将 Channel 从远程节点断开时被调用.
deregister(ChannelHandlerContext ctx, ChannelPromise promise): 当请求将 Channel 从它的 EventLoop 注销时被调用.
read(ChannelHandlerContext ctx): 当请求从 Channel 读取更多的数据时被调用.
write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise): 当请求通过 Channel 将数据写到远程节点时被调用.
flush(ChannelHandlerContext ctx): 当请求从 Channel 将入队数据冲刷到远程节点时被调用.

ChannelPromise 和 ChannelFuture

ChannelFuture 表示 Channel 中异步I/O操作的结果, 在 netty 中所有的 I/O 操作都是异步的, I/O 的调用会直接返回, 可以通过 ChannelFuture 来获取 I/O 操作的结果或者状态信息.

当 I/O 操作开始时, 将创建一个新对象. 新的对象是未完成的-它既没有成功, 也没有失败, 也没有被取消, 因为 I/O 操作还没有完成.

如果 I/O 操作已成功完成(失败或取消), 则对象将标记为已完成, 其中包含更具体的信息, 例如故障原因.

请注意, 即使失败和取消属于已完成状态.

ChannelPromiseChannelFuture 的一个子接口, 其定义了一些可写的方法, 如 setSuccess()setFailure(), 从而使 ChannelFuture 不可变.

优先使用addListener(GenericFutureListener),而非await()

当做了一个 I/O 操作并有任何后续任务的时候, 推荐优先使用 addListener(GenericFutureListener) 的方式来获得通知, 而非 await()

addListener(GenericFutureListener) 是非阻塞的. 它会把特定的 ChannelFutureListener 添加到 ChannelFuture 中, 然后 I/O 线程会在 I/O 操作相关的 future 完成的时候通知监听器.

ChannelFutureListener 会利于最佳的性能和资源的利用, 因为它一点阻塞都没有. 而且不会造成死锁.

ChannelHandler 适配器

ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter 这两个适配器类分别提供了
ChannelInboundHandlerChannelOutboundHandler 的基本实现, 它们继承了共同的父接口
ChannelHandler 的方法, 扩展了抽象类 ChannelHandlerAdapter.

ChannelHandlerAdapter 还提供了实用方法 isSharable().

如果其对应的实现被标注为 Sharable, 那么这个方法将返回 true, 表示它可以被添加到多个 ChannelPipeline 中.

ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter 中所提供的方法体调用了其相关联的 ChannelHandlerContext 上的等效方法, 从而将事件转发到了 ChannelPipeline 中的 ChannelHandler 中.

ChannelPipeline 接口

ChannelPipeline 将多个 ChannelHandler 链接在一起来让事件在其中传播处理 (通过扩展
ChannelInitializer). 一个 ChannelPipeline 中可能不仅有入站处理器, 还有出站处理器, 入站处理器只会处理入站的事件, 而出站处理器只会处理出站的数据.

每一个新创建的 Channel 都将会分配一个新的 ChannelPipeline, 不能附加另一个 ChannelPipeline, 也不能分离当前的.

通过调用 ChannelHandlerContext 实现, 它将被转发给同一个超类型的下一个 ChannelHandler.

从事件途径 ChannelPilpeline 的角度来看, ChannelPipeline 的头部和尾端取决于该事件是入站的还是出站的.

而 Netty 总是将 ChannelPilpeline 的入站口 (左侧) 作为头部, 将出站口 (右侧) 作为尾端.

当通过调用 ChannelPilpeline.add*() 方法将入站处理器和出站处理器混合添加到 ChannelPilpeline 之后, 每一个 ChannelHandler 从头部到尾端的顺序就是我们添加的顺序.

ChannelPilpeline 传播事件时, 它会测试 ChannelPilpeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配. 如果不匹配, ChannelPilpeline 将跳过该 ChannelHandler 并前进到下一个, 直到它找到和该事件期望的方向相匹配的为止.

修改 ChannelPipeline

这里指修改 ChannelPipeline 中的 ChannelHandler 的编排.

通过调用 ChannelPipeline 上的相关方法, ChannelHandler 可以添加, 删除或者替换其他的 ChannelHandler, 从而实时地修改 ChannelPipeline 的布局.

addFirst  // 将 ChannelHandler 插入第一个位置
addBefore // 在某个 ChannelHandler 之前添加一个
addAfter  // 在某个 ChannelHandler 之后添加一个
addLast   // 将 ChannelHandler 插入最后一个位置
remove    // 移除某个 ChannelHandler
replace   // 将某个 ChannelHandler 替换成指定 ChannelHandler
ChannelHandlerContext 接口

ChannelHandlerContext 代表了 ChanelHandlerChannelPipeline 之间的关联, 每当有 ChanelHandler 添加到 ChannelPipeline 中, 都会创建 ChannelHandlerContext.

ChannelHandlerContext 的主要功能是管理它所关联的 ChannelPipeline 和同一个 ChannelPipeline 中的其他 ChanelHandler 之间的交互.

ChannelHandlerContext 有很多的方法, 其中一些方法也存在于 ChannelChannelPipeline 上, 但是有一点重要的不同.

如果调用 ChannelChannelPipeline 上的这些方法将沿着 ChannelPipeline 进行传播(从头或尾开始).
而调用位于 ChannelHandlerContext 上的相同方法, 则将从当前所关联的 ChannelHandler 开始, 并且只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler.

这样做可以减少 ChannelHandler 的调用开销.

使用 ChannelHandlerContext

上图为 Channel ChannelPipeline ChannelHandler 以及 ChannelHandlerContext 之间的关系.

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74384.html

相关文章

  • Netty-ChannelHandler-ChannelPipeline

    摘要:只有在详尽的测试之后才应设置为这值使用的默认采样率检测并报告任何发现的泄漏。这是默认级别,适合绝大部分情况使用默认的采样率,报告所发现的任何的泄漏以及对应的消息被访问的位置类似于但是其将会对每次对消息的访问都进行采样。 ChannelHandler Channel生命周期 状态 描述 ChannelUnregistered Channel已经被创建,但未注册到EventLoo...

    warkiz 评论0 收藏0
  • Netty组件入门学习

    摘要:可以用来接收入站事件和数据,随后使用应用程序的业务逻辑进行处理。因为用户并不是关心所有的事件,因此提供了抽象类和。抽象类最常见的一个情况,你的应用程序会利用一个来接受解码消息,并对该数据应用业务逻辑。 Channel、EventLoop和ChannelFuture Channel——Socket; EventLoop——控制流、多线程处理、并发 ChannelFuture异步通知 ...

    qpal 评论0 收藏0
  • Netty 框架总结「ChannelHandler 及 EventLoop」

    摘要:随着状态发生变化,相应的产生。这些被转发到中的来采取相应的操作。当收到数据或相关的状态改变时,这些方法被调用,这些方法和的生命周期密切相关。主要由一系列组成的。采用的线程模型,在同一个线程的中处理所有发生的事。 「博客搬家」 原地址: 简书 原发表时间: 2017-05-05 学习了一段时间的 Netty,将重点与学习心得总结如下,本文主要总结ChannelHandler 及 E...

    VioletJack 评论0 收藏0
  • Netty 源码分析之 二 贯穿Netty 的大动脉 ── ChannelPipeline (一)

    摘要:目录源码之下无秘密做最好的源码分析教程源码分析之番外篇的前生今世的前生今世之一简介的前生今世之二小结的前生今世之三详解的前生今世之四详解源码分析之零磨刀不误砍柴工源码分析环境搭建源码分析之一揭开神秘的红盖头源码分析之一揭开神秘的红盖头客户端 目录 源码之下无秘密 ── 做最好的 Netty 源码分析教程 Netty 源码分析之 番外篇 Java NIO 的前生今世 Java NI...

    tunny 评论0 收藏0
  • Netty学习笔记(二)

    摘要:支持很多协议,并且提供用于数据处理的容器。我们已经知道由特定事件触发。可专用于几乎所有的动作,包括将一个对象转为字节或相反,执行过程中抛出的异常处理。提供了一个容器给链并提供了一个用于管理沿着链入站和出站事件的流动。子类通过进行注册。 前两天写了一点netty相关的知识,并写了一个demo,但是对其原理还是没有深入,今天我们来做一次研究吧 首先让我们来认识一下netty的几个核心人物吧...

    0x584a 评论0 收藏0

发表评论

0条评论

William_Sang

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<