资讯专栏INFORMATION COLUMN

rocketmq之producer解析

luodongseu / 808人阅读

摘要:所以基于目前的设计,建议关闭自动创建的功能,然后根据消息量的大小,手动创建。如果发送消息,返回结果超时,这种超时不会进行重试了如果是方法本身耗时超过,还未来得及调用发送消息,此时的超时也不会重试。

先来看下producer核心的类设计,如下图:

1、核心发布消息的类DefaultMQProducer,继承自MQProducer接口,此接口定义了一系列发送消息的方法,如普通消息,顺序消息,延时消息等,最终进行网络通信会交给MQClientAPIImpl处理。

2、rocketmq从4.1.3版本开始又支持了事务消息,由TransactionMQProducer类提供(之后会有专门的文章进行详细解读事务消息)

producer之配置

我们看到DefaultMQProducer继承了一个客户端的公共配置类ClientConfig(与consumer公用),其实就是一个普通的javaBean,既可以代码中设置属性,也可以集成spring来配置

参数名 默认值 说明
namesrvAddr nameserver的地址列表,用分号隔开
clientIP 本机ip地址 客户端ip地址,有时候无法识别,需要手动配置
instanceName DEFAULT 客户端实例名称,客户端创建的多个 Producer、Consumer 实际是共用一个内部实例(这个实例包含网络连接、线程资源等)
clientCallbackExecutorThreads cpu核数 通信层客户端处理请求的线程数
pollNameServerInterval 30000 轮询nameserver的时间间隔,单位ms
heartbeatBrokerInterval 30000 向broker发送心跳的时间间隔,单位ms
persistConsumerOffsetInterval 5000 持久化 Consumer 消费进度间隔时间,单位ms

producer独有的配置:

参数名 默认值 说明
producerGroup DEFAULT_PRODUCER Producer组名,相同分组的producer应该有相同的发送消息逻辑
createTopicKey AUTO_CREATE_TOPIC_KEY 自动创建topic时,以此默认topic为模板创建指定topic
defaultTopicQueueNums 4 自动创建topic队列数量
sendMsgTimeout 3000 发送消息的超时时间,单位ms
compressMsgBodyOverHowmuch 4098 消息体超过多大会进行压缩,单位字节
retryTimesWhenSendFailed 2 同步发送消息,发送失败重试次数
retryTimesWhenSendAsyncFailed 2 异步发送消息,发送失败的重试次数
retryAnotherBrokerWhenNotStoreOK false 同步发送消息,消息存储失败是否重试其他broker
maxMessageSize 4194304 客户端限制消息的大小,默认4M
TransactionListener 事务消息时,必须设置的回查监听器
producer之group概念

我们在创建producer时必须要指定一个group,这里有两个作用:

生产者一般会是集群部署的,group用来标识一类生产者,相同group的生产者一般要有相同的发送逻辑。

在发送事务消息时,当事务消息异常,broker端来回查事务状态时,需要知道是由哪类生产者发送的事务消息,生产端会根据group名称来查找对应的producer来执行相应的回查逻辑。

producer的启动流程

简单说明下整个启动流程:

1、首先在DefaultMQProducerImpl中会做一些参数校验,如group是否合法;然后会创建MQClientInstance实例,此实例包含网络连接、线程资源等,相同的clientId会共享此实例,所以通过MQClientManager来管理。

2、核心的启动流程在MQClientInstance类中,如果nameserver地址没有配置的话,会先通过静态的http服务器地址去抓取nameserver的地址;再则启动netty客户端。

3、启动一些定时任务,跟producer有关的如下几个:

如果producer没有配置nameserver地址,启动定时抓取nameserver的地址的定时任务,任务延时10s开始,每隔2分支执行一次。

轮询nameserver定时任务,主要是定时更新topic的路由信息,任务延时10ms开始,每隔30s执行一次。

清除下线的broker和向broker发送心跳,任务延时1s执行,每隔30s执行一次

Producer如何寻址

RocketMQ 有多种配置方式可以令客户端找到 NameServer, 然后通过 NameServer 再找到 Broker,分别如下,
优先级由高到低,高优优先级会覆盖低优先级

1、代码中指定 Name Server 地址

producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");

2、启动参数指定

-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876

3、环境变量指定 Name Server 地址

export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876

4、HTTP 静态服务器寻址(默认)

如果以上三种都没有设置name server的地址,客户端启动后先会访问一个静态http服务器获取name server的地址,然后会启动一个定时任务访问这个静态 HTTP 服务器,地址如下:

http://jmenv.tbsite.net:8080/rocketmq/nsaddr

这是默认的地址,当然你也可以更改,做如下设置:

代码:

System.setProperty("rocketmq.namesrv.domain","localhost");
System.setProperty("rocketmq.namesrv.domain.subgroup","nameServer")

或者启动参数指定:

-Drocketmq.namesrv.domain=localhost
-Drocketmq.namesrv.domain.subgroup=nameServer

以上设置后http服务器地址就变成:

http://localhsot:8080/rocketmq/nameServer

这个 URL 的返回内容格式如下:

192.168.0.1:9876;192.168.0.2:9876

客户端每隔 2 分钟访问一次这个 HTTP 服务器,并更新本地的 Name Server 地址。

推荐使用 HTTP 静态服务器寻址方式,好处是客户端部署简单,且 Name Server 集群可以热升级。

发送消息时如何获取路由信息

1、broker在启动的时候通过参数autoCreateTopicEnable设置是否自动创建topic,默认为true,此时会创建一个名为TBW102(4.3版本已经改名为AUTO_CREATE_TOPIC_KEY)的topic(参见类TopicConfigManager),broker在向namesrv注册时会把默认的topic注册上去。如果设置false,则不会注册。

2、producer在发送消息时会在本地获取路由信息,第一次发送的话本地肯定没有,就会去namesrv获取,如果此时namesrv也没有,则会获取TBW102的topic信息(参见DefaultMQProducerImpl.tryToFindTopicPublishInfo),以此为模板创建topic,然后选择topic下的一台broker发,broker创建后,会通过心跳注册到namesrv上。

3、如果autoCreateTopicEnable设置false的话,producer发送消息会报找不到路由的异常,此时必须手动创建topic。

建议autoCreateTopicEnable设置false,基于以上第二步,自动创建topic后,以后所有该TOPIC的消息,都将发送到刚才选择的这台broke上,达不到负载均衡的目的。所以基于目前RocketMQ的设计,建议关闭自动创建TOPIC的功能,然后根据消息量的大小,手动创建TOPIC。

可以通过管理工具mqadmin来手动创建topic

sh mqadmin updateTopic -c [集群名称] -n [nameserver地址] -t [topic名称] -w [写队列数] -r [读队列数]

手动创建了Topic后,producer就可以轮询的发送到不同的broker了。

topic的队列数

这里讲一下自动创建的topic的队列数如何设置,首先broker创建的模板topic=AUTO_CREATE_TOPIC_KEY的队列是8,参见类TopicConfigManager:

public TopicConfigManager(BrokerController brokerController) { 
    //省略无关代码
    if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
        String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
        TopicConfig topicConfig = new TopicConfig(topic);
        this.systemTopicList.add(topic);
        topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                                     .getDefaultTopicQueueNums());
        topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                                      .getDefaultTopicQueueNums());
        int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
        topicConfig.setPerm(perm);
        this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
    }
     //省略无关代码
}

BrokerConfig:

private int defaultTopicQueueNums = 8;

DefaultMQProducer端默认知道要创建的topic的队列数是4

private volatile int defaultTopicQueueNums = 4;

MQClientInstance类的方法updateTopicRouteInfoFromNameServer中有这样一段逻辑:

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
    //省略无关代码
    for (QueueData data : topicRouteData.getQueueDatas()) {
        int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
        data.setReadQueueNums(queueNums);
        data.setWriteQueueNums(queueNums);
    }
    //省略无关代码
 }

创建队列是取两者最小的一个,也就是4,所以要设置topic的队列数量,很明显了设置broker的defaultTopicQueueNums的值和DefaultMQProducer的defaultTopicQueueNums值就可以了。这是自动创建Topic时队列数的设置方法,上面也提到生成环境一般不会开启自动创建Topic的功能,可以通过上面的手动创建Topic的指令来设置读写队列数。你可能注意到了Topic下有读写队两个队列数,分别代表上面意思呢?读写队列其实是个逻辑概念,一个broker下topic的总队列数是以写队列为准,而读队列意思是允许多少队列可以被消费者消费,也就是说读多写少的情况下,没有问题,队列都可以被消费掉,如果写多读少的话,那么就会存在队列不会被消费的情况。

消息发送

前面我们讲到了如何获取topic的路由信息,如何创建topic的队列数,一个topic下有多个队列,又可以分布在不同的broker上面,所以topic的总队列数应该是所有broker上的topic下队列数的总和。

备注:如果手动在每个broker上分别创建topic的话,相同topic在不同broker上的队列数可以不一样。

那么问题来了,在发送消息时根据怎么样的策略来选择一个队列发送呢?rocketmq提供了一个MQFaultStrategy策略类来负责选择队列,这里会有一个参数sendLatencyFaultEnable是否开启延迟故障,

该值默认为false,在不开启的情况下,相同线程发送消息是轮询topic下的所有队列,不同线程发送是随机的,核心代码如下:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        //省略不必要的代码......
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
//以上代码逻辑参见类MQFaultStrategy.selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        //省略不必要的代码......
    }
}
public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}
//以上代码逻辑参见类TopicPublishInfo
public int getAndIncrement() {
    Integer index = this.threadLocalIndex.get();//ThreadLocal中获取
    if (null == index) {//为空,随机生成一个
        index = Math.abs(random.nextInt());
        if (index < 0)
            index = 0;
        this.threadLocalIndex.set(index);
    }
    index = Math.abs(index + 1);
    if (index < 0)
        index = 0;
    this.threadLocalIndex.set(index);
    return index;
}
//以上代码参见类ThreadLocalIndex

每次获取index的时候都是从本地线程变量ThreadLocal中获取,没有的情况下就是随机生成一个,加1取绝对值后返回,再对队列列表的长度取模,所以在同一线程中,会轮训的从队列列表获取队列。而如果是不同线程的话,index是随机生成的,所以就是随机从队列列表中获取。如下图所示:

可以看到选择队列方法的入参有一个lastBrokerName的入参,此参数的目的是在发送消息失败的情况下,producer会重试再次发送,而再次发送选择的队列需要另选一个broker,lastBrokerName就是要过滤掉失败的broker,选择下一个broker的队列进行发送消息。

开启延迟故障,每当发送完一次消息,不管成功还是失败,都会把此次存储消息的broker给保存下来,记录故障情况下此broker需要延长多长时间才能再次发送,目前看到在代码里面写死了,故障下30s之内是不能再向此broker发送消息了。

消息重试

producer的send方法本身支持内部重试,重试逻辑如下:

1、最大重试次数默认2次,可以通过参数retryTimesWhenSendFailed设置

2、发送失败,则轮询到下一个broker,如果此时只有一个broker在线呢?那就会轮训这个broker下的其他队列。

3、这个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认为3s。

如果发送消息,broker返回结果超时,这种超时不会进行重试了;如果是方法本身耗时超过sendMsgTimeout ,还未来得及调用发送消息,此时的超时也不会重试。

以上策略其实也很难保证同步发送消息一定成功,如果应用要保证消息不丢失,最好先把消息存储到db,后台启线程定时重试,确保消息一定存储到broker。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77288.html

相关文章

  • 高并发异步解耦利器:RocketMQ究竟强在哪里?

    摘要:它是阿里巴巴于年开源的第三代分布式消息中间件。是一个分布式消息中间件,具有低延迟高性能和可靠性万亿级别的容量和灵活的可扩展性,它是阿里巴巴于年开源的第三代分布式消息中间件。上篇文章消息队列那么多,为什么建议深入了解下RabbitMQ?我们讲到了消息队列的发展史:并且详细介绍了RabbitMQ,其功能也是挺强大的,那么,为啥又要搞一个RocketMQ出来呢?是重复造轮子吗?本文我们就带大家来详...

    tainzhi 评论0 收藏0
  • RocketMQ源码学习(一)-概述

    摘要:每个与集群中的所有节点建立长连接,定时注册信息到所有。完全无状态,可集群部署。本系列源码解析主要参照原理简介来追寻其代码实现虽然版本不太一致但这也是能找到的最详细的资料了接下来根据其模块来源码阅读目录如下 为什么选择读RocketMQ? 对MQ的理解一直不深,上周看了,还是觉得不够深入,找个成熟的产品来学习吧,RabbitMQ是erLang写的,Kafka是Scala写的,非Java写...

    godlong_X 评论0 收藏0
  • 如何解决MQ消息消费顺序问题

    摘要:利用的高级特性特性是一种负载均衡的机制。在一个消息被分发到之前,首先检查消息属性。属性为某个值的消息单个消息或消息集合在描述,和的对应关系,以及负载均衡策略时。同样做到了保证消息的顺序情况下,均衡消费的消费消息。 通常mq可以保证先到队列的消息按照顺序分发给消费者消费来保证顺序,但是一个队列有多个消费者消费的时候,那将失去这个保证,因为这些消息被多个线程并发的消费。但是有的时候消息按照...

    Atom 评论0 收藏0
  • SpringBoot RocketMQ 整合使用和监控

    摘要:前提通过前面两篇文章可以简单的了解和安装,今天就将和整合起来使用。然后我运行之前的整合项目,查看监控信息如下总结整篇文章讲述了与整合和监控平台的搭建。 showImg(https://segmentfault.com/img/remote/1460000013232432?w=1920&h=1277); 前提 通过前面两篇文章可以简单的了解 RocketMQ 和 安装 RocketMQ...

    Jacendfeng 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<