摘要:但是它不是自己创建线程,而是从调用构造方法时指定的线程池中获取线程。这就意味着,即使发送两个独立的消息,操作系统会把他们视为一个字节串。释放过程很简单,调用它的方法,所有相关的和线程池将会自动关闭。
简单找了下发现网上没有关于Netty3比较完整的源码解析的文章,于是我就去读官方文档,为了加强记忆,翻译成了中文,有适当的简化。
原文档地址:Netty3文档
Chapter 1 开始 1、开始之前运行demo的前提有两个:最新版本的Netty3和JDK1.5以上
2、写一个Discard Server最简单的协议就是Discard协议——忽略所有接收到的数据并且不作任何响应。我们从Netty处理I/O事件的handler实现开始:
public class DiscardServerHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); } }
DiscardServerHandler 继承SimpleChannelHandler——ChannelHandler的一个实现;
messageReceived方法接收MessageEvent类型的参数,它包含接收的客户端数据;
exceptionCaught方法在出现I/O错误或者处理事件时抛出错误时被调用,通常包含记录错误信息和关闭通道的动作;
接下来写一个main方法来开启使用DiscardServerHandler的服务:
public class DiscardServer { public static void main(String[] args) throws Exception { ChannelFactory factory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ServerBootstrap bootstrap = new ServerBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline(new DiscardServerHandler()); } }); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); bootstrap.bind(new InetSocketAddress(8080)); } }
ChannelFactory是创建和管理Channel及其关联资源的工厂,它负责处理所有I/O请求并且执行I/O生成ChannelEvent。但是它不是自己创建I/O线程,而是从调用构造方法时指定的线程池中获取线程。服务端应用使用NioServerSocketChannelFactory;
ServerBootstrap是一个设置服务端的帮助类;
当服务端接收到一个新的连接,指定的ChannelPipelineFactory就会创建一个新的ChannelPipeline,这个新的Pipeline包含一个DiscardServerHandler对象;
你可以给Channel实现设置具体的参数,选项带"child."前缀代表应用在接收到的Channel上而不是服务端本身的ServerSocketChannel;
剩下的就是绑定端口启动服务,可以绑定多个不同的端口。
3、处理接收到的数据我们可以通过"telnet localhost 8080"命令去测试服务,但因为是Discard服务,我们都不知道服务是否正常工作。所以我们修改下服务,让它打印出接收到的数据。
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buf = (ChannelBuffer) e.getMessage(); while(buf.readable()) { System.out.println((char) buf.readByte()); System.out.flush(); } }
ChannelBuffer是Netty基本的存储字节的数据结构,跟NIO的ByteBuffer类似,但是更容易使用更灵活。比如Netty允许你在尽量少的内存复制次数的情况下把多个ChannelBuffer组合成一个。
4、写一个Echo服务一个服务通常对请求是有响应的。接下来我们尝试写一个实现Echo协议——将接收的数据原路返回给客户端的服务:
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { Channel ch = e.getChannel(); ch.write(e.getMessage()); }
MessageEvent继承了ChannnelEvent,一个ChannnelEvent持有它相关的Channel的引用。我们可以获取这个Channel然后调用写方法写入数据返回给客户端。
5、写一个时间服务这次我们实现一个时间协议——在不需要任何请求数据的情况下返回一个32位整型数字并且在发送之后关闭连接。因为我们忽略请求数据,只需要在连接建立的发送消息,所以这次不能使用messageReceived方法而是重写channelConnected方法:
@Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { Channel ch = e.getChannel(); ChannelBuffer time = ChannelBuffers.buffer(4); time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); ChannelFuture f = ch.write(time); f.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { Channel ch = future.getChannel(); ch.close(); } }); }
channelConnected方法在连接建立的时间被调用,然后我们写入一个32位整型数字代表以秒为单位的当前时间;
我们使用ChannelBuffers工具类分配了一个容量为4字节的ChannelBuffer来存放这个32位整型数字;
然后我们把ChannelBuffer写入Channel...等一下,flip方法哪里去了?在NIO中我们不是要在写入通道前调用ByteBuffer的flip方法的吗?ChannelBuffer没有这个方法,因为它有两个指针,一个用于读操作一个用于写操作。当数据写入ChannelBuffer时写索引增加而读索引不变。读索引和写索引相互独立。对比之下,Netty的ChannelBuffer比NIO的buffer更容易使用。
另外需要注意的一点是ChannelBuffer的write方法返回的是一个ChannelFuture对象。它表示一个还未发生的I/O操作,因为Netty中所有操作都是异步的。所以我们必须在ChannelFuture收到操作完成的通知之后才能关闭Channel。哦,对了,close方法也是返回ChannelFuture...
那么问题来了,我们如何得到操作完成的通知呢?只需要简单得向返回的ChannelFuture对象中添加一个ChannelFutureListener,这里我们创建了一个ChannelFutureListener的匿名内部类,它在操作完成的时候会关闭Channel。
6、写一个时间客户端我们还需要一个遵守时间协议,即能把整型数字翻译成日期的客户端。Netty服务端和客户端唯一的区别就是要求不同的Bootstrap和ChannelFactory:
public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); ChannelFactory factory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ClientBootstrap bootstrap = new ClientBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline(new TimeClientHandler()); } }); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("keepAlive", true); bootstrap.connect(new InetSocketAddress(host, port)); }
NioClientSocketChannelFactory,用来创建一个客户端Channel;
ClientBootstrap是ServerBootStrap在客户端的对应部分;
需要注意的是设置参数时不需要"child."前缀,客户端SocketChannel没有父Channel;
对应服务端的bind方法,这里我们需要调用connect方法。
另外我们需要一个ChannelHandler实现,负责把接收到服务端返回的32位整型数字翻译成日期并打印出来,然后断开连接:
public class TimeClientHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buf = (ChannelBuffer) e.getMessage(); long currentTimeMillis = buf.readInt() * 1000L; System.out.println(new Date(currentTimeMillis)); e.getChannel().close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); e.getChannel().close(); } }
看上去很简单是吧?但是实际运行过程中这个handler有时会抛出一个IndexOutOfBoundsException。下一节我们会讨论为什么会这样。
7、处理基于流的传输 7.1、一个关于Socket Buffer的小警告在像TCP/IP那样基于流的传输中,接收数据保存在一个socket接收缓存中。但是这个缓存不是一个以包为单位的队列,而是一个以字节为单位的队列。这就意味着,即使发送两个独立的消息,操作系统会把他们视为一个字节串。因此,不能保证你读到的和另一端写入的一样。所以,不管是客户端还是服务端,对于接收到的数据都需要整理成符合应用程序逻辑的结构。
7.2、第一种解决方式回到前面的时间客户端的问题,32位整型数字很小,但是它也是可以拆分的,特别是当流量上升的时候,被拆分的可能性也随之上升。
一个简单的处理方式就是内部创建一个累计的缓存,直到接收满4个字节才进行处理。
private final ChannelBuffer buf = dynamicBuffer(); @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer m = (ChannelBuffer) e.getMessage(); buf.writeBytes(m); if (buf.readableBytes() >= 4) { long currentTimeMillis = buf.readInt() * 1000L; System.out.println(new Date(currentTimeMillis)); e.getChannel().close(); } }
ChannelBuffers.dynamicBuffer()返回一个自动扩容的ChannelBuffer;
所有接收的数据都累积到这个动态缓存中;
handler需要检查缓存是否满4个字节,是的话才能继续业务逻辑;否则,Netty会在数据继续到达之后持续调用messageReceive。
7.3、第二种解决方案第一种方案有很多问题,比如一个复杂的协议,由多个可变长度的域组成,这种情况下第一种方案的handler就无法支持了。
你会发现你可以添加多个ChannelHandler到ChannelPipeline中,利用这个特性,你可以把一个臃肿的ChannelHandler拆分到多个模块化的ChannelHandler中,这样可以降低应用程序的复杂度。比如,你可以把TimeClientHandler拆分成两个handler:
TimeDecoder,负责分段问题;
最初那个简版的TimeClientHandler.
Netty提供了可扩展的类帮助你实现TimeDecoder:
public class TimeDecoder extends FrameDecoder { @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) { if (buffer.readableBytes() < 4) { return null; } return buffer.readBytes(4); } }
FrameDecoder是ChannelHandler的一种实现,专门用来处理分段问题;
FrameDecoder在每次接收到新的数据时调用decode方法,携带一个内部维持的累积缓存;
如果返回null,说明目前数据接收的还不够,当数据量足够时FrameDecoder会再次调用方法;
如果返回非null对象,代表解码成功,FrameDecoder会丢弃累积缓存中剩余的数据。你无需提供批量解码,FrameDecoder会继续调用decode方法直到返回null。
拆分之后,我们需要修改TimeClient的ChannelPipelineFactory实现:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline( new TimeDecoder(), new TimeClientHandler()); } });
Netty还提供了进一步简化解码的ReplayingDecoder:
public class TimeDecoder extends ReplayingDecoder{ @Override protected Object decode( ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, VoidEnum state) { return buffer.readBytes(4); } }
此外,Netty提供了一批开箱即用的解码器,让你可以简单得实现大多数协议:
org.jboss.netty.example.factorial 用于二进制协议;
org.jboss.netty.example.telnet 用于基于行的文本协议.
8、用POJO替代ChannelBuffer上面的demo我们都是用ChannelBuffer作为协议化消息的基本数据结构,这一节我们用POJO替代ChannelBuffer。将从ChannelBuffer提取信息的代码跟handler分离开,会使handler变得更加可维护的和可重用的。从上面的demo里不容易看出这个优势,但是实际应用中分离很有必要。
首先,我们定义一个类型UnixTime:
public class UnixTime { private final int value; public UnixTime(int value) { this.value = value; } public int getValue() { return value; } @Override public String toString() { return new Date(value * 1000L).toString(); } }
现在我们可以修改TimeDecoder让它返回一个UnixTime而不是ChannelBuffer:
@Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) { if (buffer.readableBytes() < 4) { return null; } return new UnixTime(buffer.readInt()); }
编码器改了,那么相应的TimeClientHandler就不会继续使用ChannelBuffer:
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { UnixTime m = (UnixTime) e.getMessage(); System.out.println(m); e.getChannel().close(); }
同样的技术也可以应用到服务端的TimeServerHandler上:
@Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { UnixTime time = new UnixTime((int)(System.currentTimeMillis() / 1000)); ChannelFuture f = e.getChannel().write(time); f.addListener(ChannelFutureListener.CLOSE); }
能这样运用的前提是有一个编码器,可以把UnixTime对象翻译成ChannelBuffer:
public class TimeEncoder extends SimpleChannelHandler { public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) { UnixTime time = (UnixTime) e.getMessage(); ChannelBuffer buf = buffer(4); buf.writeInt(time.getValue()); Channels.write(ctx, e.getFuture(), buf); } }
一个编码器重写writeRequested方法拦截一个写请求。这里需要注意的一点是,尽管这里的writeRequested方法参数里也有一个MessageEvent对象,客户端TimeClientHandler的messageReceived的参数里也有一个,但是它们的解读是完全不同的。一个ChannelEvent可以是upstream也可以是downstream事件,这取决于事件的流向。messageReceived方法里的MessageEvent是一个upstream事件,而writeRequested方法里的是downstream事件。
当把POJO类转化为ChannelBuffer后,你需要把ChannelBuffer转发到之前在ChannelPipeline内的ChannelDownstreamHandler,也就是TimeServerHandler。Channels提供了多个帮助方法创建和发送ChanenlEvent。
同样,TimeEncoder也需要加入到服务端的ChannelPipeline中:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline( new TimeServerHandler(), new TimeEncoder()); } });9、关闭你的应用程序
为了关闭I/O线程让应用程序优雅得退出,我们需要释放ChannelFactory分配的资源。
一个典型网络应用程序的关闭过程分为三步:
关闭所有服务端socket连接;
关闭所有非服务端socket连接(包括客户端socket和服务端接收到的socket);
释放ChannelFactory使用的所有资源。
应用到TimeClient上:
ChannelFuture future = bootstrap.connect(...); future.awaitUninterruptibly(); if (!future.isSuccess()) { future.getCause().printStackTrace(); } future.getChannel().getCloseFuture().awaitUninterruptibly(); factory.releaseExternalResources();
CilentBootStrap的connect方法返回一个ChannelFuture,当连接尝试成功或者失败时会通知到ChannelFuture。它还持有连接尝试关联的Channel的引用;
ChannelFuture.awaitUninterruptibly()等待ChannelFuture确定连接是否尝试成功;
如果连接失败,我们打印出失败的原因。ChannelFuture.getCause()会在连接即没有成功也没有取消的情况下返回失败的原因;
连接尝试的情况处理之后,我们还需要等待连接关闭。每个Channel有它自己的closeFuture,用来通知你连接关闭然后你可以针对关闭做一些动作。即使连接尝试失败了,closeFuture仍然会被通知,因为Channel会在连接失败后自动关闭;
所有连接关闭之后,剩下的就是释放ChannelFactory使用的资源了。释放过程很简单,调用它的releaseExternalResources方法,所有相关的NIO Selector和线程池将会自动关闭。
关闭一个客户端很简单,那服务端呢?你需要从端口解绑并关闭所有接收到的连接。前提是你需要一个保持跟踪活跃连接的数据结构,Netty提供了ChannelGroup。
ChannelGroup是Java集合API一个特殊的的扩展,它代表一组打开的Channel。如果一个Channel被添加到ChannelGroup,然后这个Channel被关闭了,它会从ChannelGroup中自动移除。你可以对同一ChannelGroup中的Channel做批量操作,比如在关闭服务的时候关闭所有Channel。
要跟踪打开的socket,你需要修改TimeServerHandler,把新打开的Channel添加到全局的ChannelGroup变量中。ChannelGroup是线程安全的。
@Override public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) { TimeServer.allChannels.add(e.getChannel()); }
现在我们自动维持了一个包含所有活跃Channel的列表,关闭服务端就像关闭客户端一样容易了。
public class TimeServer { static final ChannelGroup allChannels = new DefaultChannelGroup("time-server"); public static void main(String[] args) throws Exception { ... ChannelFactory factory = ...; ... ServerBootstrap bootstrap = ...; ... Channel channel = bootstrap.bind(new InetSocketAddress(8080)); allChannels.add(channel); waitForShutdownCommand(); ChannelGroupFuture future = allChannels.close(); future.awaitUninterruptibly(); factory.releaseExternalResources(); } }
DefaultChannelGroup构造方法接收组名为参数,组名是它的唯一标识;
ServerBootstrap的bind方法返回一个服务端的绑定指定本地地址的Channel,调用Channel的close方法将会使它与本地地址解绑;
所有Channel类型都可以被添加到ChannelGroup中,不管是客户端、服务端或是服务端接收的。因为你可以在服务器关闭时同时关闭绑定的Channel和接收到的Channel;
waitForShutdownCommand()是一个等待关闭信号的虚构方法。
我们可以对ChannelGroup中的Channel进行统一操作,这里我们调用close方法,相当于解绑服务端Channel并且异步关闭所有接收到的Channel。close方法返回一个功能和ChannelFuture相近的ChannelGroupFuture,在所有连接都成功关闭通知我们。
10、总结这一节我们快速浏览了Netty,示范了如何用Netty写一个能正常工作的网络应用。
下一节将介绍Netty的更多细节。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/74215.html
摘要:丰富的缓存数据结构使用它自己的缓存来表示字节序列而不是的。针对有一个定义良好的事件模型。有一些协议是多层的建立在其他低级协议基础上。此外,甚至不是完全线程安全的。协议由标准化为。协议缓存整合是一个高效二进制协议的快速实现。 Chapter 2、结构概览 这一节我们将确认Netty提供的核心功能是什么,以及它们怎么构成一个完整的网络应用开发堆栈。 1、丰富的缓存数据结构 Netty使用它...
摘要:的选择器允许单个线程监视多个输入通道。一旦执行的线程已经超过读取代码中的某个数据片段,该线程就不会在数据中向后移动通常不会。 1、引言 很多初涉网络编程的程序员,在研究Java NIO(即异步IO)和经典IO(也就是常说的阻塞式IO)的API时,很快就会发现一个问题:我什么时候应该使用经典IO,什么时候应该使用NIO? 在本文中,将尝试用简明扼要的文字,阐明Java NIO和经典IO之...
摘要:英文全名为,也叫远程过程调用,其实就是一个计算机通信协议,它是一种通过网络从远程计算机程序上请求服务而不需要了解底层网络技术的协议。 Hello,Dubbo 你好,dubbo,初次见面,我想和你交个朋友。 Dubbo你到底是什么? 先给出一套官方的说法:Apache Dubbo是一款高性能、轻量级基于Java的RPC开源框架。 那么什么是RPC? 文档地址:http://dubbo.a...
摘要:算法序和年的论文提出了一种定时轮的方式来管理和维护大量的调度算法内核中的定时器采用的就是这个方案。使用实例每一次的时间间隔每一次就会到达下一个槽位轮中的数源码解读之时间轮算法实现定时轮算法细说延时任务的处理定时器的实现 HashedWheelTimer算法 序 George Varghese 和 Tony Lauck 1996 年的论文:Hashed and Hierarchical ...
阅读 2666·2021-11-08 13:16
阅读 2297·2021-10-18 13:30
阅读 2213·2021-09-27 13:35
阅读 1971·2019-08-30 15:55
阅读 2422·2019-08-30 13:22
阅读 556·2019-08-30 11:24
阅读 2054·2019-08-29 12:33
阅读 1795·2019-08-26 12:10