摘要:本文仍以该实例为例,探讨该自定义通信协议的具体工作流程,以及如何以注册的形式灵活插拔通信消息对象。进行二进制数据帧的解码操作时,数据帧中已包含了消息的功能位,据此可获取相应的编解码器,而后可以对该数据帧进行解析,生成相应的消息对象。
本文为该系列的第三篇文章,设计需求为:服务端程序和众多客户端程序通过 TCP 协议进行通信,通信双方需通信的消息种类众多。上一篇文章以一个具体的需求为例,探讨了指定的 Java 消息对象与其相应的二进制数据帧相互转换的方法。本文仍以该实例为例,探讨该自定义通信协议的具体工作流程,以及如何以注册的形式灵活插拔通信消息对象。
1. 以注册的形式实现通信消息对象的统一管理通过该系列的第二篇文章可知,各个消息对象的编解码器类均拥有一个静态工厂方法,用于手动传入功能位及功能文字描述,进而生成包含这些参数的编解码器。如此设计,使得所有消息的功能位和文字描述均能够统一管理,降低维护成本。
根据上述需求,可通过 Map 容器管理所有的编解码器,有如下优点:
进行消息对象生成操作时,可直接使用相应编解码器的消息对象静态创建方法。
进行消息对象的编码操作时,已拥有该 Java 消息对象,即可知道消息对象的功能位,据此可获取相应的编解码器;或者,每个 Java 消息对象均内含相应编解码器的引用,故可直接对该消息对象进行编码操作。
进行二进制数据帧的解码操作时,数据帧中已包含了消息的功能位,据此可获取相应的编解码器,而后可以对该数据帧进行解析,生成相应的 Java 消息对象。
通信消息对象注册方法如下所示:
/** * 消息对象的注册 * * @param toolkit 消息对象编解码器容器的工具类 */ private void initialMsg() { saveNormalMsgCodec(new MsgCodecDeviceUnlock(0x10, 0x11, "客户端解锁")); saveNormalMsgCodec(new MsgCodecDeviceClear(0x10, 0x13, "客户端初始化")); saveNormalMsgCodec(new MsgCodecDeviceId(0x10, 0x1B, "客户端ID设置")); saveNormalMsgCodec(new MsgCodecEmployeeName(0x10, 0x1C, "客户端别名设置")); ... ... } /** * 将普通消息对象及其回复消息对象的编解码器均保存到 HashMap 中 * * @param baseMsgCodec 特定的消息对象编解码器 */ private void saveNormalMsgCodec(BaseMsgCodec baseMsgCodec) { saveSpecialMsgCodec(baseMsgCodec); baseMsgCodec = new MsgCodecReplyNormal(baseMsgCodec.getMajorMsgId() + 0x10, baseMsgCodec.getSubMsgId(), baseMsgCodec.getDetail()); saveSpecialMsgCodec(baseMsgCodec); } /** * 将消息对象的编解码器保存到 HashMap 中 * * @param baseMsgCodec 特定的编解码器 */ private void saveSpecialMsgCodec(BaseMsgCodec baseMsgCodec) { HASH_MAP.put(figureFrameId(baseMsgCodec.getMajorMsgId(), baseMsgCodec.getSubMsgId()), baseMsgCodec); }
上述代码表明,如果有新的业务需求,需要增删「插拔」业务消息对象,只需在 initialMsg() 方法中,对相应编解码器的注册语句进行增删即可。
saveNormalMsgCodec(BaseMsgCodec) 方法可以同时注册特定业务消息对象及其通用回复消息对象,操作方法清晰、简洁。
所以,在启动该 Java 程序时,只需要在启动过程中,执行上述 initialMsg() 方法,即可完成所有业务消息对象的注册。
2. 多个消息对象自由组合进同一个数据帧的实现原理由该系列的第一篇文章可知,如果某二进制数据帧所要传输的数据体部分内容很少,导致一个帧的大部分容量均被帧头占据,导致有效数据的占比很小,这就产生了巨大的浪费,故数据帧的数据体部分由子帧组成,同一类子帧均可被组装进同一个数据帧。如此做法,整个通信链路的数据量会明显减少,IO 负担也会因此减轻。
该需求的实现原理如下所示:
/** * 启动一个Channel的定时任务,用于间隔指定的时间对消息队列进行轮询,并发送指定数据帧 * * @param deque 指定的消息发送队列 * @param channelId 指定 Channel 的序号 */ private void startMessageQueueTask(LinkedBlockingDequedeque, Integer channelId) { executorService.scheduleWithFixedDelay(() -> { try { BaseMsg baseMsg = deque.take(); // 从队列中取出一个消息对象,队列为空时阻塞 Thread.sleep(AWAKE_TO_PROCESS_INTERVAL);// 等待极短的时间,保证队列中缓存尽可能多的对象 Channel channel = touchChannel(channelId); // 获取指定的待发送的 Channel List dataList = new ArrayList<>();// 子帧容器 ByteBuf data = baseMsg.subFrameEncode(channel.alloc().buffer());// 编码一个子帧 dataList.add(data); touchNeedReplyMsg(baseMsg); // 对该子帧设置检错重发任务 int length = data.readableBytes(); int flag = baseMsg.combineFrameFlag(); // 获取消息对象标识 while (true) { BaseMsg subMsg = deque.peek(); // 查看队列中的第一个消息对象 if (subMsg == null || subMsg.combineFrameFlag() != flag) { break; // 消息对象标识不同,即欲生成的主帧帧头不同,不能组合进同一主帧 } data = subMsg.subFrameEncode(channel.alloc().buffer()); if (length + data.readableBytes() > FrameSetting.MAX_DATA_LENGTH) { break; } length += data.readableBytes(); dataList.add(data); // 组合进了同一主帧 deque.poll(); // 从队列中移除该消息对象 touchNeedReplyMsg(subMsg); } FrameMajorHeader frameHeader = new FrameMajorHeader( baseMsg.getMajorMsgId(), baseMsg.getGroupId(), baseMsg.getDeviceId(), length); // 生成主帧帧头消息对象 channel.writeAndFlush(new SendableMsgContainer(frameHeader, dataList)); // 送入Channel进行发送 } catch (InterruptedException e) { logger.warn("消息队列定时发送任务被中断"); } }, channelId, CommSetting.FRAME_SEND_INTERVAL, TimeUnit.MILLISECONDS); }
由代码可知,待发送的消息对象均被送入指定的发送队列进行缓存,某客户端相应的线程对队列进行操作,取出消息对象并进行编码、组装、发送等。当然,当客户端数量较多时,上述的线程实现方式可采用 Netty 的 NIO 方式进行优化,以降低系统开销。
由上述描述可知,欲发送一个消息对象,只需将该消息对象送入相应的发送队列即可。
3. 实际业务消息对象的编解码 3.1 消息对象的编码方式由于每个 Java 消息对象均内含相应编解码器的引用,故可直接对该消息对象进行编码操作,代码如下:
public abstract class BaseMsg implements Cloneable { private final BaseMsgCodec msgCodec; ... ... /** * 将 java 消息对象编码为 TCP 子帧 * * @param buffer 空白的 TCP 子帧的容器 * @return 保存有 TCP 子帧的容器 */ public ByteBuf subFrameEncode(ByteBuf buffer) { return msgCodec.code(this, buffer); } }3.2 消息对象的解码方式
首先根据数据帧的帧头,即可解析出 FrameMajorHeader 对象,然后即可调用如下方法完成子帧的解析工作。实现原理文章开头已指出。
/** * TCP 帧解码为 Java 消息对象 * * @param head 主帧头 * @param subMsgId 子帧功能位 * @param data 子帧数据 * @return 已解码的 Java 对象 */ public BaseMsg decode(FrameMajorHeader head, int subMsgId, byte[] data) { BaseMsgCodec msgCodec = MsgCodecToolkit.getMsgCodec(head.getMsgId(), subMsgId); return msgCodec.decode(head.getGroupId(), head.getDeviceId(), data); }
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/69231.html
摘要:基本消息对象的设计消息对象的设计主要由两部分组成特定数据帧对应的特定消息对象。该类包含上节数据帧主帧及子帧的所有公共信息,仅仅未包含子帧中的数据体信息,该需求由基本消息对象的子类实现。 开发工程中,有一个常见的需求:服务端程序和多个客户端程序通过 TCP 协议进行通信,通信双方需通信的消息种类众多,并且客户端的数量可能有数万个。为此,双方需要约定尽可能丰富、灵活的数据帧「数据包」协议,...
摘要:而实际两者之间的通信使用的是基于的自定义二进制数据帧,对象与数据帧之间需进行转换。该类实现了编码解码方法,故可对消息对象进行编码或对数据帧进行解码。该类的静态方法可通过指定功能消息对象生成相应的回复对象。 本文为该系列的第二篇文章,设计需求为:服务端程序和众多客户端程序通过 TCP 协议进行通信,通信双方需通信的消息种类众多。上一篇文章详细描述了该通信协议的二进制数据帧格式以及基本 J...
项目地址 showImg(https://segmentfault.com/img/remote/1460000019380071); 什么是 Puzzle Puzzle 是基于 Vue 和 Webpack4 实现的一种项目结构;业务模块可以像拼图一样与架构模块组合,形成不同的系统,而这一切都是可以在生产环境热插拔的;这意味着你可以随时向你的系统添加新的功能模块,甚至改版整个系统,而不需要全量替换...
摘要:的服务治理平台发源于早期的个人项目。客户端发现模式要求客户端负责查询注册中心,获取服务提供者的列表信息,使用负载均衡算法选择一个合适的服务提供者,发起接口调用请求。系统和系统之间,少不了数据的互联互通。随着微服务的流行,一个系统内的不同应用进行互联互通也是常态。 PowerDotNet的服务治理平台发源于早期的个人项目Power.Apix。这个项目借鉴了工作过的公司的服务治理方案,站在...
阅读 3691·2021-11-22 15:24
阅读 1596·2021-09-26 09:46
阅读 1907·2021-09-14 18:01
阅读 2604·2019-08-30 15:45
阅读 3527·2019-08-30 14:23
阅读 1870·2019-08-30 12:43
阅读 2917·2019-08-30 10:56
阅读 801·2019-08-29 12:20