摘要:优化逻辑优化方向向启动方法一样,每次调用的方法都是在线程池中新建一个任务具体代码解释新建一个用来实现调用方法。改造笔记三优化中的线程池
发现问题
下面部分是io.moquette.spi.impl.BrokerInterceptor.java部分源码
@Override public void notifyClientConnected(final MqttConnectMessage msg) { for (final InterceptHandler handler : this.handlers.get(InterceptConnectMessage.class)) { LOG.debug("Sending MQTT CONNECT message to interceptor. CId={}, interceptorId={}", msg.payload().clientIdentifier(), handler.getID()); executor.execute(() -> handler.onConnect(new InterceptConnectMessage(msg))); } } @Override public void notifyClientDisconnected(final String clientID, final String username) { for (final InterceptHandler handler : this.handlers.get(InterceptDisconnectMessage.class)) { LOG.debug("Notifying MQTT client disconnection to interceptor. CId={}, username={}, interceptorId={}", clientID, username, handler.getID()); executor.execute(() -> handler.onDisconnect(new InterceptDisconnectMessage(clientID, username))); } } @Override public void notifyClientConnectionLost(final String clientID, final String username) { for (final InterceptHandler handler : this.handlers.get(InterceptConnectionLostMessage.class)) { LOG.debug("Notifying unexpected MQTT client disconnection to interceptor CId={}, username={}, " + "interceptorId={}", clientID, username, handler.getID()); executor.execute(() -> handler.onConnectionLost(new InterceptConnectionLostMessage(clientID, username))); } } @Override public void notifyTopicPublished(final MqttPublishMessage msg, final String clientID, final String username) { msg.retain(); executor.execute(() -> { try { int messageId = msg.variableHeader().messageId(); String topic = msg.variableHeader().topicName(); for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) { LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, " + "interceptorId={}", clientID, messageId, topic, handler.getID()); handler.onPublish(new InterceptPublishMessage(msg, clientID, username)); } } finally { ReferenceCountUtil.release(msg); } }); } @Override public void notifyTopicSubscribed(final Subscription sub, final String username) { for (final InterceptHandler handler : this.handlers.get(InterceptSubscribeMessage.class)) { LOG.debug("Notifying MQTT SUBSCRIBE message to interceptor. CId={}, topicFilter={}, interceptorId={}", sub.getClientId(), sub.getTopicFilter(), handler.getID()); executor.execute(() -> handler.onSubscribe(new InterceptSubscribeMessage(sub, username))); } }
可以发现在除了notifyTopicPublished(),其它方法中,在for循环中每次for循环,对于InterceptHandler的调用都是在线程池中每次都是新执行一个任务,但是在notifyTopicPublished()方法中是在一个线程中for循环依次调用,这样处理首先没有用到线程池的多线程,其次是一旦某个InterceptHandler的notifyTopicPublished方法是阻塞的,那么后面的InterceptHandler的notifyTopicPublished()都会被阻塞。
优化逻辑优化方向:向启动方法一样,每次调用InterceptHandler的notifyTopicPublished方法都是在线程池中新建一个任务
具体代码:
public class PublishTask implements Runnable { final MqttPublishMessage msg; final InterceptHandler interceptHandler; final String clientId; final String username; PublishTask(MqttPublishMessage msg, InterceptHandler interceptHandler, String clientId, String username) { this.msg = msg; this.interceptHandler = interceptHandler; this.clientId = clientId; this.username = username; } @Override public void run() { try { interceptHandler.onPublish(new InterceptPublishMessage(msg, clientId, username)); } finally { ReferenceCountUtil.release(msg); } } @Override public String toString() { return "PublishTask{" + "msg=" + msg + ", interceptHandler=" + interceptHandler + ", clientId="" + clientId + """ + ", username="" + username + """ + "}"; } public InterceptHandler getInterceptHandler() { return interceptHandler; } } @Override public void notifyTopicPublished(final MqttPublishMessage msg, final String clientID, final String username) { msg.retain(); try { for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) { executor.execute(new PublishTask(msg.retainedDuplicate(), handler, clientID, username)); } } finally { ReferenceCountUtil.release(msg); } // executor.execute(() -> { // try { // int messageId = msg.variableHeader().messageId(); // String topic = msg.variableHeader().topicName(); // for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) { // LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, " // + "interceptorId={}", clientID, messageId, topic, handler.getID()); // handler.onPublish(new InterceptPublishMessage(msg, clientID, username)); // } // } finally { // ReferenceCountUtil.release(msg); // } // }); }
解释:
(1)新建一个PublishTask用来实现调用handler.onPublish()方法。其中要注意
new PublishTask(msg.retainedDuplicate(), handler, clientID, username)中的msg.retainedDuplicate()
还要主要在两个finally中ReferenceCountUtil.release(msg);
(2)PublishTask的toString()和getInterceptHandler()可以先不用管,会在其它地方用到,下一篇文章会讲到。
moquette改造笔记(三):优化BrokerInterceptor 中的线程池
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/77204.html
摘要:修改的实现,实现接口在改造笔记一整合到中修改实现,添加对的实现如下负载过大,处理不过来时,会回调该方法例如可以发生邮件通知相关人员改造笔记四解决中的调用两次 发现问题 在io.moquette.spi.impl.BrokerInterceptor的构造函数中,新建了一个线程池,代码如下: private BrokerInterceptor(int poolSize, List hand...
摘要:整合到本文更加注重代码实践,对于配置相关的知识会一笔带过,不做过多的详解。笔者是上传到私服,然后通过导入。接口是预留给开发者根据不同事件处理业务逻辑的接口。改造笔记二优化逻辑 Moquette简介 Mqtt作为物联网比较流行的协议现在已经被大范围使用,其中也有很多开源的MQTT BROKEN。Moquette是用java基于netty实现的轻量级的MQTT BROKEN. Moquet...
摘要:发现问题在使用中发现在设备频繁上下线和两个设备一样相互顶替连接的情况下,的和的方法调用没有先后顺序,如果在这两个方法里面来记录设备上下线状态,会造成状态不对。因为相互顶替的情况并不多见,因此两个也可以接受,在性能上并不会造成多大影响。 发现问题 在moquette使用中发现在设备频繁上下线和两个设备ClientId一样相互顶替连接的情况下,InterceptHandler的onConn...
摘要:发现问题在使用中设备异常断开中的。在中事件都是在链中依次传递的。事件最后传递到。解决方法添加会导致调用两次解释会在该从链中移除掉时被调用,一般的话没有手动从链中删除时,会在连接断开后回调该方法。 发现问题 在使用中设备异常断开,InterceptHandler中的onConnectionLost()。经过调试发现是MoquetteIdleTimeoutHandler中的代码导致的,代码...
阅读 1918·2021-09-09 09:33
阅读 1075·2019-08-30 15:43
阅读 2610·2019-08-30 13:45
阅读 3264·2019-08-29 11:00
阅读 804·2019-08-26 14:01
阅读 3539·2019-08-26 13:24
阅读 454·2019-08-26 11:56
阅读 2668·2019-08-26 10:27