摘要:启动一个线程,获取阻塞队列的元素,当通道发生事件时,队列会被放入事件对象启动一个定时器,每个执行一次,扫描,超时没有获取结果的会被移除掉客户端跟服务器端差不多。而这个对象会在传输之前进行编码,消息接收到进行解码。
rocketMQ通信模块
Rocketmq的通信层是基于通信框架netty 4.0.21.Final之上做了简单的协议封装,基本的类图如下:
通讯模块是怎么进行的消息传输的先来看看服务器端启动做了什么:
netty服务器启动,监听在8888;netty设置了一个心跳检测器IdleStateHandler,读写超时时间为120s,在120s后都没有读写操作将会触发相应事件。
启动一个线程,获取阻塞队列eventQueue的元素,当netty channel通道发生CONNECT, CLOSE,IDLE,EXCEPTION事件时,队列会被放入事件对象
启动一个定时器Timer,每个1s执行一次,扫描ResponseFuture,超时没有获取结果的会被移除掉
客户端跟服务器端差不多。
rocketmq提供了三种通信方式:
一、invokeSyncImpl 同步调用(主要实现参见NettyRemotingAbstract.invokeSyncImpl)
同步调用是指客户端发起远程调用后,当前线程会被阻塞,直到服务器端返回结果或发生超时异常,我们在发送消息时需要同步知道消息发送成功还是失败,一般使用这种方式。
我们知道,netty是异步基于事件驱动的,当我们使用netty向远程服务器发送消息是通过channel.writeAndFlush方法,此方法是异步的,那我们如何同步的获取服务器的返回结果呢?这里的做法是在向服务器发送消息时设置一个唯一的序列号,本地会通过上下文保存一个ResponseFuture对象在Map中,key就是这个唯一的序列号,value就是这个ResponseFuture对象,ResponseFuture对象会设置一个CountDownLatch,每当发送完消息后,就会调用CountDownLatch的await方法挂起当前线程;当服务器返回结果时也会携带之前客户端传递过去的唯一序列号,这样就可以找到ResponseFuture对象,再调用CountDownLatch的countDown方法,此时客户端之前挂起的线程就会苏醒过来,完成一次同步调用。
二、invokeAsyncImpl异步调用(主要实现参见NettyRemotingAbstract.invokeAsyncImpl)
客户端发起远程调用前会先设置一个InvokeCallback类,当然也是设置在ResponseFuture对象中,调用结束后不会等待结果,当服务器返回时也是跟同步调用一样会在新的线程里面先找到ResponseFuture,然后执行回调接口也就是InvokeCallback的operationComplete方法。如果服务器返回结果超时,也会进行回调,客户端可以根据相关的状态来执行相关逻辑。
异步调用不会阻塞线程,调用后会立即返回,调用结果会在异步线程里面执行回调来获取,使用Async需要控制好节奏,不能发送的太快以防止压垮服务器端。所以在invokeAsyncImpl方法里面设置了一个信号量,默认是64个,只有获取到许可的请求才能真正发起远程调用。
三、invokeOnewayImpl 单向调用(主要实现参见NettyRemotingAbstract.invokeOnewayImpl)
客户端发送请求后不会等待服务端返回的结果,并且会忽略服务端的处理结果;当前线程调用完毕,调用方并不关心服务器端的处理结果,也不会被阻塞,跟异步调用一样需要控制好节奏以防压垮服务器端。在invokeOnewayImpl方法里面也设置了一个信号量,默认是256个,只有获取到许可的请求才能真正发起远程调用。
三种通信方式的对比
调用方式 | 特点 | 使用场景 |
---|---|---|
Sync | 同步阻塞 | 需要同步获取结果的场景 |
Async | 异步不阻塞 | 当前不需要结果,但是当服务器处理完后,需要做一些其他事情 |
Oneway | 异步不阻塞 | 不要需要结果,不保证消息一定发送成功 |
RemotingCommand是rocketMQ消息传输的媒介,所有的消息都会包装成RemotingCommand来进行传输。而这个对象会在netty传输之前进行编码,消息接收到进行解码。
RemotingCommand是由头部(header)和消息体(body)组成,消息发送的时候,头部和消息体会分开进行编码。那么RemotingCommand是如何组成的呢?
RemotingCommand的核心字段:
public class RemotingCommand{ private int code; private LanguageCode language = LanguageCode.JAVA; private int version = 0; private int opaque = requestId.getAndIncrement(); private int flag = 0; private String remark; private HashMap头部(header)extFields; private transient CommandCustomHeader customHeader; private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer; private transient byte[] body; }
请求头接收方和发起方的含义略有不同,下面的表格详细的说明:
字段名 | 类型 | Request | Resposne |
---|---|---|---|
code | int | 请求操作代码,接收方根据不同的代码做不同的操作 | 应答结果代码,0表示成功,非0表示各种错误代码 |
language | 枚举 | 请求方实现的语言,默认Java | 接收方实现的语言 |
version | int | 请求方版本 | 接收方版本 |
opaque | int | 请求方在同一连接上不同的请求标识代码,多线程连接服用使用 | 接收方不做修改,直接返回 |
flag | int | 通信层的标志位 | 通信层的标志位 |
remark | String | 传输自定义文本信息 | 错误详细描述 |
extFields | Map | 自定义字段 | 自定义字段 |
头信息里面还包括了CommandCustomHeader的自定义的一些头信息,会被通过反射的方式放在extFields字段里面
消息体消息体是直接变为byte数组,由客户端自己序列化,这两部分后一起放入netty传输的ByteBuffer中,一起传输到接收端
报文格式与序列化length | header length | headerData | bodyData |
---|---|---|---|
4个字节 | 4个字节(高一位字节表示序列化类型,低三位字节表示长度) |
length:表示整个数据包的长度 占4个字节
header length:表示header的长度(高一位字节表示序列化类型,低三位字节表示长度)
headerData的序列化有两种方式:
json:使用fastjson进行序列化
自定义:使用bytebuffer自定义序列化
Netty服务器端在启动时设置了TCP参数的含义SO_BACKLOG:1024
指定全连接队列数,linux系统在文件/proc/sys/net/core/somaxconn指定,默认128;
还有一个半连接队列数,linux在文件/proc/sys/net/ipv4/tcp_max_syn_backlog指定
SO_REUSEADDR:true
重用处于time_wait状态下的连接
SO_KEEPALIVE:false
保活机制
TCP_NODELAY:true
关闭Nagle算法,Nagle算法可以降低网络里小包的数量,从而提升网络性能,关闭可以提高实时性
SO_SNDBUF:65535
发送缓存区大小
SO_RCVBUF:65535
接受缓存区大小
SO_RCVLOWAT:接收缓存水位线
SO_SNDLOWAT:发送缓存水位线
它们一般被I/O复用系统调用用来判断socket是否可读或可写。当TCP接收缓冲区中可读数据的总数大于其低水位标记时,I/O复用系统调用将通知应用程序可以从对应的socket上读取数据;当TCP发送缓冲区中的空闲空间(可以写入数据的空间)大于其低水位标记时,I/O复用系统调用将通知应用程序可以往对应的socket上写入数据
在netty中好像没有看到有设置这两个参数
CONNECT_TIMEOUT_MILLIS:3000
连接超时时间
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/77012.html
摘要:具体可以参考消息队列之具体可以参考实战之快速入门十分钟入门阿里中间件团队博客是一个分布式的可分区的可复制的基于发布订阅的消息系统主要用于大数据领域当然在分布式系统中也有应用。目前市面上流行的消息队列就是阿里借鉴的原理用开发而得。 我自己总结的Java学习的系统知识点以及面试问题,目前已经开源,会一直完善下去,欢迎建议和指导欢迎Star: https://github.com/Snail...
摘要:它是阿里巴巴于年开源的第三代分布式消息中间件。是一个分布式消息中间件,具有低延迟高性能和可靠性万亿级别的容量和灵活的可扩展性,它是阿里巴巴于年开源的第三代分布式消息中间件。上篇文章消息队列那么多,为什么建议深入了解下RabbitMQ?我们讲到了消息队列的发展史:并且详细介绍了RabbitMQ,其功能也是挺强大的,那么,为啥又要搞一个RocketMQ出来呢?是重复造轮子吗?本文我们就带大家来详...
摘要:分布式高并发微服务问阿里京东蚂蚁等大厂面试真题解析道跳槽涨薪必备精选面试题最新版大厂面试真题集点击这里免费领取点击这里免费领取 估计很多Java程序员平时主要的工作就是一些Web系统的业务开发,对于服务端IO程序以及网络通信编程做得并不多,但是对于高级或者资深程序员来说,IO通信以及服务端编...
阅读 1829·2021-10-20 13:49
阅读 1368·2019-08-30 15:52
阅读 2874·2019-08-29 16:37
阅读 1043·2019-08-29 10:55
阅读 3078·2019-08-26 12:14
阅读 1656·2019-08-23 17:06
阅读 3241·2019-08-23 16:59
阅读 2550·2019-08-23 15:42