资讯专栏INFORMATION COLUMN

深入浅出 JMS(七) - ActiveMQ 与 Spring 整合

NoraXie / 3437人阅读

摘要:消费者,监听生产者往指定目的地发送消息后,接下来就是消费者对指定目的地的消息进行消费了。它不会动态的适应运行时需要和参与外部的事务管理。它很好的平衡了对提供者要求低先进功能如事务参与和兼容环境。

深入浅出 JMS(七) - ActiveMQ 与 Spring 整合 一、与spring整合实现ptp的同步接收消息 (1)config.properties
## ActiveMQ Config
activemq.brokerURL=tcp://127.0.0.1:61616
activemq.userName=admin
activemq.password=password
activemq.pool.maxConnection=10
activemq.queue=mailqueue
activemq.topic=mailtopic
(2)pom.xml


    org.springframework
    spring-jms
    4.3.7.RELEASE



    org.apache.activemq
    activemq-pool
    5.15.3
(3)spring-jms.xml



    
    
        
        
        
    

    
    
        
        
    
    
    
        
    

    
    
        
        
        
    

    
    
        
        
    

ConnectionFactory 是用于产生到 JMS 服务器的链接的,Spring 为我们提供了多个 ConnectionFactory,有 SingleConnectionFactory 和 CachingConnectionFactory。

SingleConnectionFactory :对于建立 JMS 服务器链接的请求会一直返回同一个链接,并且会忽略 Connection 的 close 方法调用。

CachingConnectionFactory :继承了 SingleConnectionFactory,所以它拥有 SingleConnectionFactory 的所有功能,同时它还新增了缓存功能,它可以缓存 Session、MessageProducer 和 MessageConsumer。这里我们使用 CachingConnectionFactory 来作为示例。

(4)生产者
import com.alibaba.fastjson.JSONObject;
import com.github.binarylei.jms.spring.core.Mail;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

/**
 * @author: leigang
 * @version: 2018-04-03
 */
@Service("mqProducer")
public class MQProducer {

    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendMail(Mail mail) {
        jmsTemplate.send((session) -> {
            return session.createTextMessage(JSONObject.toJSONString(mail));
        });
    }
}
(5)消费者
import com.alibaba.fastjson.JSONObject;
import com.github.binarylei.jms.spring.core.Mail;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import javax.jms.Destination;

/**
 * @author: leigang
 * @version: 2018-04-03
 */
@Service("mqCustumer")
public class MQCustumer {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Destination destination;

    public void sendMail() {
        String json = (String) jmsTemplate.receiveAndConvert(destination);
        Mail mail = (Mail) JSONObject.parseObject(json, Mail.class);
        System.out.println(mail);
    }
}
二、PTP 的异步调用

我们在 spring 中直接配置异步接收消息的监听器,这样就相当于在 spring 中配置了消费者,在接受消息的时候就不必要启动消费者了。

spring-jms.xml:




    
    
    

生产者往指定目的地 Destination 发送消息后,接下来就是消费者对指定目的地的消息进行消费了。那么消费者是如何知道有生产者发送消息到指定目的地 Destination了呢?这是通过 Spring 为我们封装的消息监听容器 MessageListenerContainer 实现的,它负责接收信息,并把接收到的信息分发给真正的 MessageListener 进行处理。

每个消费者对应每个目的地都需要有对应的 MessageListenerContainer。对于消息监听容器而言,除了要知道监听哪个目的地之外,还需要知道到哪里去监听,也就是说它还需要知道去监听哪个 JMS 服务器,这是通过在配置 MessageConnectionFactory 的时候往里面注入一个 ConnectionFactory 来实现的。所以我们在配置一个 MessageListenerContainer 的时候有三个属性必须指定,一个是表示从哪里监听的 ConnectionFactory;一个是表示监听什么的 Destination;一个是接收到消息以后进行消息处理的 MessageListener。

Spring 一共为我们提供了两种类型的 MessageListenerContainer:

SimpleMessageListenerContainer :SimpleMessageListenerContainer 会在一开始的时候就创建一个会话 session 和消费者 Consumer,并且会使用标准的 JMS MessageConsumer.setMessageListener() 方法注册监听器让 JMS 提供者调用监听器的回调函数。它不会动态的适应运行时需要和参与外部的事务管理。兼容性方面,它非常接近于独立的 JMS 规范,但一般不兼容 Java EE 的 JMS 限制。

DefaultMessageListenerContainer :在大多数情况下我们还是使用的 DefaultMessageListenerContainer,跟SimpleMessageListenerContainer 相比,DefaultMessageListenerContainer 会动态的适应运行时需要,并且能够参与外部的事务管理。它很好的平衡了对 JMS 提供者要求低、先进功能如事务参与和兼容 Java EE 环境。

消息监听器:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

/**
 * @author: leigang
 * @version: 2018-04-03
 */
public class MessageListener implements javax.jms.MessageListener  {

    @Override
    public void onMessage(Message message) {
        TextMessage msg = (TextMessage) message;
        try {
            System.out.println("消息:" + msg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
三、发布订阅 同步接收

在 spring-jms.xml 中将 ActiveMQTopic 生成 Topic,其它没什么变化:

  
  
     
   

每天用心记录一点点。内容也许不重要,但习惯很重要!

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68917.html

相关文章

  • SpringBoot ActiveMQ 整合使用

    摘要:介绍它是出品,最流行的,能力强劲的开源消息总线。是一个完全支持和规范的实现,尽管规范出台已经是很久的事情了,但是在当今的应用中间仍然扮演着特殊的地位。相关文章整合使用整合使用关注我转载请务必注明原创地址为安装同之前一样,直接在里面玩吧。 showImg(https://segmentfault.com/img/remote/1460000012996066?w=1920&h=1281)...

    gaara 评论0 收藏0
  • 消息队列ActiveMQ的使用详解

    摘要:学习消息队列的使用之前,我们先来搞清。是操作消息的接口。消息生产者由创建,并用于将消息发送到。接收消息打印结果这是接收到的消息消费者启动。。。。 通过上一篇文章 《消息队列深入解析》,我们已经消息队列是什么、使用消息队列的好处以及常见消息队列的简单介绍。 这一篇文章,主要带大家详细了解一下消息队列ActiveMQ的使用。 学习消息队列ActiveMQ的使用之前,我们先来搞清JMS。 J...

    niceforbear 评论0 收藏0
  • activemqspring整合,配置消费者监听器设置sessionAcknowledgeMode

    摘要:最近在研究的消息确认机制,在与整合时遇到的了一个问题。这时只需要把的值设置成自定义的类型即可。 最近在研究activemq的ack消息确认机制,在activemq与spring整合时遇到的了一个问题。JMS规范的ack消息确认机制有一下四种,定于在session对象中:AUTO_ACKNOWLEDGE = 1 :自动确认CLIENT_ACKNOWLEDGE = 2:客户端手动确认 DU...

    LiuRhoRamen 评论0 收藏0
  • Spring boot 集成 ActiveMQ

    摘要:安装到官方网站下载最新的的安装包,并解压到本地目录下,下载链接如下。修改消费者使用配置消费者监听的队列,其中是接收到的消息收到的报文为接收到的消息重新执行 安装ActiveMQ 到Apache官方网站下载最新的ActiveMQ的安装包,并解压到本地目录下,下载链接如下:http://activemq.apache.org/do...。showImg(https://segmentfau...

    Donne 评论0 收藏0

发表评论

0条评论

NoraXie

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<