资讯专栏INFORMATION COLUMN

Netty 4.x 官方入门指南 [译]

刘玉平 / 3388人阅读

摘要:目前为止,我们已经完成了一半的工作,剩下的就是在方法中启动服务器。第一个通常被称为,负责接收已到达的。这两个指针恰好标记着数据的起始终止位置。

前言

本篇翻译自netty官方Get Start教程,一方面能把好的文章分享给各位,另一方面能巩固所学的知识。若有错误和遗漏,欢迎各位指出。

https://netty.io/wiki/user-gu...

面临的问题

我们一般使用专用软件或者是开源库和其他系统通信。举个例子,我们通常使用 http 客户端从 web 服务器获取信息,或者通过 web service 执行一个 remote procedure call (远程调用)。然而,一个通用的协议时常不具备很好的扩展性,例如我们不会使用一个通用 http 服务器去做如下类型的数据交换——大文件,电子邮件,近实时的金融数据或者是游戏数据。因此,一个高度优化的致力于解决某些问题的通讯协议是很有必要的,例如你希望实现一台优化过的 http 服务器,致力于聊天应用,流媒体传输,大文件传输等。你甚至可以为已有需求量身定做一个全新的通信协议。另一个不可避免的情况是,你必须处理一个古老的专用协议,使用他去跟遗留系统通信,问题是我们该如何快速实现协议,同时不牺牲应用的稳定性和性能。

解决方法

The Netty project is an effort to provide an asynchronous event-driven network application framework and tooling for the rapid development of maintainable high-performance · high-scalability protocol servers and clients.

In other words, Netty is an NIO client server framework that enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server development.

"Quick and easy" does not mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences learned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.

Some users might already have found other network application framework that claims to have the same advantage, and you might want to ask what makes Netty so different from them. The answer is the philosophy it is built on. Netty is designed to give you the most comfortable experience both in terms of the API and the implementation from the day one. It is not something tangible but you will realize that this philosophy will make your life much easier as you read this guide and play with Netty.

Get Started

本章使用简单的例子带你浏览 netty 的核心构造,你快速上手。在本章过后,你就可以写出一个基于 netty 的客户端和服务器。如果你希望有更好的学习体验,你可以先浏览 Chapter 2, Architectural Overview 后再回来本章学习 (先看这里也是OK的)。

开始之前

能跑通本章例子的最低要求:最新版本的 netty(4.x) 和 JDK 1.6 或以上的版本。
在阅读时,当你对本章中出现的 class 感到疑惑,请查阅他们的 api 文档。而本章几乎所有的 class 都会链接到他们的 api 文档。如果你发现本章中有什么错误的信息、代码语法错误、或者有什么好的想法,也请联系 netty 社区通知我们。

写一个Discard Server(抛弃服务器)

世界上最简单的协议并不是 hello world,而是Discard。这种协议会抛弃掉所有接收到的数据,不会给客户端任何响应,所以实现Discard协议唯一要做的是忽略所有接收到的数据。接下来让我们着手写一个 handler,用来处理I/O events(I/O事件)。

package io.netty.example.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

1.DiscardServerHandler 继承ChannelInboundHandlerAdapter,并间接实现了ChannelInboundHandlerChannelInboundHandler接口提供了多种 event handler method (事件处理方法),你可能要逐个实现接口中的方法,但直接继承ChannelInboundHandlerAdapter会是更好的选择。
2.这里我们重写了channelRead(),当有新数据到达时该方法就会被调用,并附带接收到的数据作为方法参数。在本例中,接收到的数据类型是ByteBuf
3.要实现 Discard 协议,这里 handler 会忽略接收到的数据。ByteBuf作为 reference-counted (引用计数) 对象,通过调用方法release()释放资源,请记住这个 release 动作在 handler 中完成 (原文:是handler的职责)。通常,我们会像下面那样实现channelRead()

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

4.当 netty 发生 I/O 错误,或者 handler 在处理 event (事件) 抛出异常时,exceptionCaught()就会被调用。大多数情况下我们应该记录下被捕获的异常,并关闭与之关联的channel(通道),但同时你也可以做一些额外的异常处理,例如在关闭连接之前,你可能会发送一条带有错误代码的响应消息。

目前为止,我们已经完成了一半的工作,剩下的就是在main()方法中启动Discard服务器。

package io.netty.example.discard;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}

NioEventLoopGroup是一个处理I/O操作的事件循环器 (其实是个线程池)。netty为不同类型的传输协议提供了多种NioEventLoopGroup的实现。在本例中我们要实现一个服务端应用,并使用了两个NioEventLoopGroup。第一个通常被称为boss,负责接收已到达的 connection。第二个被称作 worker,当 boss 接收到 connection 并把它注册到 worker 后,worker 就可以处理 connection 上的数据通信。要创建多少个线程,这些线程如何匹配到Channel上会随着EventLoopGroup实现的不同而改变,或者你可以通过构造器去配置他们。

ServerBootstrap是用来搭建 server 的协助类。你也可以直接使用Channel搭建 server,然而这样做步骤冗长,不是一个好的实践,大多数情况下建议使用ServerBootstrap

这里我们指定NioServerSocketChannel类,用来初始化一个新的Channel去接收到达的connection。

这里的 handler 会被用来处理新接收的ChannelChannelInitializer是一个特殊的 handler,帮助开发者配置Channel,而多数情况下你会配置Channel下的ChannelPipeline,往 pipeline 添加一些 handler (例如DiscardServerHandler) 从而实现你的应用逻辑。当你的应用变得复杂,你可能会向 pipeline 添加更多的 handler,并把这里的匿名类抽取出来作为一个多带带的类。

你可以给Channel配置特有的参数。这里我们写的是 TCP/IP 服务器,所以可以配置一些 socket 选项,例如 tcpNoDeply 和 keepAlive。请参考ChannelOptionChannelConfig文档来获取更多可用的 Channel 配置选项,并对此有个大概的了解。

注意到option()childOption()了吗?option()用来配置NioServerSocketChannel(负责接收到来的connection),而childOption()是用来配置被ServerChannel(这里是NioServerSocketChannel) 所接收的Channel

剩下的事情就是绑定端口并启动服务器,这里我们绑定到机器的8080端口。你可以多次调用bind()(基于不同的地址)。

刚刚,你使用 netty 完成了第一个服务端程序,可喜可贺!

处理接收到的数据

既然我们完成了第一个服务端程序,接下来要就要对它进行测试。最简单的方法是使用命令行 telnet,例如在命令行输入telnet localhost 8080,然后再输入点别的东西。
然而我们并不知道服务端是否真的在工作,因为他是 Discard Server,我们得不到任何响应。为了证明他真的在工作,我们让服务端打印接收到的数据。
我们知道当接收到数据时,channelRead()会被调用。所以让我们加点代码到 DiscardServerHandler 的channelRead()中:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}

这步低效的循环可以替换成System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))

你可以用in.release()替换这里的代码。

如果你再运行 telnet 命令,服务端会打印出接收到的数据。
Discard Server 完整的源码放在io.netty.example.discard这个包中。

写一个 Echo Server (打印服务器)

目前为止,我们写的服务程序消费了数据但没有给出任何响应,而作为一台服务器理应要对每一个请求作出响应。接下来让我们实现 ECHO 协议,学习如何响应消息并把接收到的数据发回客户端。
Echo Server 跟 Discard Server 唯一不同的地方在于,他把接收到的数据返回给客户端,而不是把他们打印到控制台。所以这里我们只需要修改channelRead()就行了:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.write(msg); // (1)
    ctx.flush(); // (2)
}

ChannelHandlerContext能触发多种 I/O 事件和操作,这里我们调用write()方法逐字写回接收到的数据。请注意我们并没有释放接收到的消息Object msg,因为在写数据时ctx.write(msg),netty 已经帮你释放它了。

ctx.write()关没有把消息写到网络上,他在内部被缓存起来,你需要调用ctx.flush()把他刷新到网络上。ctx.writeAndFlush(msg)是个更简洁的方法。

如果再次使用命令行 telnet,你会看到服务端返回了你输入过的东西。完整的 Echo Server 源码放在io.netty.example.echo包下面。

写一个 Time Server (时间服务器)

我们这一小节要实现 TIME 协议。跟前面的例子不同,Timer Server 在连接建立时 (收到请求前) 就返回一个32位 (4字节) 整数,并在发送成功后关闭连接。在本例中,将会学习到如何构造和发送一个消息,在发送完成时关闭连接。
因为要在刚建立连接时发送消息而不管后来接收到的数据,这次我们不能使用channelRead(),取而代之的是channelActive方法,以下是具体实现:

package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

当连接被建立后channelActive()方法会被调用,我们在方法体中发送一个32位的代表当前时间的整数。

要发送一个新的消息,需要分配一个新的buffer(缓冲区) 去包含这个消息。我们要写一个32位的整数,因此缓冲区ByteBuf的容量至少是4个字节。通过ChannelHandlerContext.alloc()获取ByteBufAllocator(字节缓冲区分配器),用他来分配一个新的buffer

像往常一样把消息写到网络上。
等一下Σ( ° △ °|||),flip()方法哪去了?还记不记得在NIO中曾经使用过的java.nio.ByteBuffer.flip()(简单总结就是把ByteBuffer从写模式变成读模式)?ByteBuf并没有这个方法,因为他包含了两个指针——读指针和写指针 (读写标记,不要理解成C里的指针)。当你往ByteBuf写数据时,写指针会移动而读指针不变。这两个指针恰好标记着数据的起始、终止位置。
与之相反,原生 NIO 并没有提供一个简洁的方式去标记数据的起始和终止位置,你必须要调用flip方法。有 时候你很可能忘记调用flip方法,导致发送不出数据或发送了错误数据。这样的错误并不会发生在 netty,因为 netty 有不同的指针去应对不同的操作 (读写操作),这使得编程更加简单,因为你不再需要 flipping out (疯狂输出原生 NIO)

其他需要注意的是ChannelHandlerContext.write()/writeAndFlush()方法返回了ChannelFutureChannelFuture表示一个还没发生的 I/O 操作。这意味着你请求的一些 I/O 操作可能还没被处理,因为 netty 中所有的操作都是异步的。举个例子,下面的代码可能在消息发送之前就关闭了连接:

   Channel ch = ...;
   ch.writeAndFlush(message);
   ch.close();

所以,你要在 (write()返回的)ChannelFuture完成之后再调用close()。当write操作完成后,ChannelFuture会通知到他的listeners(监听器)。需加注意,close()方法可能不会立即关闭链接,同样close()也会返回一个ChannelFuture

那么我们如何知道写操作完成了?很简单,只要向ChannelFuture注册监听器 (ChannelFutureListener) 就行。这一步,我们创建了ChannelFutureListener的匿名类,在写操作完成时关闭链接。
你也可以使用已经定义好的监听器,例如这样:

   f.addListener(ChannelFutureListener.CLOSE);

为了测试 Time server 是否如期工作,你可以使用 unix 的命令行:

$ rdate -o  -p 
写一个 Time Client (时间客户端)

跟 DISCARD 和 ECHO 服务器不同,我们要写一个客户端程序应对 TIME 协议,因为你无法把一个32位整数翻译成日期。本节中,我们确保服务端正常工作,并学习如何使用 netty 写一个客户端程序。
netty 客户端和服务器最大的不同在于,客户端使用了不同的BootstrapChannel实现类。请看下面的例子:

package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

BootstrapServerBootstrap相似,但他是作用在客户端或无连接模式的 Channel (通道)。

如果你只定义了一个EventLoopGroup,他会同时作为 boss group 和 worker group,虽然客户端并没有 boss worker 这个概念。

这里使用NioSocketChannel而不是NioServerSocketChannelNioSocketChannel会被用来创建客户端Channel

ServerBootstrap不同,这里我们没有使用childOption(),因为客户端的SocketChannel没有父Channel

我们使用connect()代替bind()方法。

正如你所见,客户端代码跟服务端代码没有很大的区别。那么接下来就是实现ChannelHandler,他会从服务端接收一个32位整数,翻译成可读的日期格式并打印出来,最后关闭连接:

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

处理TCP/IP时, netty 把读取的数据放到ByteBuf

看起来非常简单,跟服务端没有多大区别。然而有时候 handler 会发生错误,例如抛出异常IndexOutOfBoundsException,在下一章节我们会作具体讨论。

基于流的传输协议 关于Socket缓冲区的小警告

TCP/IP这种基于流的传输协议,接收的数据会存储到socket缓冲区。不幸的是,这类缓冲区不是数据包队列,而是字节流队列。这意味着,即使你想发送两个消息并打包成两个数据包,操作系统只会把他们当作一连串字节。因此,这不能保证你读到的数据恰好是远程发送端写出的数据。举个例子,假设操作系统TCP/IP栈收到三个数据包:

因为这种流式协议的特性,应用程序很有可能像下图的方式那样读取数据碎片:

所以,作为接收端 (不管是服务端还是客户端),应把接收到的数据 (字节流) 整理成一个或多个易于理解的数据贞。对于上述的例子,整理如下:

解决方案一

让我们回到 TIME Client 这个例子。一32位整数的数据量非常小,在本例中不应用被分割。然而,问题在于他确实有可能被分割,可能性随着通信数据量的增大而增大。
一个简单的方法是创建一个内部的cumulative buffer(累积缓冲区),等待数据直到接收到4个字节为止。下面是修改过的TimeClientHandler

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

ChannelHandler有两个与生命周期有关的监听方法:handlerAdded()handlerRemove()。你可以在里面执行任意的初始化或析构任务,只要他们不会阻塞程序很长时间。

首先,所有接收的数据被累积到缓冲区。

其次,handler 检查缓冲区buf是否接收到足够的数据 (4个字节),若是,则进行实际业务处理。否则当有更多数据到达时,netty 会再次调用channelRead(),直到缓冲区累积到4个字节。

解决方案二

虽然方案一解决了问题,但修改过的 handler 看上去不是那么简洁。想像一下协议变得更为复杂,例如包含多个可变长字段,你的ChannelInboundHandler很快会变得不可维护。
你可能会注意到,可以向ChannelPipeline添加多个ChannelHandler。所以,你可以把一个庞大复杂的ChannelHandler分割成多个小模块,从而减小应用的复杂性。举个例子,你可以把TimeClientHandler分割成两个handler:

处理数据碎片的TimeDecoder

最初始的简单版本的TimeClientHandler

幸运的是,netty 提供了一个可扩展的父类,帮助你书写TimeDecoder

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        
        out.add(in.readBytes(4)); // (4)
    }
}

ByteToMessageDecoder实现了ChannelInboundHandler,使你更容易去处理数据碎片。

当有新数据到达时,ByteToMessageDecoder调用decode()方法并维护了一个内部的cumulative buffer(累积缓冲区)

累积缓冲区数据不足时,decode()方法不会添加任何东西到 out 列表。当有更多数据到达时,ByteToMessageDecoder会再次调用decode()方法。

如果decode()方法向 out 列表添加了一个对象,这表示decoder(解码器) 成功解析了一个消息。ByteToMessageDecoder会抛弃掉cumulative buffer(累积缓冲区)中已读数据。请记住,你不需要去解析多个消息,因为ByteToMessageDecoder会持续调用decode(),直到他没有往 out 列表添加对象。

既然希望往ChannelPipeline添加其他 handler (上面的TimeDecoder),我们要修改TimeClient中的ChannelInitializer

b.handler(new ChannelInitializer() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

如果你充满冒险精神,你可以尝试使用ReplayingDecoder,他会使代码更加简洁:

public class TimeDecoder extends ReplayingDecoder {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List out) {
        out.add(in.readBytes(4));
    }
}

此外,netty 提供了很多开箱即用的decoder,他们已经实现了大多数的网络协议,避免你自己去实现一个庞大的难以维护的handler。请参考下面的包获取更多详细例子:

io.netty.example.factorial 二进制协议

io.netty.example.telnet 文本协议

使用 POJO 代替 ByteBuf

上面所有例子都使用了ByteBuf作为协议中基本的数据结构。在本小节,我们将要升级 TIME 协议中的客户端和服务端,使用 POJO 代替ByteBuf
使用 POJO 的优势是显而易见的:你的 handler 变得易于维护和可重用,通过把 (从ByteBuf中抽取信息的) 代码分离出来。在 TIME 协议的例子里,我们仅仅读取一个32位的整数,直接使用ByteBuf并不会有太大问题。然而,当实现一个真实的网络协议时,你会发现做代码分离很有必要。

首先,让我们定义一个新的类型UnixTime

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

现在修改TimeDecoder,让他向out列表添加一个UnixTime而不是ByteBuf

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readUnsignedInt()));
}

既然修改了TimeDecoderTimeClientHandler也不能再使用ByteBuf了:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

是不是更加简单、优雅?相同的窍门同样能使用在服务端。这次让我们先修改TimeServerHandler

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

现在只剩下encoder(编码器),他需要实现ChannelOutboundHandler,把UnixTime翻译回ByteBuf。这里比书写decoder更加简单,因为我们不再需要处理数据包碎片并把他们组装起来了。

package io.netty.example.time;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}

这一行代码有几个重要的点:
首先,我们把参数中ChannelPromise传递到write(),以便 netty 把他标记为成功或失败 (当数据真正写到网络时)。
其次,我们没有调用ctx.flush(),因为ChannelOutboundHandlerAdapter中有一个多带带的方法void flush(ChannelHandlerContext ctx)专门用来处理flush操作。

你可以使用MessageToByteEncoder更加地简化代码:

public class TimeEncoder extends MessageToByteEncoder {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}

最后一步就是把TimeEncoder添加到服务端ChannelPipeline,留作练习。

关闭你的应用

关闭netty应用很简单——通过shutdownGracefully()去关闭所有创建的EventLoopGroup。他返回一个Future去通知你什么时候EventLoopGroup和他从属的 Channel 已经完全关闭了。

总结

本章,我们快速浏览了 netty,使用他书写了一个可用的网络应用。接下来的章节中会介绍更多关于 netty 的详细资料,我们也希望你去重温io.netty.example package包中的例子。netty 社区的大门会向你敞开,你可以向社区提出问题和意见,您的的反馈会帮助netty项目变得更加完善。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77054.html

相关文章

  • 少啰嗦!一分钟带你读懂Java的NIO和经典IO的区别

    摘要:的选择器允许单个线程监视多个输入通道。一旦执行的线程已经超过读取代码中的某个数据片段,该线程就不会在数据中向后移动通常不会。 1、引言 很多初涉网络编程的程序员,在研究Java NIO(即异步IO)和经典IO(也就是常说的阻塞式IO)的API时,很快就会发现一个问题:我什么时候应该使用经典IO,什么时候应该使用NIO? 在本文中,将尝试用简明扼要的文字,阐明Java NIO和经典IO之...

    Meils 评论0 收藏0
  • Spring Boot 2 快速教程:WebFlux 快速入门(二)

    摘要:响应式编程是基于异步和事件驱动的非阻塞程序,只是垂直通过在内启动少量线程扩展,而不是水平通过集群扩展。三特性常用的生产的特性如下响应式编程模型适用性内嵌容器组件还有对日志消息测试及扩展等支持。 摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠BYSocket 」欢迎关注和转载,保留摘要,谢谢! 02:WebFlux 快速入门实践 文章工程: JDK...

    gaara 评论0 收藏0
  • 网络编程 - 收藏集 - 掘金

    摘要:个高级多线程面试题及回答后端掘金在任何面试当中多线程和并发方面的问题都是必不可少的一部分。目前在生产环基于的技术问答网站系统实现后端掘金这一篇博客将详细介绍一个基于的问答网站的实现,有详细的代码。 15 个高级 Java 多线程面试题及回答 - 后端 - 掘金在任何Java面试当中多线程和并发方面的问题都是必不可少的一部分。如果你想获得任何股票投资银行的前台资讯职位,那么你应该准备很多...

    justCoding 评论0 收藏0
  • 网络编程 - 收藏集 - 掘金

    摘要:个高级多线程面试题及回答后端掘金在任何面试当中多线程和并发方面的问题都是必不可少的一部分。目前在生产环基于的技术问答网站系统实现后端掘金这一篇博客将详细介绍一个基于的问答网站的实现,有详细的代码。 15 个高级 Java 多线程面试题及回答 - 后端 - 掘金在任何Java面试当中多线程和并发方面的问题都是必不可少的一部分。如果你想获得任何股票投资银行的前台资讯职位,那么你应该准备很多...

    selfimpr 评论0 收藏0
  • 基于 Netty 的自定义帧高可靠性读取方案

    摘要:完成客户端服务器通信,需要基于协议之上,自定义一套简单的通信协议,其中数据交换方式需要使用自定义帧。输入数据处理器以下为输入数据的第一个处理器,可以保证无论帧经历怎样的粘包拆包,均可以准确提取每一个自定义帧的数据部分。 「博客搬家」 原地址: 简书 原发表时间: 2017-03-26 本文采用 Netty 这一最流行的 Java NIO 框架,作为 Java 服务器通信部分的基础...

    Berwin 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<