摘要:在公开的方法中,为的设置了继承于回调句柄。如此看来,如果想要异步通信完毕后,处理一些回调,则只需实现,并在适当的位置设置到的的里。在其保护方法里,创建了对象,并传入了。
ActiveMQChannelHandler
NettyConnector在公开的start方法中,为Channel的pipeline设置了ActiveMQChannelHandler(继承于io.netty.channel.ChannelDuplexHandler)回调句柄。
ActiveMQChannelHandler其构造函数定义如下:
ActiveMQChannelHandler(final ChannelGroup group, final BufferHandler handler, final BaseConnectionLifeCycleListener listener)
可见它接收了一个BufferHandler对象。在其channelRead这个callback方法中,调用了这个BufferHandler对象bufferReceived方法。
如此看来,如果想要Netty异步通信完毕后,处理一些回调,则只需实现BufferHandler,并在适当的位置设置到Netty的Channel的pipeline里。
BufferHandlerClientSessionFactoryImpl在其保护方法createConnector里,创建了NettyConnector对象,并传入了DelegatingBufferHandler。
DelegatingBufferHandler实现了BufferHandler,可用来处理Netty回调。
DelegatingBufferHandler定义如下,它是定义在ClientSessionFactoryImpl类里的:
private class DelegatingBufferHandler implements BufferHandler { @Override public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) { RemotingConnection theConn = connection; if (theConn != null && connectionID.equals(theConn.getID())) { theConn.bufferReceived(connectionID, buffer); } else { logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet"); } } }
也就是说,在Netty执行回调时,会调用ClientSessionFactory中的成员对象connection(类型:RemotingConnection)的bufferReceived方法来处理数据。
RemotingConnection实际上RemotingConnection也是一种BufferHandler
RemotingConnection(Impl)实现了bufferReceived(connectionID, buffer)方法,该方法会根据传入的buffer来decode出一个package。
bufferReceived
=> doBufferReceived (以下ChannelImpl对应的实例,是根据decode出来的package对应的channelID,到RemotingConnectionImpl包含的channel集合里取得的)
=> ChannelImpl::doBufferReceived
=> ChannelImpl::handlePacket
=> ChannelImpl::clearUpTo
=> commandConfirmationHandler.commandConfirmed(packet)
以ArtemisMQMessageProducer为例:
他的send方法中,最后是调用core api的ClientProducer的send方法的,传入一个core api的handler —— CompletionListenerWrapper(继承于SendAcknowledgementHandler类型),它包装了JMS的CompletionListener。
再转到ClientProducer的send方法, 它又调用了doSend方法,
然后它又调用了sendRegularMessage方法,它又调用了sessionContext.sendFullMessage方法。
在sessionContext.sendFullMessage方法里,可以看到,handler被包装到packet里了,并且传给了sessionChannel.sendBatched(packet)方法去异步发送了。
在服务器返回的packet里,也会带有这个handler,然后BufferHandler的实现者RemotingConnection(Impl)的bufferReceived方法会被回调,它会解析服务器回传的packet里的handler进行执行。
CompletionListenerWrapper类定义:packet是SessionSendMessage类型的消息的别名
sessionContext.sendFullMessage方法里负责将SendAcknowledgementHandler包装到SessionSendMessage类型的packet里,然后才发送至服务器
服务器返回的packet,也会首先被转换成SessionSendMessage类型,然后获取里面包含的SendAcknowledgementHandler类型的回调handler执行回调。
private static final class CompletionListenerWrapper implements SendAcknowledgementHandler { private final CompletionListener completionListener; private final Message jmsMessage; private final ActiveMQMessageProducer producer; /** * @param jmsMessage * @param producer */ private CompletionListenerWrapper(CompletionListener listener, Message jmsMessage, ActiveMQMessageProducer producer) { this.completionListener = listener; this.jmsMessage = jmsMessage; this.producer = producer; } @Override public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) { if (jmsMessage instanceof StreamMessage) { try { ((StreamMessage) jmsMessage).reset(); } catch (JMSException e) { // HORNETQ-1209 XXX ignore? } } if (jmsMessage instanceof BytesMessage) { try { ((BytesMessage) jmsMessage).reset(); } catch (JMSException e) { // HORNETQ-1209 XXX ignore? } } try { producer.connection.getThreadAwareContext().setCurrentThread(true); completionListener.onCompletion(jmsMessage); } finally { producer.connection.getThreadAwareContext().clearCurrentThread(true); } } @Override public String toString() { return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")"; } }
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/66894.html
摘要:还自动配置发送和接收消息所需的基础设施。支持是一个轻量级的可靠的可伸缩的可移植的消息代理,基于协议,使用通过协议进行通信。 32. 消息传递 Spring框架为与消息传递系统集成提供了广泛的支持,从使用JmsTemplate简化的JMS API到使用完整的基础设施异步接收消息,Spring AMQP为高级消息队列协议提供了类似的特性集。Spring Boot还为RabbitTempla...
摘要:通过以上修改保证了客户端连接能够快速的断开,在应用重启时不会持续往这边发送消息,我使用进行压测,重启消费者过程中,消息都正常。 showImg(https://segmentfault.com/img/bVbjWjt?w=470&h=200);2018年6月份,我们开发了两个使用Artemis做消息队列实现的积分模块和PUSH推送模块,在几轮测试以后,大家信心满满的正式上线了,而且经过...
摘要:本文旨在指出中集成的一些性能陷阱,在另一篇文章各组件详解里有组件介绍及如何正确使用的内容。因此的做法会大大降低性能,并且将大部分的时间都花在反复重建这些对象上。提供的可以让使用避免频繁创建的问题。至于使用的性能测试则留给同学自己做了。 Github 本文旨在指出Spring/Spring Boot中集成JMS的一些性能陷阱,在另一篇文章Spring JMS各组件详解里有Spring J...
阅读 1545·2021-10-25 09:44
阅读 2912·2021-09-04 16:48
阅读 1498·2019-08-30 15:44
阅读 2378·2019-08-30 15:44
阅读 1711·2019-08-30 15:44
阅读 2797·2019-08-30 14:14
阅读 2930·2019-08-30 13:00
阅读 2104·2019-08-30 11:09