摘要:查看未确认消息命令消息持久化问题虽然消息确认机制能够保证消费者挂掉时消息不丢失,但是当挂掉时,那就没法保证了,这时就需要持久化了。方法队列和消息必须设置为持久化队列持久化生成者和消费者声明队列参数设置为,已存在的队列不能重新设置参数值。
消息确认机制
当把basic_consume的参数no_ack设置为true时,消息达到消费者时就立刻被标记为删除状态,如果这时一个worker的消息来不及执行完成就被中止掉,那么这条消息就会丢失,所以需要一个消息确认机制,当worker挂掉后,把消息重新分发给另一个woker执行
方法:将no_ack设置为false,然后在回调函数中确认消息
$callback = function($msg){ echo " [x] Received ", $msg->body, " "; sleep(substr_count($msg->body, ".")); echo " [x] Done", " "; $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]); }; $channel->basic_consume("task_queue", "", false, false, false, false, $callback);
需要特别注意的是,如何忘记确认消息,将耗尽内存。查看未确认消息命令:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged消息持久化问题
虽然消息确认机制能够保证消费者挂掉时消息不丢失,但是当rabbitmq挂掉时,那就没法保证了,这时就需要持久化
了。
方法:队列和消息必须设置为持久化
(1) 队列持久化:生成者和消费者声明队列参数durable设置为true,已存在的队列不能重新设置参数值。命令如下:
$channel->queue_declare("task_queue", false, true, false, false);
(2) 消息持久化:消息delivery_mode设置为2,如下:
$msg = new AMQPMessage($data, array("delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT) );公平分发问题
问题:当多个worker处理队列消息,rabbitmq循环均匀分配消息到分一个worker,如果此时其中一个worker分配到比较耗时的任务,那么会出现这个worker会比较忙碌,而其他worker比较清闲的情况。
方法:在worker处理和确认消息之前,不要再向worker发送新消息,而是向下一个清闲的worker发送,把prefetch参数设置为1.
当然,如果所有的worker都很忙,这时候应该增加worker数量
$channel->basic_qos(null, 1, null);
参考链接:http://www.rabbitmq.com/tutor...
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/26283.html
摘要:简介是开发中很平常的中间件,本文讲述的是怎么在一个项目中配置多源的,这里不过多的讲解的相关知识点。但是需要配置多个源时,第二个及其以上的就需要单独配置了,这里我使用的都是单独配置的。源码个人日拱一卒,不期速成 简介 MQ 是开发中很平常的中间件,本文讲述的是怎么在一个Spring Boot项目中配置多源的RabbitMQ,这里不过多的讲解RabbitMQ的相关知识点。如果你也有遇到需要...
摘要:后续介绍交换机,生产者直接将消息投递到中。消息,服务器和应用程序之间传送的数据,由和组成。也称为消息队列,保存消息并将它们转发给消费者。主要是应为和有一个绑定的关系。 showImg(https://img-blog.csdnimg.cn/20190509221741422.gif); showImg(https://img-blog.csdnimg.cn/20190731191914...
摘要:基础教程注本文是对众多博客的学习和总结,可能存在理解错误。请带着怀疑的眼光,同时如果有错误希望能指出。安装库这里我们首先将消息推入队列,然后消费者从队列中去除消息进行消费。 RabbitMQ 基础教程(1) - Hello World 注:本文是对众多博客的学习和总结,可能存在理解错误。请带着怀疑的眼光,同时如果有错误希望能指出。 如果你喜欢我的文章,可以关注我的私人博客:http:...
摘要:在发送后端监听声明的排他队列,当收到消息后比对正确则处理消息断开监听连接,然后此队列被系统自动回收。并且通过也看到了这条消息的返回。此时我们基本已经将问题锁定在端了。 背景 公司的一个项目使用rabbitmq作为broker进行交互,并且数据的查询方法使用RPC模式,RPC Client端使用java编写并使用springAMQP包与rabbitmq交互,在RPC Server端使用p...
阅读 2246·2023-04-26 01:50
阅读 709·2021-09-22 15:20
阅读 2584·2019-08-30 15:53
阅读 1587·2019-08-30 12:49
阅读 1706·2019-08-26 14:05
阅读 2705·2019-08-26 11:42
阅读 2302·2019-08-26 10:40
阅读 2592·2019-08-26 10:38