资讯专栏INFORMATION COLUMN

netty

cfanr / 3010人阅读

摘要:设置每个数据包的大小如个字节,如果某个数据包不足个字节可能会出现丢包的情况,即该数据包未从一个端到另一个端,此时需要用空格或者既定的符号补充在数据包之间使用一些字符进行分割如号之类的,解析的时候先处理掉分隔符再拿到各个数据包就好了。

netty

概念: Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。

新特性

处理大容量数据流更简单

处理协议编码和单元测试更简单

I/O超时和idle状态检测

应用程序的关闭更简单,更安全

更可靠的OutOfMemoryError预防

性能

更好的吞吐量,更低的延迟

更少的资源消耗

最小化不必要的内存拷贝

具体使用见代码及注释

Helloword版
服务端这边绑定了两个端口,可以根据业务区别对待如端口1是做A业务,端2做B业务.

public class Server {
    public static void main(String[] args) throws InterruptedException {
        //1.创建两个线程组 (只有服务器端需要 )
        //一个线程组专门用来管理接收客户端的请求连接的
        //一个线程组进行网络通信(读写)
        EventLoopGroup receiveGroup = new NioEventLoopGroup();
        EventLoopGroup dealGroup = new NioEventLoopGroup();
        //创建辅助工具类,用于设置服务器通道的一系列配置
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(receiveGroup, dealGroup)//绑定两个线程组
        .channel(NioServerSocketChannel.class)   //指定NIO的模式
        .option(ChannelOption.SO_BACKLOG, 1024)     //设置tcp缓冲区
        .option(ChannelOption.SO_SNDBUF, 32*1024) //设置发送缓冲区大小
        .option(ChannelOption.SO_RCVBUF, 32*1024) //设置接收缓冲大小
        .option(ChannelOption.SO_KEEPALIVE, true)  //保持连接
        .childHandler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                //3 在这里配置具体数据接收方法的处理
                sc.pipeline().addLast(new ServerHandler());
            }
        });
        //4 进行绑定 
                ChannelFuture cf1 = serverBootstrap.bind(8765).sync();
                ChannelFuture cf2 = serverBootstrap.bind(8764).sync();
                //5 等待关闭
                cf1.channel().closeFuture().sync();
                cf2.channel().closeFuture().sync();
                receiveGroup.shutdownGracefully();
                dealGroup.shutdownGracefully();
     }
}

服务端处理器:

public class ServerHandler extends ChannelHandlerAdapter {


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server channel active... ");
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "gbk");
            System.out.println("Server :" + body );
            String response = "进行返回给客户端的响应:" + body ;
            //注意使用了writeAndFlush的话就可以不释放ReferenceCountUtil.release(msg); 否则需要释放ByteBuf容器的数据。
            ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
            //.addListener(ChannelFutureListener.CLOSE);//监听,内容传输完毕后就关闭管道
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("读完了");
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
            throws Exception {
        ctx.close();
    }


}

客户端:

public class Client {
public static void main(String[] args) throws Exception{
        
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(new ClientHandler());
            }
        });
        
        ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
        ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync();
        //发送消息
        cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:777".getBytes()));
        Thread.sleep(1000);
        cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:666".getBytes()));
        cf2.channel().writeAndFlush(Unpooled.copiedBuffer("C2:888".getBytes()));
        Thread.sleep(2000);
        cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:888".getBytes()));
        cf2.channel().writeAndFlush(Unpooled.copiedBuffer("C2:666".getBytes()));
        
        cf1.channel().closeFuture().sync();
        cf2.channel().closeFuture().sync();
        group.shutdownGracefully();
        
        
        
    }
}

客户端处理器:

public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端的channelActive()方法");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            ByteBuf buf = (ByteBuf) msg;
            
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            
            String body = new String(req, "gbk");
            System.out.println("Client :" + body );
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
    
TCP拆包粘包问题

TCP是个“流”协议,所谓流,就是没有界限的一串数据。大家可以想想河里的流水,是连成一片的,其间并没有分界线。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。
通俗意义来说可能是三个数据如"A","B","C" 但经过TCP协议流式传输后成了"AB","C"两个数据了,这种就是粘包了数据包之间粘一起了。那么拆包的话有三种方式。

设置每个数据包的大小如200个字节,如果某个数据包不足200个字节可能会出现丢包的情况,即该数据包未从一个端到另一个端,此时需要用空格或者既定的符号补充.

在数据包之间使用一些字符进行分割如$号之类的,解析的时候先处理掉分隔符再拿到各个数据包就好了。(一般用的比较多)

细粒化数据包分为头和尾(将消息分为消息头和消息尾)

其他

两根水管(服务器与客户端)需要相互流通水(数据),那么需要一个转接头(套接字)连接,水流式无法区分一段段的数据,一种方式在流通的过程中设置些标志性物品如记号笔勾一下(分隔符),另一种方式则是设定每一段都是多少容量的水来区分.

使用分隔符解决TCP粘包

可以理解管道流里流的都是ByteBuffer类型的数据,那么使用分隔符(非ByteBuffer类型)的话可能就意味着一个转码与解码的过程。
服务端:

public class Server {
    public static void main(String[] args) throws Exception{
        //1 创建2个线程,一个是负责接收客户端的连接。一个是负责进行数据传输的
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        
        //2 创建服务器辅助类
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         .option(ChannelOption.SO_SNDBUF, 32*1024)
         .option(ChannelOption.SO_RCVBUF, 32*1024)
         .childHandler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                //设置特殊分隔符  解决TCP拆包黏包问题,
                ByteBuf buf = Unpooled.copiedBuffer("$".getBytes());
                sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                //设置字符串形式的解码
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ServerHandler());
            }
        });
        
        //4 绑定连接
        ChannelFuture cf = b.bind(8765).sync();
        
        //等待服务器监听端口关闭
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
        
    }
}

服务端处理器:

public class ServerHandler extends ChannelHandlerAdapter{
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" server channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String request = (String)msg;
        System.out.println("Server channelRead:" + request);
        String response = "服务器响应:" + msg + "$";
        ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
        System.out.println("exceptionCaught");
        ctx.close();
    }

}

客户端:

public class Client {
public static void main(String[] args) throws Exception {
        
        EventLoopGroup group = new NioEventLoopGroup();
        
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                //
                ByteBuf buf = Unpooled.copiedBuffer("$".getBytes());
                sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ClientHandler());
            }
        });
        
        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
        
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("数据A$".getBytes()));
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("数据B$".getBytes()));
        
        
        //等待客户端端口关闭
        cf.channel().closeFuture().sync();
        group.shutdownGracefully();
        
    }
}

客户端处理器:

public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            String response = (String)msg;
            System.out.println("Client: " + response);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("exceptionCaught");
        ctx.close();
    }

}
设置长度大小解决TCP拆包黏包问题

服务端:

public class Server {

    public static void main(String[] args) throws Exception{
        //1 创建2个线程,一个是负责接收客户端的连接。一个是负责进行数据传输的
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        
        //2 创建服务器辅助类
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         .option(ChannelOption.SO_SNDBUF, 32*1024)
         .option(ChannelOption.SO_RCVBUF, 32*1024)
         .childHandler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                //设置定长字符串接收
                sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
                //设置字符串形式的解码
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ServerHandler());
            }
        });
        
        //4 绑定连接
        ChannelFuture cf = b.bind(8765).sync();
        
        //等待服务器监听端口关闭
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
        
    }
    
}

客户端:

public class Client {

    public static void main(String[] args) throws Exception {
        
        EventLoopGroup group = new NioEventLoopGroup();
        
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ClientHandler());
            }
        });
        
        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
        
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaaaabbbbb".getBytes()));
        cf.channel().writeAndFlush(Unpooled.copiedBuffer("ccccccc".getBytes()));
        
        //等待客户端端口关闭
        cf.channel().closeFuture().sync();
        group.shutdownGracefully();
        
    }
}

服务端与客户端的处理器参照上例以字符串分割的.

                                新手上路,多多关注...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68178.html

相关文章

  • netty搭建web聊天室(1)

    摘要:提供异步的事件驱动的网络应用程序框架和工具,用以快速开发高性能高可靠性的网络服务器和客户端程序。总结我们完成了服务端的简单搭建,模拟了聊天会话场景。 之前一直在搞前端的东西,都快忘了自己是个java开发。其实还有好多java方面的东西没搞过,突然了解到netty,觉得有必要学一学。 介绍 Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框...

    izhuhaodev 评论0 收藏0
  • 慕课网_《Netty入门之WebSocket初体验》学习总结

    时间:2018年04月11日星期三 说明:本文部分内容均来自慕课网。@慕课网:https://www.imooc.com 教学源码:https://github.com/zccodere/s... 学习源码:https://github.com/zccodere/s... 第一章:课程介绍 1-1 课程介绍 什么是Netty 高性能、事件驱动、异步非阻塞的IO Java开源框架 基于NIO的客户...

    Noodles 评论0 收藏0
  • Netty 源码分析之 零 磨刀不误砍柴工 源码分析环境搭建

    摘要:目录此文章属于源码之下无秘密做最好的源码分析教程系列文章之一代码下载首先到的仓库中点击右边绿色的按钮拷贝地址然后在终端中输入如下命令克隆工程工程源码较大加上国内网络问题下载源码可能会比较耗时当有如下输出时表示克隆成功了如果有朋友实在下载太 目录 此文章属于 源码之下无秘密 ── 做最好的 Netty 源码分析教程 系列文章之一. 代码下载 首先到 Netty 的 Github 仓库 中...

    freewolf 评论0 收藏0
  • 源码之下无秘密 ── 做最好的 Netty 源码分析教程

    摘要:背景在工作中虽然我经常使用到库但是很多时候对的一些概念还是处于知其然不知其所以然的状态因此就萌生了学习源码的想法刚开始看源码的时候自然是比较痛苦的主要原因有两个第一网上没有找到让我满意的详尽的源码分析的教程第二我也是第一次系统地学习这么大代 背景 在工作中, 虽然我经常使用到 Netty 库, 但是很多时候对 Netty 的一些概念还是处于知其然, 不知其所以然的状态, 因此就萌生了学...

    shenhualong 评论0 收藏0
  • netty 基于 protobuf 协议 实现 websocket 版本的简易客服系统

    摘要:结构作为服务端作为序列化数据的协议前端通讯演示地址服务端实现启动类长连接示例主线程组从线程组请求的解码和编码把多个消息转换为一个单一的或是,原因是解码器会在每个消息中生成多个消息对象主要用于处理大数据流,比如一个大小的文件如果你直接传输肯定 结构 netty 作为服务端 protobuf 作为序列化数据的协议 websocket 前端通讯 演示 GitHub 地址 showImg(...

    wua_wua2012 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<