资讯专栏INFORMATION COLUMN

白话RabbitMQ(二): 任务队列

fnngj / 1755人阅读

摘要:任务队列最主要的功能就是解耦高耗时的操作,否则程序会一直等在那里,浪费大量资源。我们将任务封装成一个消息发送给队列,后台的任务进程会得到这个任务并执行它,而且可以配置多个任务进程,进一步加大吞吐率。为了确保消息不丢失,支持消息确认。

推广
RabbitMQ专题讲座

https://segmentfault.com/l/15...

CoolMQ开源项目

我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https://github.com/vvsuperman…,项目支持网站: http://rabbitmq.org.cn,最新文章或实现会更新在上面

前言

在第一篇中我们描述了如何最简单的RabbitMQ操作,如何发送、接受消息。在今天这篇文章中我们将描述如何创建一个任务队列,来将高耗时的任务分发到多个消费者,从而提高处理效率。

任务队列最主要的功能就是解耦高耗时的操作,否则程序会一直等在那里,浪费大量资源。反之我们会把这个操作交给队列,让它延后再做。我们将任务封装成一个消息发送给队列,后台的任务进程会得到这个任务并执行它,而且可以配置多个任务进程,进一步加大吞吐率。

特别是对于网络请求,一次短短的HTTP请求是要求迅速响应的,不可能让它一直停顿在高耗时操作上。

准备工作

在第一章中我们发送了“Hello World!”。现在来完成更复杂一点的,因为这里并没有真正的高耗时操作,比如缩放图像或输出一个pdf。因此我们只是用Thread.sleep()来假装我们很繁忙,而且会用"."来表示需要停顿的秒数,比如一个叫Hello...的任务将停顿3秒钟。

我们简单的更改下Send.java,称之为 NewTask.java.

String message = getMessage(argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent "" + message + """);

然后是工具类

private static String getMessage(String[] strings){
    if (strings.length < 1)
        return "Hello World!";
    return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
}

当然,我们的Recv.java也需要进行一些改造,它需要对每一个"."停顿1秒,Work.java如下

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received "" + message + """);
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
    }
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == ".") Thread.sleep(1000);
    }
}    

编译上面这些代码

javac -cp $CP NewTask.java Worker.java
轮询调度

任务队列的一个最大优点是可以并行工作,能够非常容易的水平扩张。

首先,让我们同时运行两个工作线程,他们能够同时从队列获取消息。我们也需要同时开启3个console:1个生产者,2个消费者

消费者C1

# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

消费者C2

# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

让我们运行生产者

# shell 3
java -cp $CP NewTask
# => First message.
java -cp $CP NewTask
# => Second message..
java -cp $CP NewTask
# => Third message...
java -cp $CP NewTask
# => Fourth message....
java -cp $CP NewTask
# => Fifth message.....    

让我们看看消费者们
消费者C1

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received "First message."
# => [x] Received "Third message..."
# => [x] Received "Fifth message....."   

消费者C2

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received "Second message.."
# => [x] Received "Fourth message..   

RabbitMQ默认有序的将会发送消息给下一个消费者,所以每一个消费者都会得到相同数量的消息,这种方式就叫做轮询调度(round-robin),你可以尝试下更多的消费者

消息确认

一个任务可能非常耗时,如果消费者在做一个高耗时任务时挂掉了,我们将会丢失所有发送到这个消费者上的消息。这是非常不可取的,所以我们希望能够明确的知道消息是否消费成功,如果一个消费挂了,我们能够知道,并且将消息发送给下一个消费者。

为了确保消息不丢失,RabbitMQ支持消息确认。收到消息后消费者会给RabbitMQ服务器发送一个ack(我已经收到消息了),RabbitMQ就会在服务上删除这个消息了。

如果一个消费者挂了(连接关闭,channel关闭,或者是TCP连接丢失)而没有发送ack,RabbitMQ就会知道消息并没有消费成功,于是乎消息会被放到消息队列重新消费。如果此时还有其它消费者的话,消息会发送给其它消费者来消费,确保消息不会丢失

消息并没有超时时间这个概念,消息只会在消费者挂掉了时候重发,即使是一个非常非常耗时的的消费者也不会发生重发

手动消息确认(Manual message acknowledgments)默认是打开的,虽然我们之前关闭了它:autoAck=true。让我们先将它设置为false

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received "" + message + """);
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

这样一来,即使你使用CTRL+C强制杀死了一个消费者,消费者所丢失的消息也将会被重发,会被另一个消费者所接受并消费。

忘记应答

很容易犯忘记应答的错误,但会导致非常严重的后果。Messages会被重发,RabbitMQ会消耗越来越多的内存因为unacked的消息无法释放(甚至更严重,RabbitMQ内部维护了一个最大打开线程数,如果太多的消息没有应答,RabbitMQ甚至会整个崩溃掉)

你可以用Rabbitmqctl查看未被应答的消息数

sudo rabbitmqctl list_queues name messages_ready      
 messages_unacknowledged

windows下:

rabbitmqctl.bat list_queues name messages_ready     
messages_unacknowledged
消息持久化

我们现在知道了可以通过应答来保证消息不丢失,但万一RabbitMQ挂了呢?还是可能会导致消息丢失。因此我们可以通过持久化的机制,包括将队列以及队列中的消息持久化的方式,来保证即便RabbitMQ挂了,当它重启的时候,队列以及消息也能够恢复

首先做队列的持久化,声明队列为durable

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

但很可惜的是,这种声明方式并不适用与上面的方法,因为我们已经将“Hello”定义为一个非持久化的队列了,是不能再将他改为持久化的,如果这样做,将会直接返回一个error信息。所以,我们需要重新再定义一个队列

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

在保证队列的持久化后需要保证消息的持久化-将消息设置为PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes());  
公平分发

但这样还是存在问题:假设有如下的情形,一个消费者非常耗时,而一个消费者非常快,由于消息都是公平的发送,所以它们都是接收到相同数量的消息,会导致一个消费者非常忙碌,而另外一个消费者非常空闲,而RabbitMQ无法得知这一点。

为了解决这个缺陷我们引入了basicQos方法以及prefetchCount =1的设置。这会告诉RabbitMQ一次只给消费者一个消息:如果这个消息未确认,将不会发送新的消息,从而它会将消息发送给其它并不那么忙的消费者

int prefetchCount = 1;
channel.basicQos(prefetchCount);
留意queue size

如果所有的消费者都非常忙,队列可能会很快被填满,所以你需要留意这一点,要么增加更多的消费者,或者采取其它的策略。

整合

NewTask.java

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    channel.basicPublish( "", TASK_QUEUE_NAME,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent "" + message + """);

    channel.close();
    connection.close();
  }      
  //...
}

Worker.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received "" + message + """);
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    boolean autoAck = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
      if (ch == ".") {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException _ignored) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}

使用消息确认和prefetchCount你就能设置一个持久化队列了,同时,使用durable和persist,,即使RabbitMQ挂掉了,重启后也能够重发消息

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68099.html

相关文章

  • 白话RabbitMQ(三):发布/订阅

    摘要:推广专题讲座开源项目我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。因此一旦有有消息,消息会广播到所有的消费者。如此一来路由器就能够把消息发送给相应的队列了。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https...

    Ververica 评论0 收藏0
  • 白话rabbitmq(一): HelloWorld

    摘要:作为消息队列的一个典型实践,完全实现了标准,与的快快快不同,它追求的稳定可靠。同一个队列不仅可以绑定多个生产者,而且能够发送消息到多个消费者。消费者接受并消费消息。几乎于完全类似是一个继承了接口的类,方便我们来存储消息队列来的消息。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的...

    garfileo 评论0 收藏0
  • 白话RabbitMQ(四): 建立路由

    摘要:可以参考源码,项目支持网站,最新文章或实现会更新在上面前言在订阅发布中我们建立了一个简单的日志系统,从而将消息广播给一些消费者。因此,发送到路由键的消息会发送给队列,发送到路由键或者的消息会发送给,其它的消息将被丢弃。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性解决...

    CoderStudy 评论0 收藏0
  • 白话RabbitMQ(五): 主题路由器(Topic Exchange)

    摘要:推广专题讲座开源项目我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。主题交换机也可以当成其它交换机来使用,假如队列绑定到了那么它会接收所有的消息,就像广播路由器一样而如果未使用,那么就跟直达路由器一样了。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性...

    Gilbertat 评论0 收藏0
  • 白话RabbitMQ(六): RPC

    摘要:因为消费消息是在另外一个进程中,我们需要阻塞我们的进程直到结果返回,使用阻塞队列是一种非常好的方式,这里我们使用了长度为的,的功能是检查消息的的是不是我们之前所发送的,如果是,将返回值返回到。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考...

    KevinYan 评论0 收藏0

发表评论

0条评论

fnngj

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<