资讯专栏INFORMATION COLUMN

剖析Laravel队列系统--推送作业到队列

maochunguang / 2710人阅读

摘要:有几种有用的方法可以使用将作业推送到特定的队列在给定的秒数之后推送作业延迟后将作业推送到特定的队列推送多个作业推送特定队列上的多个作业调用这些方法之后,所选择的队列驱动会将给定的信息存储在存储空间中,供按需获取。

原文链接https://divinglaravel.com/queue-system/pushing-jobs-to-queue

There are several ways to push jobs into the queue:
有几种方法可以将作业推送到队列中:

Queue::push(new InvoiceEmail($order));

Bus::dispatch(new InvoiceEmail($order));

dispatch(new InvoiceEmail($order));

(new InvoiceEmail($order))->dispatch();

As explained in a previous dive, calls on the Queue facade are calls on the queue driver your app uses, calling the push method for example is a call to the push method of the QueueDatabaseQueue class in case you"re using the database queue driver.

There are several useful methods you can use:

调用Queue facade是对应用程序使用的队列驱动的调用,如果你使用数据库队列驱动,调用push方法是调用QueueDatabaseQueue类的push方法。

有几种有用的方法可以使用:

// 将作业推送到特定的队列
Queue::pushOn("emails", new InvoiceEmail($order));

// 在给定的秒数之后推送作业
Queue::later(60, new InvoiceEmail($order));

// 延迟后将作业推送到特定的队列
Queue::laterOn("emails", 60, new InvoiceEmail($order));

// 推送多个作业
Queue::bulk([
    new InvoiceEmail($order),
    new ThankYouEmail($order)
]);

// 推送特定队列上的多个作业
Queue::bulk([
    new InvoiceEmail($order),
    new ThankYouEmail($order)
], null, "emails");

After calling any of these methods, the selected queue driver will store the given information in a storage space for workers to pick up on demand.

调用这些方法之后,所选择的队列驱动会将给定的信息存储在存储空间中,供workers按需获取。

Using the command bus 使用命令总线

Dispatching jobs to queue using the command bus gives you extra control; you can set the selected connection, queue, and delay from within your job class, decide if the command should be queued or run instantly, send the job through a pipeline before running it, actually you can even handle the whole queueing process from within your job class.

The Bus facade proxies to the ContractsBusDispatcher container alias, this alias is resolved into an instance of BusDispatcher inside BusBusServiceProvider:

使用命令总线调度作业进行排队可以给你额外控制权; 您可以从作业类中设置选定的connection, queue, and delay 来决定命令是否应该排队或立即运行,在运行之前通过管道发送作业,实际上你甚至可以从你的作业类中处理整个队列过程。

Bug facade代理到 ContractsBusDispatcher 容器别名,此别名解析为BusDispatcher内的BusBusServiceProvider的一个实例:

$this->app->singleton(Dispatcher::class, function ($app) {
    return new Dispatcher($app, function ($connection = null) use ($app) {
        return $app[QueueFactoryContract::class]->connection($connection);
    });
});

所以Bus::dispatch() 调用的 dispatch() 方法是 BusDispatcher 类的:

public function dispatch($command)
{
    if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
        return $this->dispatchToQueue($command);
    } else {
        return $this->dispatchNow($command);
    }
}

This method decides if the given job should be dispatched to queue or run instantly, the commandShouldBeQueued() method checks if the job class is an instance of ContractsQueueShouldQueue, so using this method your job will only be queued in case you implement the ShouldQueue interface.

We"re not going to look into dispatchNow in this dive, it"ll be discussed in detail when we dive into how workers run jobs. For now let"s look into how the Bus dispatches your job to queue:

该方法决定是否将给定的作业分派到队列或立即运行,commandShouldBeQueued() 方法检查作业类是否是 ContractsQueueShouldQueue, 的实例,因此使用此方法,您的作业只有继承了ShouldQueue接口才会被放到队列中。

我们不会在这篇中深入dispatchNow,我们将在深入worker如何执行作业中详细讨论。 现在我们来看看总线如何调度你的工作队列:

public function dispatchToQueue($command)
{
    $connection = isset($command->connection) ? $command->connection : null;

    $queue = call_user_func($this->queueResolver, $connection);

    if (! $queue instanceof Queue) {
        throw new RuntimeException("Queue resolver did not return a Queue implementation.");
    }

    if (method_exists($command, "queue")) {
        return $command->queue($queue, $command);
    } else {
        return $this->pushCommandToQueue($queue, $command);
    }
}

First Laravel checks if a connection property is defined in your job class, using the property you can set which connection Laravel should send the queued job to, if no property was defined null will be used and in such case Laravel will use the default connection.

Using the connection value, Laravel uses a queueResolver closure to build the instance of the queue driver that should be used, this closure is set inside BusBusServiceProvider while registering the Dispatcher instance:

首先 Laravel会检查您的作业类中是否定义了connection 属性,使用这个属性可以设置Laravel应该将排队作业发送到哪个连接,如果未定义任何属性,将使用null属性,在这种情况下Laravel将使用默认连接。

通过设置的连接,Laravel使用一个queueResolver闭包来构建应该使用哪个队列驱动的实例,当注册调度器实例的时候这个闭包在BusBusServiceProvider 中被设置:

function ($connection = null) use ($app) {
    return $app[ContractsQueueFactory::class]->connection($connection);
}

ContractsQueueFactory is an alias for QueueQueueManager, so in other words this closure returns an instance of QueueManager and sets the desired connection for the manager to know which driver to use.

Finally the dispatchToQueue method checks if the job class has a queue method, if that"s the case the dispatcher will just call this method giving you full control over how the job should be queued, you can select the queue, assign delay, set maximum retries, timeout, etc...

In case no queue method was found, a call to pushCommandToQueue() calls the proper pushmethod on the selected queue driver:

ContractsQueueFactoryQueueQueueManager的别名,换句话说,该闭包返回一个QueueManager实例,并为manager设置所使用的队列驱动需要的连接。

最后,dispatchToQueue方法检查作业类是否具有queue方法,如果调度器调用此方法,可以完全控制作业排队的方式,您可以选择队列,分配延迟,设置最大重试次数, 超时等

如果没有找到 queue 方法,对 pushCommandToQueue() 的调用将调用所选队列驱动上的push方法:

protected function pushCommandToQueue($queue, $command)
{
    if (isset($command->queue, $command->delay)) {
        return $queue->laterOn($command->queue, $command->delay, $command);
    }

    if (isset($command->queue)) {
        return $queue->pushOn($command->queue, $command);
    }

    if (isset($command->delay)) {
        return $queue->later($command->delay, $command);
    }

    return $queue->push($command);
}

The dispatcher checks for queue and delay properties in your Job class & picks the appropriate queue method based on that.

调度器检查Job类中的 queuedelay ,并根据此选择适当的队列方法。

So I can set the queue, delay, and connection inside the job class? 所以我可以设置工作类中的队列,延迟和连接?

Yes, you can also set a tries and timeout properties and the queue driver will use these values as well, here"s how your job class might look like:

是的,您还可以设置一个triestimeout 属性,队列驱动也将使用这些值,以下工作类示例:

class SendInvoiceEmail{
    public $connection = "default";

    public $queue = "emails";

    public $delay = 60;

    public $tries = 3;

    public $timeout = 20;
}
Setting job configuration on the fly 即时设置作业配置

Using the dispatch() global helper you can do something like this:

使用 dispatch() 全局帮助方法,您可以执行以下操作:

dispatch(new InvoiceEmail($order))
        ->onConnection("default")
        ->onQueue("emails")
        ->delay(60);

This only works if you use the BusQueueable trait in your job class, this trait contains several methods that you may use to set some properties on the job class before dispatching it, for example:

这只有在您在作业类中使用 BusQueueable trait时才有效,此trait包含几种方法,您可以在分发作业类之前在作业类上设置一些属性,例如:

public function onQueue($queue)
{
    $this->queue = $queue;

    return $this;
}
But in your example we call the methods on the return of dispatch()! 但是在你的例子中,我们调用dispatch()的返回方法!

Here"s the trick:

这是诀窍:

function dispatch($job)
{
    return new PendingDispatch($job);
}

This is the definition of the dispatch() helper in Foundation/helpers.php, it returns an instance of BusPendingDispatch and inside this class we have methods like this:

这是在Foundation/helpers.php中的dispatch()帮助方法的定义,它返回一个BusPendingDispatch 的实例,并且在这个类中,我们有这样的方法:

public function onQueue($queue)
{
    $this->job->onQueue($queue);

    return $this;
}

So when we do dispatch(new JobClass())->onQueue("default"), the onQueue method of PendingDispatch will call the onQueue method on the job class, as we mentioned earlier job classes need to use the Queueable trait for all this to work.

所以当我们执行 dispatch(new JobClass())->onQueue("default"), 时,PendingDispatchonQueue 方法将调用job类上的 onQueue 方法,如前所述,作业类需要使用所有这些的 Queueable trait来工作。

Then where"s the part where the Dispatcher::dispatch method is called? 那么调用Dispatcher::dispatch方法的那部分是哪里?

Once you do dispatch(new JobClass())->onQueue("default") you"ll have the job instance ready for dispatching, the actual work happens inside PendingDispatch::__destruct():

一旦你执行了 dispatch(new JobClass())->onQueue("default"),你将让作业实例准备好进行调度,实际的工作发生在 PendingDispatch::__destruct()中:

public function __destruct()
{
    app(Dispatcher::class)->dispatch($this->job);
}

This method, when called, will resolve an instance of Dispatcher from the container and call the dispatch() method on it. A __destruct() is a PHP magic method that"s called when all references to the object no longer exist or when the script terminates, and since we don"t store a reference to the PendingDispatch instance anywhere the __destruct method will be called immediately:

调用此方法时,将从容器中解析 Dispatcher 的一个实例,然后调用它的dispatch()方法。 destruct()是一种PHP魔术方法,当对对象的所有引用不再存在或脚本终止时,都会调用,因为我们不会立即在 __destruct方法中存储对PendingDispatch 实例的引用,

// Here the destructor will be called rightaway
dispatch(new JobClass())->onQueue("default");

// 如果我们调用unset($temporaryVariable),那么析构函数将被调用
// 或脚本完成执行时。
$temporaryVariable = dispatch(new JobClass())->onQueue("default");
Using the Dispatchable trait 使用可调度的特征

You can use the BusDispatchable trait on your job class to be able to dispatch your jobs like this:

您可以使用工作类上的 BusDispatchable trait来调度您的工作,如下所示:

(new InvoiceEmail($order))->dispatch();

Here"s how the dispatch method of Dispatchable looks like:

调度方法 Dispatchable看起来像这样:

public static function dispatch()
{
    return new PendingDispatch(new static(...func_get_args()));
}

As you can see it uses an instance of PendingDispatch, that means we can do something like:

正如你可以看到它使用一个 PendingDispatch的实例,这意味着我们可以做一些像这样的事:

(new InvoiceEmail($order))->dispatch()->onQueue("emails");

转载请注明: 转载自Ryan是菜鸟 | LNMP技术栈笔记

如果觉得本篇文章对您十分有益,何不 打赏一下

本文链接地址: 剖析Laravel队列系统--推送作业到队列

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/23159.html

相关文章

  • 剖析Laravel队列系统--准备队列作业

    摘要:原文链接我们推送到队列的每个作业都存储在按执行顺序排序的某些存储空间中,该存储位置可以是数据库,存储或像这样的第三方服务。这个数字从开始,在每次运行作业时不断增加。 原文链接https://divinglaravel.com/queue-system/preparing-jobs-for-queue Every job we push to queue is stored in som...

    marek 评论0 收藏0
  • 剖析Laravel队列系统--Worker

    摘要:一旦这一切完成,方法会运行在类属性在命令构造后设置容器解析实例,在中我们设置了将使用的缓存驱动,我们也根据命令来决定我们调用什么方法。作业只在以上起效在上也无效处理作业方法调用触发事件触发事件。 译文GitHub https://github.com/yuansir/diving-laravel-zh 原文链接https://divinglaravel.com/queue-system...

    CollinPeng 评论0 收藏0
  • 剖析Laravel队列系统--初探

    摘要:配有内置的队列系统,可帮助您在后台运行任务,并通过简单的来配置系统在不同情况下起作用。您可以在中管理队列配置,默认情况下它有使用不同队列驱动的几个连接,您可以看到项目中可以有多个队列连接,也可以使用多个队列驱动程序。 原文链接https://divinglaravel.com/queue-system/before-the-dive Laravel receives a request...

    pubdreamcc 评论0 收藏0
  • 剖析 Laravel 计划任务--事件属性

    摘要:所以在这里创建一个事件的两个实际方法是通过调用或,第一个提交一个的实例,后者提交来做一些特殊处理。那么会用表达式检查命令是否到期吗恰恰相反,使用库来确定命令是否基于当前系统时间相对于我们设置的时区。 译文GitHub https://github.com/yuansir/diving-laravel-zh 原文链接 https://divinglaravel.com/task-sche...

    xiaowugui666 评论0 收藏0
  • laravel 队列

    摘要:如果任务没有在规定时间内完成,那么该有序集合的任务将会被重新放入队列中。这两个进程操纵了三个队列,其中一个,负责即时任务,两个,负责延时任务与待处理任务。如果任务执行成功,就会删除中的任务,否则会被重新放入队列中。 在实际的项目开发中,我们经常会遇到需要轻量级队列的情形,例如发短信、发邮件等,这些任务不足以使用 kafka、RabbitMQ 等重量级的消息队列,但是又的确需要异步、重试...

    BDEEFE 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<