资讯专栏INFORMATION COLUMN

netty 基于 protobuf 协议 实现 websocket 版本的简易客服系统

Shihira / 1973人阅读

摘要:结构作为服务端作为序列化数据的协议前端通讯演示地址服务端实现启动类长连接示例主线程组从线程组请求的解码和编码把多个消息转换为一个单一的或是,原因是解码器会在每个消息中生成多个消息对象主要用于处理大数据流,比如一个大小的文件如果你直接传输肯定

结构

netty 作为服务端

protobuf 作为序列化数据的协议

websocket 前端通讯

演示
GitHub 地址

netty 服务端实现

Server.java 启动类

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.net.InetSocketAddress;

//websocket长连接示例
public class Server {
    public static void main(String[] args) throws Exception{
        
        // 主线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 从线程组
        EventLoopGroup wokerGroup = new NioEventLoopGroup();
        
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,wokerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ServerChannelInitializer());
            
            ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8899)).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            wokerGroup.shutdownGracefully();
        }
        
    }
}

ServerChannelInitializer.java

import com.example.nettydemo.protobuf.MessageData;
import com.google.protobuf.MessageLite;
import com.google.protobuf.MessageLiteOrBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.util.List;

import static io.netty.buffer.Unpooled.wrappedBuffer;


public class ServerChannelInitializer extends ChannelInitializer {
    
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // HTTP请求的解码和编码
        pipeline.addLast(new HttpServerCodec());
        // 把多个消息转换为一个单一的FullHttpRequest或是FullHttpResponse,
        // 原因是HTTP解码器会在每个HTTP消息中生成多个消息对象HttpRequest/HttpResponse,HttpContent,LastHttpContent
        pipeline.addLast(new HttpObjectAggregator(65536));
        // 主要用于处理大数据流,比如一个1G大小的文件如果你直接传输肯定会撑暴jvm内存的; 增加之后就不用考虑这个问题了
        pipeline.addLast(new ChunkedWriteHandler());
        // WebSocket数据压缩
        pipeline.addLast(new WebSocketServerCompressionHandler());
        // 协议包长度限制
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true));
        // 协议包解码
        pipeline.addLast(new MessageToMessageDecoder() {
            @Override
            protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List objs) throws Exception {
                ByteBuf buf = ((BinaryWebSocketFrame) frame).content();
                objs.add(buf);
                buf.retain();
            }
        });
        // 协议包编码
        pipeline.addLast(new MessageToMessageEncoder() {
            @Override
            protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List out) throws Exception {
                ByteBuf result = null;
                if (msg instanceof MessageLite) {
                    result = wrappedBuffer(((MessageLite) msg).toByteArray());
                }
                if (msg instanceof MessageLite.Builder) {
                    result = wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray());
                }

                // ==== 上面代码片段是拷贝自TCP ProtobufEncoder 源码 ====
                // 然后下面再转成websocket二进制流,因为客户端不能直接解析protobuf编码生成的

                WebSocketFrame frame = new BinaryWebSocketFrame(result);
                out.add(frame);
            }
        });
    
        // 协议包解码时指定Protobuf字节数实例化为CommonProtocol类型
        pipeline.addLast(new ProtobufDecoder(MessageData.RequestUser.getDefaultInstance()));
        
        // websocket定义了传递数据的6中frame类型
        pipeline.addLast(new ServerFrameHandler());
        
    }
}

ServerFrameHandler.java

import com.example.nettydemo.protobuf.MessageData;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.List;

//处理文本协议数据,处理TextWebSocketFrame类型的数据,websocket专门处理文本的frame就是TextWebSocketFrame
public class ServerFrameHandler extends SimpleChannelInboundHandler {
    
    private final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
    //读到客户端的内容并且向客户端去写内容
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageData.RequestUser msg) throws Exception {
        // channelGroup.add();
    
        Channel channel = ctx.channel();
        System.out.println(msg.getUserName());
        System.out.println(msg.getAge());
        System.out.println(msg.getPassword());
        MessageData.ResponseUser bank = MessageData
                .ResponseUser.newBuilder()
                .setUserName("你好,请问有什么可以帮助你!")
                .setAge(18).setPassword("11111").build();

        channel.writeAndFlush(bank);
    }
    
    //每个channel都有一个唯一的id值
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //打印出channel唯一值,asLongText方法是channel的id的全名
        // System.out.println("handlerAdded:"+ctx.channel().id().asLongText());
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // System.out.println("handlerRemoved:" + ctx.channel().id().asLongText());
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("异常发生");
        ctx.close();
    }
    
}
protobuf 文件的使用

proto 文件

syntax ="proto2";

package com.example.nettydemo.protobuf;

//optimize_for 加快解析的速度
option optimize_for = SPEED;
option java_package = "com.example.nettydemo.protobuf";
option java_outer_classname="MessageData";

// 客户端发送过来的消息实体
message RequestUser{
    optional string user_name = 1;
    optional int32 age = 2;
    optional string password = 3;
}

// 返回给客户端的消息实体
message ResponseUser{
    optional string user_name = 1;
    optional int32 age = 2;
    optional string password = 3;
}
生成 proto 的Java 类
批量生成工具,直接找到这个 bat 或者 sh 文件,在对应的平台执行就可以了具体可以自行百度 protobuf 怎么使用

Windows 版本

set outPath=../../java
set fileArray=(MessageDataProto ATestProto)

# 将.proto文件生成java类
for %%i in %fileArray% do (
    echo generate cli protocol java code: %%i.proto
    protoc --java_out=%outPath% ./%%i.proto
)

pause

sh 版本 地址: https://github.com/lmxdawn/ne...

#!/bin/bash

outPath=../../java
fileArray=(MessageDataProto ATestProto)

for i in ${fileArray[@]};
do
    echo "generate cli protocol java code: ${i}.proto"
    protoc --java_out=$outPath ./$i.proto
done
websocket 实现



    
    WebSocket客户端







欢迎访问客服系统

回复消息:

扩展阅读

spring boot 实现的后台管理系统
vue + element-ui 实现的后台管理界面,接入 spring boot API接口

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/72801.html

相关文章

  • netty 基于 protobuf 协议 实现 websocket 版本简易客服系统

    摘要:结构作为服务端作为序列化数据的协议前端通讯演示地址服务端实现启动类长连接示例主线程组从线程组请求的解码和编码把多个消息转换为一个单一的或是,原因是解码器会在每个消息中生成多个消息对象主要用于处理大数据流,比如一个大小的文件如果你直接传输肯定 结构 netty 作为服务端 protobuf 作为序列化数据的协议 websocket 前端通讯 演示 GitHub 地址 showImg(...

    wua_wua2012 评论0 收藏0
  • Spring整合NettyWebSocket互联网聊天系统

    摘要:当用户注销或退出时,释放连接,清空对象中的登录状态。聊天管理模块系统的核心模块,这部分主要使用框架实现,功能包括信息文件的单条和多条发送,也支持表情发送。描述读取完连接的消息后,对消息进行处理。 0.前言 最近一段时间在学习Netty网络框架,又趁着计算机网络的课程设计,决定以Netty为核心,以WebSocket为应用层通信协议做一个互联网聊天系统,整体而言就像微信网页版一样,但考虑...

    My_Oh_My 评论0 收藏0
  • Netty(三) 什么是 TCP 拆、粘包?如何解决?

    摘要:是一个面向字节流的协议,它是性质是流式的,所以它并没有分段。可基于分隔符解决。编解码的主要目的就是为了可以编码成字节流用于在网络中传输持久化存储。 showImg(https://segmentfault.com/img/remote/1460000015895049); 前言 记得前段时间我们生产上的一个网关出现了故障。 这个网关逻辑非常简单,就是接收客户端的请求然后解析报文最后发送...

    YanceyOfficial 评论0 收藏0

发表评论

0条评论

Shihira

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<