摘要:每个消费者会得到平均数量的。为了确保不会丢失,采用确认机制。如果中断退出了关闭了,关闭了,或是连接丢失了而没有发送,会认为该消息没有完整的执行,会将该消息重新入队。该消息会被发送给其他的。当消费者中断退出,会重新分派。
Work模式
原文地址
在第一章中,我们写了通过一个queue来发送和接收message的简单程序。在这一章中,我们会创建一个workqueue,来将执行时间敏感的任务分发到多个worker中。
work模式主要的意图是要避免等待完成一个耗时的任务。取而代之地,我们延迟任务的执行,将任务封装成消息,将之发送到queue。一个运行着的worker进程会弹出这个任务并执行它。当运行多个worker进程时,任务会在它们之间分派。
这种模式在web应用中特别有用,因为在一个较短的HTTP请求窗口中不会去执行一个复杂的任务。
准备工作在上一章中,我们发送了一个”Hello World!"的message。现在我们将发送一个代表了复杂任务的字符串。这不是一个实际的任务,比如像调整图片大小或是重新渲染pdf文档,我们通Thead.sleep() 来模拟一个耗时的任务。message中的小圆点表示其复杂度,圆点越多则任务的执行越耗时。比如“Hello..."的message将耗时3秒。
我们简单的修改上一章的Send.java代码,允许在命令行发送任意message。新的类叫做NewTask.java
String message = String.join(" ", argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent "" + message + """);
同样的,我们修改上一章中的Recv.java,让它在处理message的时候根据小圆点进行睡眠。新的类叫Worker.java
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == ".") Thread.sleep(1000); } }
像在第一章一样编译这两个类
javac -cp $CP NewTask.java Worker.javaRound-robin分派
使用Task模式的一个明显的优势是让并行执行任务变得简单。我们只需要启动更多的worker就可以消减堆积的message,系统水平扩展简单。
首先,我们在同一时间启动两个worker。他们都会从queue获得message,来看一下具体细节。
打开了三个终端,两个是跑worker的。
java -cp $CP Worker
java -cp $CP Worker
第三个终端里来发布新的任务message。
java -cp $CP NewTask First message. java -cp $CP NewTask Second message.. java -cp $CP NewTask Third message... java -cp $CP NewTask Fourth message.... java -cp $CP NewTask Fifth message.....
让我们看看worker的处理message的情况.第一个worker收到了第1,3,5message,第二个worker收到了第2,4个message。
默认情况下,RabbitMQ会顺序的将message发给下一个消费者。每个消费者会得到平均数量的message。这种方式称之为round-robin(轮询).
Message 确认执行任务需要一定的时间。你可能会好奇如果一个worker开始执行任务,但是中途异常退出,会是什么结果。在我们现在的代码中,一旦RabbitMQ将消息发送出去了,它会立即将该message删除。这样的话,就可能丢失message。
在实际场景中,我们不想丢失任何一个task。如果一个worker异常中断了,我们希望这个task能分派给另一个worker。
为了确保不会丢失message,RabbitMQ采用message确认机制。RabbitMQ只有收到该message的Ack之后,才会删除该消息。
如果worker中断退出了( channel关闭了,connection关闭了,或是TCP连接丢失了)而没有发送Ack,RabbitMQ会认为该消息没有完整的执行,会将该消息重新入队。该消息会被发送给其他的worker。这样就不用message丢失,即使是在worker经常异常中断退出的场景下。
不会有任何message会timeout。当消费者中断退出,RabbitMQ会重新分派message。即使消息的执行会花费很长的时间。
默认情况下,message是需要人工确认的。在上面的例子中,我们通过autoAck=true来关闭了人工确认。像下面这样,我们将该标志设置为false,worker就需要在完成了任务之后,发送确认。
channel.basicQos(1); // accept only one unack-ed message at a time (see below) DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
上面的代码保证即使当worker还在处理一条消息,而强制它退出,也不会丢失message。然后不久,所有未被确认的消息都会被重新分派。
发送确认必须和接收相同的channel。使用不同的channel进行确认会导致channel-level protocol 异常。
忘记确认消息是一个比较常见的错误,但是其后果是很严重的。当client退出时,message会被重新分派,但是RabbitMQ会占用越来越多的内存,因它无法释放那些未被确认的message。
可以通过rabbitmqctl来打印messages_unacknowledged:
##linux sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged ##windows rabbitmqctl.bat list_queues name messages_ready messages_unacknowledgedMessage 持久化
我们学习了在消费者出现问题的时候不丢失message。但是如果RabbitMQ服务器宕机了,我们还是会丢失message。
当RabbitMQ宕机时,默认情况下,它会”忘记“所有的queue和message。为了确保message不丢失,我们需要确认两件事情:我们要使得queue和message都是持久的。
首先,我们要确保RabbitMQ不会丢失我们设置好的queue。所以,我们要把它声明成持久的:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
虽然代码没有任何问题,但是光这样是无效的。因为我们之前已经定义过名字为hello的queue。RabbitMQ不允许你使用不同的参数去重新定义一个已经存在的queue,而且这还不会反悔任何错误信息。但是我们还是有别的方法,让我们使用一个别的名字,比如task_queue:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
声明queue的改变要在生产者和消费者的代码里都进行修改。
接着我们要设置message的持久性,我们通过设置MessageProperties为PERSISTENT_TEXT_PLAIN:
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
将message标记成持久的不能100%保证message不会丢失,虽然这告诉RabbitMQ将message保存到磁盘,然而在RabbitMQ从接到message到保存之间,仍然有一小段时间。同时RabbitMQ不会给每一条message执行fsync(2) -- 可能只是保存到了cache而没有写到磁盘上去。所以持久的保证也不是非常强,然后对我们简单的task queue来说则足够了。如果需要一个非常强的保证,则可以使用发布确认的方式。Fair 分派
你可能已经注意到分派的工作没有如我们所期望的来执行。比如在有2个worker的情况系,所有偶数的message耗时很长,而所有奇数的message则耗时很短,这样其中一个worker则一直被分派到偶数的message,而另一个则一直是奇数的message。RabbitMQ对此并不知晓,进而继续这样分派着message。
这样的原因是RabbitMQ是在message入queue的时候确定分派的。它不关心消费者ack的情况。
我们可以通过basicQos方法和prefetchCount(1)来解决这个问题。这个设置是让RabbitMQ给worker一次一个message。或者这么说,直到worker处理完之前的message并发送ack,才给worker下一个message。否则,Rabbitmq会将message发送给其它不忙的worker。
int prefetchCount = 1; channel.basicQos(prefetchCount);
注意queue的大小。如果所有的worker都处于忙碌状态,queue可能会被装满。必须监控queue深度,可能要开启更多的worker,或者采取其他的措施。开始执行
NewTask.java的最终版本
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent "" + message + """); } } }
Worker.java的最终版本
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == ".") { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
使用message ack和prefetchCount,来设定work queue。持久化选项则在RabbitMQ重启后能让任务得以恢复。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/73901.html
摘要:发布订阅模式在之前的文章里,创建了。我们称之为发布订阅模式。其实我们是用到了默认的,用空字符串来标识。空字符串代表了没有名字的被路由到了由指定名字的。和这种关系的建立我们称之为从现在开始这个就会将推向我们的队列了。 发布订阅模式 在之前的文章里,创建了work queue。work queue中,每一个task都会派发给一个worker。在本章中,我们会完成完全不一样的事情 - 我们会...
摘要:如果涉及返回值,就要用到本章提到的了。方法发送请求,并阻塞知道结果返回。当有消息时,进行计算并通过指定的发送给客户端。当接收到,则检查。如果和之前的匹配,则将消息返回给应用进行处理。 RPC模式 在第二章中我们学习了如何使用Work模式在多个worker之间派发时间敏感的任务。这种情况是不涉及到返回值的,worker执行任务就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...
摘要:这样的消息分发机制称作轮询。在进程挂了之后,所有的未被确认的消息会被重新分发。忘记确认这是一个普遍的错误,丢失。为了使消息不会丢失,两件事情需要确保,我们需要持久化队列和消息。 工作队列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我们写了一个程序从已经声明的队列中收发...
摘要:主题模式在上一章我们改进了我们的日志系统,如果使用我们只能简单进行广播,而使用则允许消费者可以进行一定程度的选择。为的会同时发布到这两个。当为时,会接收所有的。当中没有使用通配符和时,的行为和一致。 主题模式 在上一章我们改进了我们的日志系统,如果使用fanout我们只能简单进行广播,而使用direct则允许消费者可以进行一定程度的选择。但是direct还是有其局限性,其路由不支持多个...
摘要:路由模式在之前的文章中我们建立了一个简单的日志系统。更形象的表示,如对中的感兴趣。为了进行说明,像下图这么来设置如图,可以看到有两个绑到了类型为的上。如图的设置中,一个为的就会同时发送到和。接收程序可以选择要接收日志的严重性级别。 路由模式 在之前的文章中我们建立了一个简单的日志系统。我们可以通过这个系统将日志message广播给很多接收者。 在本篇文章中,我们在这之上,添加一个新的功...
阅读 483·2019-08-30 15:44
阅读 900·2019-08-30 10:55
阅读 2730·2019-08-29 15:16
阅读 929·2019-08-29 13:17
阅读 2804·2019-08-26 13:27
阅读 571·2019-08-26 11:53
阅读 2121·2019-08-23 18:31
阅读 1886·2019-08-23 18:23