资讯专栏INFORMATION COLUMN

Artemis的JMS客户端中的CompletionHandler是如何在artemis core

Edison / 966人阅读

摘要:在公开的方法中,为的设置了继承于回调句柄。如此看来,如果想要异步通信完毕后,处理一些回调,则只需实现,并在适当的位置设置到的的里。在其保护方法里,创建了对象,并传入了。

ActiveMQChannelHandler

NettyConnector在公开的start方法中,为Channel的pipeline设置了ActiveMQChannelHandler(继承于io.netty.channel.ChannelDuplexHandler)回调句柄。
ActiveMQChannelHandler其构造函数定义如下:

ActiveMQChannelHandler(final ChannelGroup group,
                       final BufferHandler handler,
                       final BaseConnectionLifeCycleListener listener)

可见它接收了一个BufferHandler对象。在其channelRead这个callback方法中,调用了这个BufferHandler对象bufferReceived方法。

如此看来,如果想要Netty异步通信完毕后,处理一些回调,则只需实现BufferHandler,并在适当的位置设置到Netty的Channel的pipeline里。

BufferHandler

ClientSessionFactoryImpl在其保护方法createConnector里,创建了NettyConnector对象,并传入了DelegatingBufferHandler。
DelegatingBufferHandler实现了BufferHandler,可用来处理Netty回调。

DelegatingBufferHandler

DelegatingBufferHandler定义如下,它是定义在ClientSessionFactoryImpl类里的:

private class DelegatingBufferHandler implements BufferHandler {

      @Override
      public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
         RemotingConnection theConn = connection;

         if (theConn != null && connectionID.equals(theConn.getID())) {
            theConn.bufferReceived(connectionID, buffer);
         } else {
            logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
         }
      }
   }

也就是说,在Netty执行回调时,会调用ClientSessionFactory中的成员对象connection(类型:RemotingConnection)的bufferReceived方法来处理数据。

实际上RemotingConnection也是一种BufferHandler

RemotingConnection

RemotingConnection(Impl)实现了bufferReceived(connectionID, buffer)方法,该方法会根据传入的buffer来decode出一个package。
bufferReceived
=> doBufferReceived (以下ChannelImpl对应的实例,是根据decode出来的package对应的channelID,到RemotingConnectionImpl包含的channel集合里取得的)
=> ChannelImpl::doBufferReceived
=> ChannelImpl::handlePacket
=> ChannelImpl::clearUpTo
=> commandConfirmationHandler.commandConfirmed(packet)

举例:Artemis中实现的JMS规范下的Producer在异步投递消息后的回调函数是如何被调用的

以ArtemisMQMessageProducer为例:

他的send方法中,最后是调用core api的ClientProducer的send方法的,传入一个core api的handler —— CompletionListenerWrapper(继承于SendAcknowledgementHandler类型),它包装了JMS的CompletionListener。

再转到ClientProducer的send方法, 它又调用了doSend方法,

然后它又调用了sendRegularMessage方法,它又调用了sessionContext.sendFullMessage方法。

在sessionContext.sendFullMessage方法里,可以看到,handler被包装到packet里了,并且传给了sessionChannel.sendBatched(packet)方法去异步发送了。

在服务器返回的packet里,也会带有这个handler,然后BufferHandler的实现者RemotingConnection(Impl)的bufferReceived方法会被回调,它会解析服务器回传的packet里的handler进行执行。

packet是SessionSendMessage类型的消息的别名
sessionContext.sendFullMessage方法里负责将SendAcknowledgementHandler包装到SessionSendMessage类型的packet里,然后才发送至服务器
服务器返回的packet,也会首先被转换成SessionSendMessage类型,然后获取里面包含的SendAcknowledgementHandler类型的回调handler执行回调。

CompletionListenerWrapper类定义:
private static final class CompletionListenerWrapper implements SendAcknowledgementHandler {

      private final CompletionListener completionListener;
      private final Message jmsMessage;
      private final ActiveMQMessageProducer producer;

      /**
       * @param jmsMessage
       * @param producer
       */
      private CompletionListenerWrapper(CompletionListener listener,
                                        Message jmsMessage,
                                        ActiveMQMessageProducer producer) {
         this.completionListener = listener;
         this.jmsMessage = jmsMessage;
         this.producer = producer;
      }

      @Override
      public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) {
         if (jmsMessage instanceof StreamMessage) {
            try {
               ((StreamMessage) jmsMessage).reset();
            } catch (JMSException e) {
               // HORNETQ-1209 XXX ignore?
            }
         }
         if (jmsMessage instanceof BytesMessage) {
            try {
               ((BytesMessage) jmsMessage).reset();
            } catch (JMSException e) {
               // HORNETQ-1209 XXX ignore?
            }
         }

         try {
            producer.connection.getThreadAwareContext().setCurrentThread(true);
            completionListener.onCompletion(jmsMessage);
         } finally {
            producer.connection.getThreadAwareContext().clearCurrentThread(true);
         }
      }

      @Override
      public String toString() {
         return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")";
      }
   }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/66894.html

相关文章

  • Spring Boot 参考指南(消息传递)

    摘要:还自动配置发送和接收消息所需的基础设施。支持是一个轻量级的可靠的可伸缩的可移植的消息代理,基于协议,使用通过协议进行通信。 32. 消息传递 Spring框架为与消息传递系统集成提供了广泛的支持,从使用JmsTemplate简化的JMS API到使用完整的基础设施异步接收消息,Spring AMQP为高级消息队列协议提供了类似的特性集。Spring Boot还为RabbitTempla...

    Doyle 评论0 收藏0
  • ArtemisMQ“未消费之谜”

    摘要:通过以上修改保证了客户端连接能够快速的断开,在应用重启时不会持续往这边发送消息,我使用进行压测,重启消费者过程中,消息都正常。 showImg(https://segmentfault.com/img/bVbjWjt?w=470&h=200);2018年6月份,我们开发了两个使用Artemis做消息队列实现的积分模块和PUSH推送模块,在几轮测试以后,大家信心满满的正式上线了,而且经过...

    tomato 评论0 收藏0
  • 使用Spring/Spring Boot集成JMS陷阱

    摘要:本文旨在指出中集成的一些性能陷阱,在另一篇文章各组件详解里有组件介绍及如何正确使用的内容。因此的做法会大大降低性能,并且将大部分的时间都花在反复重建这些对象上。提供的可以让使用避免频繁创建的问题。至于使用的性能测试则留给同学自己做了。 Github 本文旨在指出Spring/Spring Boot中集成JMS的一些性能陷阱,在另一篇文章Spring JMS各组件详解里有Spring J...

    xcold 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<