资讯专栏INFORMATION COLUMN

Spring Boot RabbitMQ - 优先级队列

jackwang / 1444人阅读

摘要:官方镜像仓库地址本地运行访问可视化面板地址默认账号默认密码集成基本参数配置配置配置定义优先级队列定义交换器定义参考官方文档应用启动后,会自动创建和,并相互绑定,优先级队列会有如图所示标识。

Docker With RabbitMQ

官方 Docker 镜像仓库地址

https://hub.docker.com/_/rabb...

本地运行 RabbitMQ

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

访问可视化面板

地址:http://127.0.0.1:15672/

默认账号:guest

默认密码:guest

Spring Boot With RabbitMQ

Spring Boot 集成 RabbitMQ

        
            org.springframework.boot
            spring-boot-starter-amqp
        

基本参数配置

# host & port
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672

Queue / Exchange / Routing 配置

/**
 * RabbitMQ 配置
 */
@Configuration
public class RabbitMQConfig {

    private static final String EXCHANGE = "priority-exchange";

    public static final String QUEUE = "priority-queue";

    private static final String ROUTING_KEY = "priority.queue.#";

    /**
     * 定义优先级队列
     */
    @Bean
    Queue queue() {
        Map args= new HashMap<>();
        args.put("x-max-priority", 100);
        return new Queue(QUEUE, false, false, false, args);
    }

    /**
     * 定义交换器
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(EXCHANGE);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }

}
priority queue 定义参考官方文档:https://www.rabbitmq.com/priority.html

Spring Boot 应用启动后,会自动创建 Queue 和 Exchange ,并相互绑定,优先级队列会有如图所示标识。
RabbitMQ Publisher

Spring Boot 相关配置

# 是否开启消息发送到交换器(Exchange)后触发回调
spring.rabbitmq.publisher-confirms=false
# 是否开启消息发送到队列(Queue)后触发回调
spring.rabbitmq.publisher-returns=false
# 消息发送失败重试相关配置
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=3000ms
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=1

发送消息

@Component
@AllArgsConstructor
public class FileMessageSender {

    private static final String EXCHANGE = "priority-exchange";

    private static final String ROUTING_KEY_PREFIX = "priority.queue.";

    private final RabbitTemplate rabbitTemplate;

    /**
     * 发送设置有优先级的消息
     *
     * @param priority 优先级
     */
    public void sendPriorityMessage(String content, Integer priority) {
        rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content,
                message -> {
                    message.getMessageProperties().setPriority(priority);
                    return message;
                });
    }

}
RabbitMQ Consumer

Spring Boot 相关配置

# 消息接收确认,可选模式:NONE(不确认)、AUTO(自动确认)、MANUAL(手动确认)
spring.rabbitmq.listener.simple.acknowledge-mode=AUTO
# 最小线程数量
spring.rabbitmq.listener.simple.concurrency=10
# 最大线程数量
spring.rabbitmq.listener.simple.max-concurrency=10
# 每个消费者可能未完成的最大未确认消息数量
spring.rabbitmq.listener.simple.prefetch=1
消费者执行耗时较长的话,建议 spring.rabbitmq.listener.simple.prefetch 设置为较小数值,让优先级越高的消息更快加入到消费者线程。

监听消息

@Slf4j
@Component
public class MessageListener {

    /**
     * 处理消息
     */
    @RabbitListener(queues = "priority-queue")
    public void listen(String message) {
        log.info(message);
    }

}
番外补充

1、自定义消息发送确认的回调

配置如下:

# 开启消息发送到交换器(Exchange)后触发回调
spring.rabbitmq.publisher-confirms=true
# 开启消息发送到队列(Queue)后触发回调
spring.rabbitmq.publisher-returns=true

自定义 RabbitTemplate.ConfirmCallback 实现类

@Slf4j
public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback{

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("消息唯一标识: {}", correlationData);
        log.info("确认状态: {}", ack);
        log.info("造成原因: {}", cause);
    }

}

自定义 RabbitTemplate.ConfirmCallback 实现类

@Slf4j
public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息主体: {}", message);
        log.info("回复编码: {}", replyCode);
        log.info("回复内容: {}", replyText);
        log.info("交换器: {}", exchange);
        log.info("路由键: {}", routingKey);
    }

}

配置 rabbitTemplate

@Component
@AllArgsConstructor
public class RabbitTemplateInitializingBean implements InitializingBean {

    private final RabbitTemplate rabbitTemplate;

    @Override
    public void afterPropertiesSet() {
        rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack());
        rabbitTemplate.setReturnCallback(new RabbitReturnCallback());
    }
    
}

2、RabbitMQ Exchange 类型

中文:RabbitMQ Exchange类型详解

English: RabbitMQ Tutorials

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74706.html

相关文章

  • 一起来学SpringBoot | 第十三篇:RabbitMQ延迟队列

    摘要:另一种就是用中的位于包下,本质是由和实现的阻塞优先级队列。表明了一条消息可在队列中存活的最大时间。当某条消息被设置了或者当某条消息进入了设置了的队列时,这条消息会在时间后死亡成为。 SpringBoot 是为了简化 Spring 应用的创建、运行、调试、部署等一系列问题而诞生的产物,自动装配的特性让我们可以更好的关注业务本身而不是外部的XML配置,我们只需遵循规范,引入相关的依赖就可...

    selfimpr 评论0 收藏0
  • 慕课网_《RabbitMQ消息中间件极速入门与实战》学习总结

    摘要:慕课网消息中间件极速入门与实战学习总结时间年月日星期三说明本文部分内容均来自慕课网。 慕课网《RabbitMQ消息中间件极速入门与实战》学习总结 时间:2018年09月05日星期三 说明:本文部分内容均来自慕课网。@慕课网:https://www.imooc.com 教学源码:无 学习源码:https://github.com/zccodere/s... 第一章:RabbitM...

    mykurisu 评论0 收藏0
  • SpringBoot RabbitMQ 整合使用

    摘要:可以在地址看到如何使用讲解下上面命令行表示控制台端口号,可以在浏览器中通过控制台来执行的相关操作。同时从控制台可以看到发送的速率多线程测试性能开了个线程,每个线程发送条消息。 showImg(http://ww2.sinaimg.cn/large/006tNc79ly1g5jjb62t88j30u00gwdi2.jpg); 前提 上次写了篇文章,《SpringBoot Kafka 整合...

    yuanxin 评论0 收藏0
  • 一起来学SpringBoot | 第十二篇:初探RabbitMQ消息队列

    摘要:用于控制活动人数,将超过此一定阀值的订单直接丢弃。缓解短时间的高流量压垮应用。目前比较推荐的就是我们手动然后将消费错误的消息转移到其它的消息队列中,做补偿处理消费者该方案是默认的方式不太推荐。 SpringBoot 是为了简化 Spring 应用的创建、运行、调试、部署等一系列问题而诞生的产物,自动装配的特性让我们可以更好的关注业务本身而不是外部的XML配置,我们只需遵循规范,引入相...

    Baoyuan 评论0 收藏0
  • 快速入门spring-amqp

    摘要:它还为具有侦听器容器的消息驱动的提供支持。接收消息当存在基础结构时,可以使用任何来注释以创建侦听器端点。默认情况下,如果禁用重试并且侦听器抛出异常,则会无限期地重试传递。 Spring-amqp-tutorial Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。它提供了一个模板作为发送和接收消息的高级抽象。它还为具有侦听器容器的消息驱动的PO...

    邹强 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<