摘要:修改的实现,实现接口在改造笔记一整合到中修改实现,添加对的实现如下负载过大,处理不过来时,会回调该方法例如可以发生邮件通知相关人员改造笔记四解决中的调用两次
发现问题
在io.moquette.spi.impl.BrokerInterceptor的构造函数中,新建了一个线程池,代码如下:
private BrokerInterceptor(int poolSize, Listhandlers) { LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers)); this.handlers = new HashMap<>(); for (Class> messageType : InterceptHandler.ALL_MESSAGE_TYPES) { this.handlers.put(messageType, new CopyOnWriteArrayList ()); } for (InterceptHandler handler : handlers) { this.addInterceptHandler(handler); } executor = Executors.newFixedThreadPool(poolSize); }
executor = Executors.newFixedThreadPool(poolSize);这句代码虽然创建了一个固定线程数量的线程池,但是线程池的任务队列并没有做限制,一旦某个InterceptHandler中的某个方法进行了耗时处理,在高并发的情况下,会很容易导致线程池的队列堆积大量待处理的任务,进而可能造成内存溢出。
解决问题分别添加以下类和接口
public class ThreadPoolHelper { public static ExecutorService createFixedExecutor(int threadNum,int capacity,String threadFactoryName) { return new ThreadPoolExecutor( threadNum, threadNum, 30, TimeUnit.SECONDS, new LinkedBlockingDeque(capacity), new SimpleThreadFactory(threadFactoryName), new LogDiscardRejectPolicy() ); } } public class SimpleThreadFactory implements ThreadFactory { private static final String NAME_FORMAT = "%s-%s"; private String threadNamePrefix; public SimpleThreadFactory(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; } @Override public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable); thread.setName(String.format(NAME_FORMAT, threadNamePrefix, System.currentTimeMillis())); return thread; } } public class LogDiscardRejectPolicy implements RejectedExecutionHandler { private static final Logger LOG = LoggerFactory.getLogger(LogDiscardRejectPolicy.class); @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { LOG.error("executor:{} task queue has full, runnable:{} discarded",executor,r); if (!(r instanceof PublishTask)) { return; } PublishTask publishTask = (PublishTask) r; InterceptHandler interceptHandler = publishTask.getInterceptHandler(); if (!(interceptHandler instanceof RejectHandler)) { return; } ((RejectHandler)interceptHandler).rejectedExecution(r,executor); } } public interface RejectHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
BrokerInterceptor 创建线程池的逻辑改为
private BrokerInterceptor(int poolSize, int capacity, Listhandlers) { LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers)); this.handlers = new HashMap<>(); for (Class> messageType : InterceptHandler.ALL_MESSAGE_TYPES) { this.handlers.put(messageType, new CopyOnWriteArrayList ()); } for (InterceptHandler handler : handlers) { this.addInterceptHandler(handler); } /** modify by liuhh */ executor = ThreadPoolHelper.createFixedExecutor(poolSize, capacity, THREAD_POOL_NAME); //executor = Executors.newFixedThreadPool(poolSize); }
解释:
(1)ThreadPoolHelper中的createFixedExecutor()方法为新建的线程池指定任务队列大小和拒绝策略LogDiscardRejectPolicy
(2)在LogDiscardRejectPolicy中,首先将被拒绝的任务log一遍,对于PublishTask(moquette改造笔记(二):优化BrokerInterceptor notifyTopicPublished()逻辑)做特殊处理,会交给实现RejectHandler的InterceptHandler处理,由业务逻辑决定,出现任务太多处理不完被遗弃的任务该如何处理。
在 moquette改造笔记(一):整合到SpringBoot 中修改SafetyInterceptHandler实现,添加对RejectHandler的实现如下
@Slf4j @Component public class SafetyInterceptHandler extends AbstractInterceptHandler{ @Override public String getID() { return SafetyInterceptHandler.class.getName(); } @Override public void onConnect(InterceptConnectMessage msg) { } @Override public void onConnectionLost(InterceptConnectionLostMessage msg) { } @Override public void onPublish(InterceptPublishMessage msg) { } @Override public void onMessageAcknowledged(InterceptAcknowledgedMessage msg) { } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { /**MQTT SERVICE 负载过大,处理不过来时,会回调该方法*/ //例如可以发生邮件通知相关人员 } }
moquette改造笔记(四):解决InterceptHandler中的onConnectionLost()调用两次
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/77202.html
摘要:优化逻辑优化方向向启动方法一样,每次调用的方法都是在线程池中新建一个任务具体代码解释新建一个用来实现调用方法。改造笔记三优化中的线程池 发现问题 下面部分是io.moquette.spi.impl.BrokerInterceptor.java部分源码 @Override public void notifyClientConnected(final MqttConnectMes...
摘要:整合到本文更加注重代码实践,对于配置相关的知识会一笔带过,不做过多的详解。笔者是上传到私服,然后通过导入。接口是预留给开发者根据不同事件处理业务逻辑的接口。改造笔记二优化逻辑 Moquette简介 Mqtt作为物联网比较流行的协议现在已经被大范围使用,其中也有很多开源的MQTT BROKEN。Moquette是用java基于netty实现的轻量级的MQTT BROKEN. Moquet...
摘要:发现问题在使用中发现在设备频繁上下线和两个设备一样相互顶替连接的情况下,的和的方法调用没有先后顺序,如果在这两个方法里面来记录设备上下线状态,会造成状态不对。因为相互顶替的情况并不多见,因此两个也可以接受,在性能上并不会造成多大影响。 发现问题 在moquette使用中发现在设备频繁上下线和两个设备ClientId一样相互顶替连接的情况下,InterceptHandler的onConn...
摘要:发现问题在使用中设备异常断开中的。在中事件都是在链中依次传递的。事件最后传递到。解决方法添加会导致调用两次解释会在该从链中移除掉时被调用,一般的话没有手动从链中删除时,会在连接断开后回调该方法。 发现问题 在使用中设备异常断开,InterceptHandler中的onConnectionLost()。经过调试发现是MoquetteIdleTimeoutHandler中的代码导致的,代码...
摘要:常见标高线程上下文切换频繁线程太多锁竞争激烈标高如果的占用很高,排查涉及到的程序,比如把改造成。抖动问题原因字节码转为机器码需要占用时间片,大量的在执行字节码时,导致长期处于高位现象,占用率最高解决办法保证编译线程的占比。 一、并发 Unable to create new native thread …… 问题1:Java中创建一个线程消耗多少内存? 每个线程有独自的栈内存,共享堆内...
阅读 1964·2021-11-24 10:45
阅读 1824·2021-10-09 09:43
阅读 1265·2021-09-22 15:38
阅读 1189·2021-08-18 10:19
阅读 2796·2019-08-30 15:55
阅读 3016·2019-08-30 12:45
阅读 2936·2019-08-30 11:25
阅读 329·2019-08-29 11:30