摘要:性能正在发布过多的消息系统性能,注意请使用单线程的万条毫秒万条毫秒万毫秒万条毫秒多线程的正在发布过多的消息问题异常信息正在进行过多的发布解决办法消息发送发送限流用多带带的一个线程来完成消息的推送不用这个,使用就没有事增加的值反思笔者出现这个错
mqttclient性能&MQTT(32202): 正在发布过多的消息
org.eclipse.paho.client.mqttv3
2.2 GHz Intel Core i7 mac系统
publish性能,注意请使用单线程的 mqttclinet
1万条 341毫秒
4万条 1163毫秒
5万 1450毫秒
10万条 2700毫秒
多线程的 mqttclinet MQTT(32202): 正在发布过多的消息 问题
异常信息[15:07:21]: publish failed, message: aaaa 正在进行过多的发布 (32202) at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:496) at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:132) at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:156) at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1027) at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:399) at io.communet.ichater.emq.util.MqttUtil.publishMsg(MqttUtil.java:171) at io.communet.ichater.emq.util.MqttUtil.publishMsg(MqttUtil.java:161) at io.communet.ichater.emq.sub.MqttSendMsgEventSubscribe.onEvent(MqttSendMsgEventSubscribe.java:28) at java.lang.reflect.Method.invoke(Native Method) at java.lang.reflect.Method.invoke(Method.java:372) at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:507) at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:501) at org.greenrobot.eventbus.AsyncPoster.run(AsyncPoster.java:46) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587) at java.lang.Thread.run(Thread.java:818)解决办法
消息发送发送限流
用多带带的一个线程来完成 MQ 消息的推送 (不用这个MqttAsyncClient ,使用MqttClient 就没有事)
options.setMaxInflight(1000) 增加 actualInFlight 的值;
反思笔者出现这个错误是因为使用 EventBus, 之前使用多带带线程的 Handler 是没有问题的, 调查发现, 使用 EventBus 是新建线程运行的, 而 Handler 是多带带一个线程.
所以当发送大量消息的时候, EventBus 几乎是同一个点发出去, 就会造成这个错误
根据堆栈信息找到报错地方
if (actualInFlight >= this.maxInflight) { //@TRACE 613= sending {0} msgs at max inflight window log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)}); throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT); }
其中 actualInFlight 如下
// processed until the inflight window has space. if (actualInFlight < this.maxInflight) { // The in flight window is not full so process the // first message in the queue result = (MqttWireMessage)pendingMessages.elementAt(0); pendingMessages.removeElementAt(0); actualInFlight++; //@TRACE 623=+1 actualInFlight={0} log.fine(CLASS_NAME,methodName,"623",new Object[]{new Integer(actualInFlight)}); }
从 pendingMessages 中取出消息时, actualInFlight 加 1, maxInflight 可以自己设定, 默认值为 10.
public class ClientState { ... volatile private Vector pendingMessages; ... }
在 ClientState 中:
public void send(MqttWireMessage message, MqttToken token) throws MqttException { ... if (message instanceof MqttPublish) { synchronized (queueLock) { if (actualInFlight >= this.maxInflight) { //@TRACE 613= sending {0} msgs at max inflight window log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)}); throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT); } MqttMessage innerMessage = ((MqttPublish) message).getMessage(); //@TRACE 628=pending publish key={0} qos={1} message={2} log.fine(CLASS_NAME,methodName,"628", new Object[]{new Integer(message.getMessageId()), new Integer(innerMessage.getQos()), message}); switch(innerMessage.getQos()) { case 2: outboundQoS2.put(new Integer(message.getMessageId()), message); persistence.put(getSendPersistenceKey(message), (MqttPublish) message); break; case 1: outboundQoS1.put(new Integer(message.getMessageId()), message); persistence.put(getSendPersistenceKey(message), (MqttPublish) message); break; } tokenStore.saveToken(token, message); pendingMessages.addElement(message); queueLock.notifyAll(); } } else { ... } }
可以看到 pendingMessages 中添加元素的时候并没有做 qos 类型的判断
private void decrementInFlight() { final String methodName = "decrementInFlight"; synchronized (queueLock) { actualInFlight--; //@TRACE 646=-1 actualInFlight={0} log.fine(CLASS_NAME,methodName,"646",new Object[]{new Integer(actualInFlight)}); if (!checkQuiesceLock()) { queueLock.notifyAll(); } } }
当收到消息反馈时 actualInFlight 减 1.
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/75369.html
摘要:现在很多网站都通过服务来实现消息推送及数据即时同步功能,即时通讯组件逐渐成为产品的标配。目前国内有很多成熟稳定的第三方即时通讯服务厂家,比如融云。 现在很多网站、APP都通过IM服务来实现消息推送及数据即时同步功能,即时通讯组件逐渐成为产品的标配。目前国内有很多成熟稳定的第三方即时通讯服务厂家,比如:融云。使用这些专业的服务可以提高开发效率而且服务稳定有保障。 如果自己DIY或者需要在...
摘要:本文是其中的一个解决方案。地址客户端服务端前端网页介绍,消息队列遥测传输是开发的一个即时通讯协议,有可能成为物联网的重要组成部分。必须用于在顶层分隔符之后,除了当自己指定时。 1. 问题描述 最近,本实验室大量上马云测量,云监控方面的项目,大概是属于物联网应用的一个分支。老板也有将旧有仪器改造的想法,所以要实现仪器设备的云控制。本文是其中的一个解决方案。 2. 技术选型 消息队列:M...
摘要:超简单深度睡眠模式下远程采集温湿度信息项目背景相关技术深度睡眠模式温湿度采集数据收发前后端实现后端前端项目背景自己用收纳箱做了一个用于存放打印耗材的干燥箱,想用闲置的开发板和温湿度传感器做一个远程温湿度监测的小项目。 ...
摘要:前言前些日子了解到这样一个协议,可以在上达到即时通讯的效果,但网上并不能很方便地找到一篇目前版本的在下正确实现这个协议的博客。 前言 前些日子了解到mqtt这样一个协议,可以在web上达到即时通讯的效果,但网上并不能很方便地找到一篇目前版本的在node下正确实现这个协议的博客。 自己捣鼓了一段时间,理解不深刻,但也算是基本能够达到使用目的。 本文目的为对MQTT有需求的学习者提供一定程...
阅读 930·2019-08-30 15:55
阅读 503·2019-08-26 13:56
阅读 2037·2019-08-26 12:23
阅读 3276·2019-08-26 10:29
阅读 580·2019-08-26 10:17
阅读 2814·2019-08-23 16:53
阅读 651·2019-08-23 15:55
阅读 2726·2019-08-23 14:25