资讯专栏INFORMATION COLUMN

简易RPC框架:基于 netty 的协议编解码

Loong_T / 3461人阅读

摘要:概述在简易框架需求与设计这篇文章中已经给出了协议的具体细节,协议类型为二进制协议,如下协议的解码我们称为,编码我们成为,下文我们将直接使用和术语。直接贴代码,参考前文提到的协议格式阅读以下代码协议编码器

概述

在《简易RPC框架:需求与设计》这篇文章中已经给出了协议的具体细节,协议类型为二进制协议,如下:

 ------------------------------------------------------------------------
 | magic (2bytes) | version (1byte) |  type (1byte)  | reserved (7bits) | 
 ------------------------------------------------------------------------
 | status (1byte) |    id (8bytes)    |        body length (4bytes)     |
 ------------------------------------------------------------------------
 |                                                                      |
 |                   body ($body_length bytes)                          |
 |                                                                      |
 ------------------------------------------------------------------------

协议的解码我们称为 decode,编码我们成为 encode,下文我们将直接使用 decode 和 encode 术语。

decode 的本质就是讲接收到的一串二进制报文,转化为具体的消息对象,在 Java 中,就是将这串二进制报文所包含的信息,用某种类型的对象存储起来。

encode 则是将存储了信息的对象,转化为具有相同含义的一串二进制报文,然后网络收发模块再将报文发出去。

无论是 rpc 客户端还是服务端,都需要有一个 decode 和 encode 的逻辑。

消息类型

rpc 客户端与服务端之间的通信,需要通过发送不同类型的消息来实现,例如:client 向 server 端发送的消息,可能是请求消息,可能是心跳消息,可能是认证消息,而 server 向 client 发送的消息,一般就是响应消息。

利用 Java 中的枚举类型,可以将消息类型进行如下定义:

/**
 * 消息类型
 *
 * @author beanlam
 * @version 1.0  
 */ 

public enum MessageType {

    REQUEST((byte) 0x01), HEARTBEAT((byte) 0x02), CHECKIN((byte) 0x03), RESPONSE(
            (byte) 0x04), UNKNOWN((byte) 0xFF);

    private byte code;

    MessageType(byte code) {
        this.code = code;
    }

    public static MessageType valueOf(byte code) {
        for (MessageType instance : values()) {
            if (instance.code == code) {
                return instance;
            }
        }
        return UNKNOWN;
    }

    public byte getCode() {
        return code;
    }
}

在这个类中设计了 valueOf 方法,方便进行具体的 byte 字节与具体的消息枚举类型之间的映射和转换。

调用状态设计

client 主动发起的一次 rpc 调用,要么成功,要么失败,server 端有责任告知 client 此次调用的结果,client 也有责任去感知调用失败的原因,因为不一定是 server 端造成的失败,可能是因为 client 端在对消息进行预处理的时候,例如序列化,就已经出错了,这种错误也应该作为一次调用的调用结果返回给 client 调用者。因此引入一个调用状态,与消息类型一样,它也借助了 Java 语言里的枚举类型来实现,并实现了方便的 valueOf 方法:

/**

 * 调用状态

 *

 * @author beanlam

 * @version 1.0

 */

public enum InvocationStatus {

    OK((byte) 0x01), CLIENT_TIMEOUT((byte) 0x02), SERVER_TIMEOUT(

            (byte) 0x03), BAD_REQUEST((byte) 0x04), BAD_RESPONSE(

            (byte) 0x05), SERVICE_NOT_FOUND((byte) 0x06), SERVER_SERIALIZATION_ERROR(

            (byte) 0x07), CLIENT_SERIALIZATION_ERROR((byte) 0x08), CLIENT_CANCELED(

            (byte) 0x09), SERVER_BUSY((byte) 0x0A), CLIENT_BUSY(

            (byte) 0x0B), SERIALIZATION_ERROR((byte) 0x0C), INTERNAL_ERROR(

            (byte) 0x0D), SERVER_METHOD_INVOKE_ERROR((byte) 0x0E), UNKNOWN((byte) 0xFF);

    private byte code;

    InvocationStatus(byte code) {

        this.code = code;

    }

    public static InvocationStatus valueOf(byte code) {

        for (InvocationStatus instance : values()) {

            if (code == instance.code) {

                return instance;

            }

        }

        return UNKNOWN;

    }

    public byte getCode() {

        return code;

    }

}
消息实体设计

我们将 client 往 server 端发送的统一称为 rpc 请求消息,一个请求对应着一个响应,因此在 client 和 server 端间流动的信息大体上其实就只有两种,即要么是请求,要么是响应。我们将会定义两个类,分别是 RpcRequest 和 RpcResponse 来代表请求消息和响应消息。

另外由于无论是请求消息还是响应消息,它们都有一些共同的属性,例如说“调用上下文ID”,或者消息类型。因此会再定义一个 RpcMessage 类,作为父类。

RpcMessage
/**

 * rpc消息

 *

 * @author beanlam

 * @version 1.0

 */

public class RpcMessage {

    private MessageType type;

    private long contextId;

    private Object data;

    public long getContextId() {

        return this.contextId;

    }

    public void setContextId(long id) {

        this.contextId = id;

    }

    public Object getData() {

        return this.data;

    }

    public void setData(Object data) {

        this.data = data;

    }

    public void setType(byte code) {

        this.type = MessageType.valueOf(code);

    }

    public MessageType getType() {

        return this.type;

    }

    public void setType(MessageType type) {

        this.type = type;

    }

    @Override

    public String toString() {

        return "[messageType=" + type.name() + ", contextId=" + contextId + ", data="

                + data + "]";

    }

}
RpcRequest
import java.util.concurrent.atomic.AtomicLong;

/**

 * rpc请求消息

 *

 * @author beanlam

 * @version 1.0

 */

public class RpcRequest extends RpcMessage {

    private static final AtomicLong ID_GENERATOR = new AtomicLong(0);

    public RpcRequest() {

        this(ID_GENERATOR.incrementAndGet());

    }

    public RpcRequest(long contextId) {

        setContextId(contextId);

        setType(MessageType.REQUEST);

    }

}
RpcResponse
/**

 *

 * rpc响应消息

 *

 * @author beanlam

 * @version 1.0

 */

public class RpcResponse extends RpcMessage {

    private InvocationStatus status = InvocationStatus.OK;

    public RpcResponse(long contextId) {

        setContextId(contextId);

        setType(MessageType.RESPONSE);

    }

    public InvocationStatus getStatus() {

        return this.status;

    }

    public void setStatus(InvocationStatus status) {

        this.status = status;

    }

    @Override

    public String toString() {

        return "RpcResponse[contextId=" + getContextId() + ", status=" + status.name() + "]";

    }

}
netty 编解码介绍

netty 是一个 NIO 框架,应该这么说,netty 是一个有良好设计思想的 NIO 框架。一个 NIO 框架必备的要素就是 reactor 线程模型,目前有一些比较优秀而且开源的小型 NIO 框架,例如分库分表中间件 mycat 实现的一个简易 NIO 框架,可以在这里看到。

netty 的主要特点有:微内核设计、责任链模式的业务逻辑处理、内存和资源泄露的检测等。其中编解码在 netty 中,都被设计成责任链上的一个一个 Handler。

decode 对于 netty 来说,它提供了 ByteToMessageDecoder,它也提供了 MessageToByteEncoder。

借助 netty 来实现协议编解码,实际上就是去在这两个handler里面实现编解码的逻辑。

decode

在实现 decode 逻辑时需要注意的一个问题是,由于二进制报文是在网络上发送的,因此一个完整的报文可能经过多个分组来发送的,什么意思呢,就是当有报文进来后,要确认报文是否完整,decode逻辑代码不能假设收到的报文就是一个完整报文,一般称这为“TCP半包问题”。同样,报文是连着报文发送的,意味着decode代码逻辑还要负责在一长串二进制序列中,分割出一个一个独立的报文,这称之为“TCP粘包问题”。

netty 本身有提供一些方便的 decoder handler 来处理 TCP 半包和粘包的问题。不过一般情况下我们不会直接去用它,因为我们的协议比较简单,自己在代码里处理一下就可以了。

完整的 decode 代码逻辑如下所示:

import cn.com.agree.ats.rpc.message.*;

import cn.com.agree.ats.util.logfacade.AbstractPuppetLoggerFactory;

import cn.com.agree.ats.util.logfacade.IPuppetLogger;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**

 * 协议解码器

 *

 * @author beanlam

 * @version 1.0

 */

public class ProtocolDecoder extends ByteToMessageDecoder {

    private static final IPuppetLogger logger = AbstractPuppetLoggerFactory

            .getInstance(ProtocolDecoder.class);

    private boolean magicChecked = false;

    @Override

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List list)

            throws Exception {

        if (!magicChecked) {

            if (in.readableBytes() < ProtocolMetaData.MAGIC_LENGTH_IN_BYTES) {

                return;

            }

            magicChecked = true;

            if (!(in.getShort(in.readerIndex()) == ProtocolMetaData.MAGIC)) {

                logger.warn(

                        "illegal data received without correct magic number, channel will be close");

                ctx.close();

                magicChecked = false; //this line of code makes no any sense, but it"s good for a warning

                return;

            }

        }

        if (in.readableBytes() < ProtocolMetaData.HEADER_LENGTH_IN_BYTES) {

            return;

        }

        int bodyLength = in

                .getInt(in.readerIndex() + ProtocolMetaData.BODY_LENGTH_OFFSET);

        if (in.readableBytes() < bodyLength + ProtocolMetaData.HEADER_LENGTH_IN_BYTES) {

            return;

        }

        magicChecked = false;// so far the whole packet was received

        in.readShort(); // skip the magic

        in.readByte(); // dont care about the protocol version so far

        byte type = in.readByte();

        byte status = in.readByte();

        long contextId = in.readLong();

        byte[] body = new byte[in.readInt()];

        in.readBytes(body);

        RpcMessage message = null;

        MessageType messageType = MessageType.valueOf(type);

        if (messageType == MessageType.RESPONSE) {

            message = new RpcResponse(contextId);

            ((RpcResponse) message).setStatus(InvocationStatus.valueOf(status));

        } else {

            message = new RpcRequest(contextId);

        }

        message.setType(messageType);

        message.setData(body);

        list.add(message);

    }

}

可以看到,我们解决半包问题的时候,是判断有没有收到我们期望收到的报文,如果没有,直接在 decode 方法里面 return,等有更多的报文被收到的时候,netty 会自动帮我们调起 decode 方法。而我们解决粘包问题的思路也很清晰,那就是一次只处理一个报文,不去动后面的报文内容。

还需要注意的是,在 netty 中,对于 ByteBuf 的 get 是不会消费掉报文的,而 read 是会消费掉报文的。当不确定报文是否收完整的时候,我们都是用 get开头的方法去试探性地验证报文是否接收完全,当确定报文接收完全后,我们才用 read 开头的方法去消费这段报文。

encode

直接贴代码,参考前文提到的协议格式阅读以下代码:

/**

 *

 * 协议编码器

 *

 * @author beanlam

 * @version 1.0

 */

public class ProtocolEncoder extends MessageToByteEncoder {

    @Override

    protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out)

            throws Exception {

        byte status;

        byte[] data = (byte[]) rpcMessage.getData();

        if (rpcMessage instanceof RpcRequest) {

            RpcRequest request = (RpcRequest) rpcMessage;

            status = InvocationStatus.OK.getCode();

        } else {

            RpcResponse response = (RpcResponse) rpcMessage;

            status = response.getStatus().getCode();

        }

        out.writeShort(ProtocolMetaData.MAGIC);

        out.writeByte(ProtocolMetaData.VERSION);

        out.writeByte(rpcMessage.getType().getCode());

        out.writeByte(status);

        out.writeLong(rpcMessage.getContextId());

        out.writeInt(data.length);

        out.writeBytes(data);

    }

}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/72065.html

相关文章

  • 简易RPC框架:序列化机制

    摘要:由于我们还未谈到具体的调用机制,因此暂且认为就是把一个包含了调用信息的对象,从经过序列化,变成一串二进制流,发送到了端。 概述 在上一篇文章《简易RPC框架:基于 netty 的协议编解码》中谈到对于协议的 decode 和 encode,在谈 decode 之前,必须先要知道 encode 的过程是什么,它把什么东西转化成了二进制协议。由于我们还未谈到具体的 RPC 调用机制,因此暂...

    walterrwu 评论0 收藏0
  • 手把手教你基于Netty实现一个基础RPC框架(通俗易懂)

    摘要:是一个分布式服务框架,以及治理方案。手写注意要点手写注意要点基于上文中对于协议的理解,如果我们自己去实现,需要考虑哪些技术呢其实基于图的整个流程应该有一个大概的理解。基于手写实现基于手写实现理解了协议后,我们基于来实现一个通信框架。阅读这篇文章之前,建议先阅读和这篇文章关联的内容。[1]详细剖析分布式微服务架构下网络通信的底层实现原理(图解)[2][年薪60W的技巧]工作了5年,你真的理解N...

    番茄西红柿 评论0 收藏2637
  • Netty(三) 什么是 TCP 拆、粘包?如何解决?

    摘要:是一个面向字节流的协议,它是性质是流式的,所以它并没有分段。可基于分隔符解决。编解码的主要目的就是为了可以编码成字节流用于在网络中传输持久化存储。 showImg(https://segmentfault.com/img/remote/1460000015895049); 前言 记得前段时间我们生产上的一个网关出现了故障。 这个网关逻辑非常简单,就是接收客户端的请求然后解析报文最后发送...

    YanceyOfficial 评论0 收藏0
  • RPC框架原理及从零实现系列博客(一):思路篇

    摘要:等之所以支持跨语言,是因为他们自己定义了一套结构化数据存储格式,如的,用于编解码对象,作为各个语言通信的中间协议。 前段时间觉得自己一直用别人的框架,站在巨人的肩膀上,也该自己造造轮子了 一时兴起 就着手写起了RPC框架 这里写了系列博客拿给大家分享下 这篇是开篇的思路篇 项目最终的代码放在了我的github上https://github.com/wephone/Me... 欢迎sta...

    tracy 评论0 收藏0
  • Netty3文档翻译(二)

    摘要:丰富的缓存数据结构使用它自己的缓存来表示字节序列而不是的。针对有一个定义良好的事件模型。有一些协议是多层的建立在其他低级协议基础上。此外,甚至不是完全线程安全的。协议由标准化为。协议缓存整合是一个高效二进制协议的快速实现。 Chapter 2、结构概览 这一节我们将确认Netty提供的核心功能是什么,以及它们怎么构成一个完整的网络应用开发堆栈。 1、丰富的缓存数据结构 Netty使用它...

    Zhuxy 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<