资讯专栏INFORMATION COLUMN

Spring boot整合RabbitMQ

snifes / 2585人阅读

摘要:如果你注定要成为厉害的人那问题的答案就深藏在你的血脉里。本篇文章主要讲述与的整合。有想了解重构的朋友,我之前也有对重构一书的解读,出门左转就能看到。

如果你注定要成为厉害的人, 那问题的答案就深藏在你的血脉里。

本篇文章主要讲述Spring Boot与RabbitMQ的整合。因为我们公司的云服务用到了RabbitMQ 技术,之前都是自己封装,正好我们也正在往SpringBoot转变,这个技术正好用到,看来代码又要重构咯。

有想了解重构的朋友,我之前也有对《重构》一书的解读,出门左转就能看到。

导包:

        
            org.springframework.boot
            spring-boot-starter-amqp
        
消息生产者

ConnectionFactory配置
创建AmqpConfig文件AmqpConfig.java(后期的配置都在该文件中)

package cn.usr.springbootrabbitmq;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
 * @program: Learn-SpringBootRabbitmq
 * @author: Rock 【shizhiyuan@usr.cn】
 * @Date: 2018/2/23 0023
 */
@Configuration
public class AmqpConfig {
    public static final String EXCHANGE = "spring-boot-exchange2";
    public static final String ROUTINGKEY = "spring-boot-routingKey2";


    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        // 这里需要显示调用才能进行消息的回调  必须要设置
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

  
RabbitTemplate
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

这里设置为原型,具体的原因在后面会讲到,在发送消息时通过调用RabbitTemplate中的如下方法:
一会调用的时候用:

 public void convertAndSend(String exchange, String routingKey, Object object, CorrelationData correlationData) 
Producer

调用啦:

package cn.usr.springbootrabbitmq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * @program: Learn-SpringBootRabbitmq
 * @author: Rock 【shizhiyuan@usr.cn】
 * @Date: 2018/2/23 0023
 */
@Component
public class Producer implements RabbitTemplate.ConfirmCallback {
    private RabbitTemplate rabbitTemplate;


    /**
     * 构造方法注入
     */
    @Autowired
    public Producer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        //这是是设置回调能收到发送到响应,confirm()在下面解释
        rabbitTemplate.setConfirmCallback(this);
    }

    public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        //convertAndSend(exchange:交换机名称,routingKey:路由关键字,object:发送的消息内容,correlationData:消息ID)
        rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println(" 回调id:" + correlationData);
        if (ack) {
            System.out.println("消息成功消费");
        } else {
            System.out.println("消息消费失败:" + cause);
        }
    }
}

如果需要在生产者需要消息发送后的回调,需要对rabbitTemplate设置ConfirmCallback对象,由于不同的生产者需要对应不同的ConfirmCallback,如果rabbitTemplate设置为单例bean,则所有的rabbitTemplate实际的ConfirmCallback为最后一次申明的ConfirmCallback。

消息消费者

还是在AmqpConfig.class里面

步骤就是

声明交换机

声明队列

绑定RoutingKey

/**
     * 针对消费者配置
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     * 

*

* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 * HeadersExchange :通过添加属性key-value匹配 * DirectExchange:按照routingkey分发到指定队列 * TopicExchange:多关键字匹配 */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE); } @Bean public Queue queue() { return new Queue("spring-boot-queue", true);//队列持久 } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY); } @Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); // 设置确认模式手工确认 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("receive msg : " + new String(body)); //确认消息成功消费 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }); return container; }

下面是完整的配置:

package cn.usr.springbootrabbitmq;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
 * @program: Learn-SpringBootRabbitmq
 * @author: Rock 【shizhiyuan@usr.cn】
 * @Date: 2018/2/23 0023
 */
@Configuration
public class AmqpConfig {
    public static final String EXCHANGE = "spring-boot-exchange2";
    public static final String ROUTINGKEY = "spring-boot-routingKey2";


    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        // 这里需要显示调用才能进行消息的回调  必须要设置
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }


    /**
     * 针对消费者配置
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     * 

*

* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 * HeadersExchange :通过添加属性key-value匹配 * DirectExchange:按照routingkey分发到指定队列 * TopicExchange:多关键字匹配 */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE); } @Bean public Queue queue() { return new Queue("spring-boot-queue", true); } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY); } @Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); // 设置确认模式手工确认 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("receive msg : " + new String(body)); //确认消息成功消费 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }); return container; } }

到这里我就能完成SpringBoot整合RabbitMQ的数据收发了。

结果:

receive msg : ceshi-----?
 回调id:CorrelationData [id=dfe3b3d1-f5a3-42d9-a514-a73729e009d5]
消息成功消费

点赞收藏关注不迷路。么么哒

参考:http://blog.csdn.net/liaokail...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68539.html

相关文章

  • SpringBoot RabbitMQ 整合使用

    摘要:可以在地址看到如何使用讲解下上面命令行表示控制台端口号,可以在浏览器中通过控制台来执行的相关操作。同时从控制台可以看到发送的速率多线程测试性能开了个线程,每个线程发送条消息。 showImg(http://ww2.sinaimg.cn/large/006tNc79ly1g5jjb62t88j30u00gwdi2.jpg); 前提 上次写了篇文章,《SpringBoot Kafka 整合...

    yuanxin 评论0 收藏0
  • SpringBoot非官方教程 | 第十五篇:Springboot整合RabbitMQ

    摘要:创建消息监听,并发送一条消息在程序中,提供了发送消息和接收消息的所有方法。 这篇文章带你了解怎么整合RabbitMQ服务器,并且通过它怎么去发送和接收消息。我将构建一个springboot工程,通过RabbitTemplate去通过MessageListenerAdapter去订阅一个POJO类型的消息。 准备工作 15min IDEA maven 3.0 在开始构建项目之前,机器需...

    HollisChuang 评论0 收藏0
  • SpringBoot ActiveMQ 整合使用

    摘要:介绍它是出品,最流行的,能力强劲的开源消息总线。是一个完全支持和规范的实现,尽管规范出台已经是很久的事情了,但是在当今的应用中间仍然扮演着特殊的地位。相关文章整合使用整合使用关注我转载请务必注明原创地址为安装同之前一样,直接在里面玩吧。 showImg(https://segmentfault.com/img/remote/1460000012996066?w=1920&h=1281)...

    gaara 评论0 收藏0
  • spring boot - 收藏集 - 掘金

    摘要:引入了新的环境和概要信息,是一种更揭秘与实战六消息队列篇掘金本文,讲解如何集成,实现消息队列。博客地址揭秘与实战二数据缓存篇掘金本文,讲解如何集成,实现缓存。 Spring Boot 揭秘与实战(九) 应用监控篇 - HTTP 健康监控 - 掘金Health 信息是从 ApplicationContext 中所有的 HealthIndicator 的 Bean 中收集的, Spring...

    rollback 评论0 收藏0
  • 慕课网_《RabbitMQ消息中间件极速入门与实战》学习总结

    摘要:慕课网消息中间件极速入门与实战学习总结时间年月日星期三说明本文部分内容均来自慕课网。 慕课网《RabbitMQ消息中间件极速入门与实战》学习总结 时间:2018年09月05日星期三 说明:本文部分内容均来自慕课网。@慕课网:https://www.imooc.com 教学源码:无 学习源码:https://github.com/zccodere/s... 第一章:RabbitM...

    mykurisu 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<