资讯专栏INFORMATION COLUMN

剖析Laravel队列系统--Worker

CollinPeng / 784人阅读

摘要:一旦这一切完成,方法会运行在类属性在命令构造后设置容器解析实例,在中我们设置了将使用的缓存驱动,我们也根据命令来决定我们调用什么方法。作业只在以上起效在上也无效处理作业方法调用触发事件触发事件。

译文GitHub https://github.com/yuansir/diving-laravel-zh

原文链接https://divinglaravel.com/queue-system/workers

现在,我们知道了Laravel如何将作业推到不同的队列中,让我们来深入了解workers如何运作你的作业。 首先,我将workers定义为一个在后台运行的简单PHP进程,目的是从存储空间中提取作业并针对多个配置选项运行它们。

php artisan queue:work

运行此命令将指示Laravel创建应用程序的一个实例并开始执行作业,这个实例将一直存活着,启动Laravel应用程序的操作只在运行命令时发生一次,同一个实例将被用于执行你的作业,这意味着:

避免在每个作业上启动整个应用程序来节省服务器资源。

在应用程序中所做的任何代码更改后必须手动重启worker。

你也可以这样运行:

php artisan queue:work --once

这将启动应用程序的一个实例,处理单个作业,然后干掉脚本。

php artisan queue:listen

queue:listen 命令相当于无限循环地运行 queue:work --once 命令,这将导致以下问题:

每个循环都会启动一个应用程序实例。

分配的worker将选择一个工作并执行。

worker进程将被干掉。

使用 queue:listen 确保为每个作业创建一个新的应用程序实例,这意味着代码更改以后不必手动重启worker,同时也意味着将消耗更多的服务器资源。

queue:work 命令

我们来看看 QueueConsoleWorkCommand 类的 handle() 方法,这是当你运行 php artisan queue:work 时会执行的方法:

public function handle()
{
    if ($this->downForMaintenance() && $this->option("once")) {
        return $this->worker->sleep($this->option("sleep"));
    }

    $this->listenForEvents();

    $connection = $this->argument("connection")
                    ?: $this->laravel["config"]["queue.default"];

    $queue = $this->getQueue($connection);

    $this->runWorker(
        $connection, $queue
    );
}

首先,我们检查应用程序是否处于维护模式,并使用 --once 选项,在这种情况下,我们希望脚本正常运行,因此我们不执行任何作业,我们只需要在完全杀死脚本前让worker在一段时间内休眠。

QueueWorkersleep() 方法看起来像这样:

public function sleep($seconds)
{
    sleep($seconds);
}
为什么我们不能在 handle() 方法中返回null来终止脚本?

如前所述, queue:listen 命令在循环中运行 WorkCommand

while (true) {
     // This process simply calls "php artisan queue:work --once"
    $this->runProcess($process, $options->memory);
}

如果应用程序处于维护模式,并且 WorkCommand 立即终止,这将导致循环结束,下一个在很短的时间内启动,最好在这种情况下导致一些延迟,而不是通过创建我们不会真正使用的大量应用程序实例。

监听事件

handle() 方法里面我们调用 listenForEvents() 方法:

protected function listenForEvents()
{
    $this->laravel["events"]->listen(JobProcessing::class, function ($event) {
        $this->writeOutput($event->job, "starting");
    });

    $this->laravel["events"]->listen(JobProcessed::class, function ($event) {
        $this->writeOutput($event->job, "success");
    });

    $this->laravel["events"]->listen(JobFailed::class, function ($event) {
        $this->writeOutput($event->job, "failed");

        $this->logFailedJob($event);
    });
}

在这个方法中我们会监听几个事件,这样我们可以在每次作业处理中,处理完或处理失败时向用户打印一些信息。

记录失败作业

一旦作业失败 logFailedJob() 方法会被调用

$this->laravel["queue.failer"]->log(
    $event->connectionName, $event->job->getQueue(),
    $event->job->getRawBody(), $event->exception
);

queue.failer 容器别名在 QueueQueueServiceProvider::registerFailedJobServices() 中注册:

protected function registerFailedJobServices()
{
    $this->app->singleton("queue.failer", function () {
        $config = $this->app["config"]["queue.failed"];

        return isset($config["table"])
                    ? $this->databaseFailedJobProvider($config)
                    : new NullFailedJobProvider;
    });
}

/**
 * Create a new database failed job provider.
 *
 * @param  array  $config
 * @return IlluminateQueueFailedDatabaseFailedJobProvider
 */
protected function databaseFailedJobProvider($config)
{
    return new DatabaseFailedJobProvider(
        $this->app["db"], $config["database"], $config["table"]
    );
}

如果配置了 queue.failed ,则将使用数据库队列失败,并将有关失败作业的信息简单地存储在数据库表中的:

$this->getTable()->insertGetId(compact(
    "connection", "queue", "payload", "exception", "failed_at"
));
运行worker

要运行worker,我们需要收集两条信息:

worker的连接信息从作业中提取

worker找到作业的队列

如果没有使用 queue.default 配置定义的默认连接。您可以为 queue:work 命令提供 --connection=default 选项。

队列也是一样,您可以提供一个 --queue=emails 选项,或选择连接配置中的 queue 选项。一旦这一切完成, WorkCommand::handle() 方法会运行 runWorker()

protected function runWorker($connection, $queue)
{
    $this->worker->setCache($this->laravel["cache"]->driver());

    return $this->worker->{$this->option("once") ? "runNextJob" : "daemon"}(
        $connection, $queue, $this->gatherWorkerOptions()
    );
}

在worker类属性在命令构造后设置:

public function __construct(Worker $worker)
{
    parent::__construct();

    $this->worker = $worker;
}

容器解析 QueueWorker 实例,在runWorker()中我们设置了worker将使用的缓存驱动,我们也根据--once 命令来决定我们调用什么方法。

如果使用 --once 选项,我们只需调用 runNextJob 来运行下一个可用的作业,然后脚本就会终止。 否则,我们将调用 daemon 方法来始终保持进程处理作业。

在开始工作时,我们使用 gatherWorkerOptions() 方法收集用户给出的命令选项,我们稍后会提供这些选项,这个工具是 runNextJobdaemon 方法。

protected function gatherWorkerOptions()
{
    return new WorkerOptions(
        $this->option("delay"), $this->option("memory"),
        $this->option("timeout"), $this->option("sleep"),
        $this->option("tries"), $this->option("force")
    );
}
守护进程

让我看看 Worker::daemon() 方法,这个方法的第一行调用了 Worker::daemon() 方法

protected function listenForSignals()
{
    if ($this->supportsAsyncSignals()) {
        pcntl_async_signals(true);

        pcntl_signal(SIGTERM, function () {
            $this->shouldQuit = true;
        });

        pcntl_signal(SIGUSR2, function () {
            $this->paused = true;
        });

        pcntl_signal(SIGCONT, function () {
            $this->paused = false;
        });
    }
}

这种方法使用PHP7.1的信号处理, supportsAsyncSignals() 方法检查我们是否在PHP7.1上,并加载 pcntl 扩展名。

之后pcntl_async_signals() 被调用来启用信号处理,然后我们为多个信号注册处理程序:

当脚本被指示关闭时,会引发SIGTERM

SIGUSR2是用户定义的信号,Laravel用来表示脚本应该暂停。

当暂停的脚本继续进行时,会引发SIGCONT

这些信号从Process Monitor(如 Supervisor )发送并与我们的脚本进行通信。

Worker::daemon() 方法中的第二行读取最后一个队列重新启动的时间戳,当我们调用queue:restart 时该值存储在缓存中,稍后我们将检查是否和上次重新启动的时间戳不符合,来指示worker在之后多次重启。

最后,该方法启动一个循环,在这个循环中,我们将完成其余获取作业的worker,运行它们,并对worker进程执行多个操作。

while (true) {
    if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
        $this->pauseWorker($options, $lastRestart);

        continue;
    }

    $job = $this->getNextJob(
        $this->manager->connection($connectionName), $queue
    );

    $this->registerTimeoutHandler($job, $options);

    if ($job) {
        $this->runJob($job, $connectionName, $options);
    } else {
        $this->sleep($options->sleep);
    }

    $this->stopIfNecessary($options, $lastRestart);
}
确定worker是否应该处理作业

调用 daemonShouldRun() 检查以下情况:

应用程序不处于维护模式

Worker没有暂停

没有事件监听器阻止循环继续

如果应用程序在维护模式下,worker使用--force选项仍然可以处理作业:

php artisan queue:work --force

确定worker是否应该继续的条件之一是:

$this->events->until(new EventsLooping($connectionName, $queue)) === false)

这行触发 QueueEventLooping 事件,并检查是否有任何监听器在 handle() 方法中返回false,这种情况下你可以强制您的workers暂时停止处理作业。

如果worker应该暂停,则调用 pauseWorker() 方法:

protected function pauseWorker(WorkerOptions $options, $lastRestart)
{
    $this->sleep($options->sleep > 0 ? $options->sleep : 1);

    $this->stopIfNecessary($options, $lastRestart);
}

sleep 方法并传递给控制台命令的 --sleep 选项,这个方法调用

public function sleep($seconds)
{
    sleep($seconds);
}

脚本休眠了一段时间后,我们检查worker是否应该在这种情况下退出并杀死脚本,稍后我们看一下stopIfNecessary 方法,以防脚本不能被杀死,我们只需调用 continue; 开始一个新的循环:

if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
    $this->pauseWorker($options, $lastRestart);

    continue;
}
Retrieving 要运行的作业
$job = $this->getNextJob(
    $this->manager->connection($connectionName), $queue
);

getNextJob() 方法接受一个队列连接的实例,我们从队列中获取作业

protected function getNextJob($connection, $queue)
{
    try {
        foreach (explode(",", $queue) as $queue) {
            if (! is_null($job = $connection->pop($queue))) {
                return $job;
            }
        }
    } catch (Exception $e) {
        $this->exceptions->report($e);

        $this->stopWorkerIfLostConnection($e);
    }
}

我们简单地循环给定的队列,使用选择的队列连接从存储空间(数据库,redis,sqs,...)获取作业并返回该作业。

要从存储中retrieve作业,我们查询满足以下条件的最旧作业:

推送到 queue ,我们试图从中找到作业

没有被其他worker reserved

可以在给定的时间内运行,有些作业在将来被推迟运行

我们也取到了很久以来被冻结的作业并重试

一旦我们找到符合这一标准的作业,我们将这个作业标记为reserved,以便其他workers获取到,我们还会增加作业监控次数。

监控作业超时

下一个作业被retrieved之后,我们调用 registerTimeoutHandler() 方法:

protected function registerTimeoutHandler($job, WorkerOptions $options)
{
    if ($this->supportsAsyncSignals()) {
        pcntl_signal(SIGALRM, function () {
            $this->kill(1);
        });the

        $timeout = $this->timeoutForJob($job, $options);

        pcntl_alarm($timeout > 0 ? $timeout + $options->sleep : 0);
    }
}

再次,如果 pcntl 扩展被加载,我们将注册一个信号处理程序干掉worker进程如果该作业超时的话,在配置了超时之后我们使用 pcntl_alarm() 来发送一个 SIGALRM 信号。

如果作业所花费的时间超过了超时值,处理程序将会终止该脚本,如果不是该作业将通过,并且下一个循环将设置一个新的报警覆盖第一个报警,因为进程中可能存在单个报警。

作业只在PHP7.1以上起效,在window上也无效 ¯_(ツ)_/¯

处理作业

runJob() 方法调用 process():

public function process($connectionName, $job, WorkerOptions $options)
{
    try {
        $this->raiseBeforeJobEvent($connectionName, $job);

        $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
            $connectionName, $job, (int) $options->maxTries
        );

        $job->fire();

        $this->raiseAfterJobEvent($connectionName, $job);
    } catch (Exception $e) {
        $this->handleJobException($connectionName, $job, $options, $e);
    }
}

raiseBeforeJobEvent() 触发 QueueEventsJobProcessing 事件, raiseAfterJobEvent() 触发 QueueEventsJobProcessed 事件。 markJobAsFailedIfAlreadyExceedsMaxAttempts() 检查进程是否达到最大尝试次数,并将该作业标记为失败:

protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
    $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

    if ($maxTries === 0 || $job->attempts() <= $maxTries) {
        return;
    }

    $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException(
        "A queued job has been attempted too many times. The job may have previously timed out."
    ));

    throw $e;
}

否则我们在作业对象上调用 fire() 方法来运行作业。

从哪里获取作业对象

getNextJob() 方法返回一个 ContractsQueueJob 的实例,这取决于我们使用相应的Job实例的队列驱动程序,例如如果数据库队列驱动则选择 QueueJobsDatabaseJob

循环结束

在循环结束时,我们调用 stopIfNecessary() 来检查在下一个循环开始之前是否应该停止进程:

protected function stopIfNecessary(WorkerOptions $options, $lastRestart)
{
    if ($this->shouldQuit) {
        $this->kill();
    }

    if ($this->memoryExceeded($options->memory)) {
        $this->stop(12);
    } elseif ($this->queueShouldRestart($lastRestart)) {
        $this->stop();
    }
}

shouldQuit 属性在两种情况下设置,首先listenForSignals() 内部的作为 SIGTERM 信号处理程序,其次在 stopWorkerIfLostConnection()

protected function stopWorkerIfLostConnection($e)
{
    if ($this->causedByLostConnection($e)) {
        $this->shouldQuit = true;
    }
}

在retrieving和处理作业时,会在几个try ... catch语句中调用此方法,以确保worker应该处于被干掉的状态,以便我们的Process Control可能会启动一个新的数据库连接。

causedByLostConnection() 方法可以在 DatabaseDetectsLostConnections trait中找到。
memoryExceeded() 检查内存使用情况是否超过当前设置的内存限制,您可以使用 --memory 选项设置限制。

转载请注明: 转载自Ryan是菜鸟 | LNMP技术栈笔记

如果觉得本篇文章对您十分有益,何不 打赏一下

本文链接地址: 剖析Laravel队列系统--Worker

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/23262.html

相关文章

  • 剖析Laravel队列系统--准备队列作业

    摘要:原文链接我们推送到队列的每个作业都存储在按执行顺序排序的某些存储空间中,该存储位置可以是数据库,存储或像这样的第三方服务。这个数字从开始,在每次运行作业时不断增加。 原文链接https://divinglaravel.com/queue-system/preparing-jobs-for-queue Every job we push to queue is stored in som...

    marek 评论0 收藏0
  • 剖析Laravel队列系统--推送作业到队列

    摘要:有几种有用的方法可以使用将作业推送到特定的队列在给定的秒数之后推送作业延迟后将作业推送到特定的队列推送多个作业推送特定队列上的多个作业调用这些方法之后,所选择的队列驱动会将给定的信息存储在存储空间中,供按需获取。 原文链接https://divinglaravel.com/queue-system/pushing-jobs-to-queue There are several ways...

    maochunguang 评论0 收藏0
  • 剖析Laravel队列系统--初探

    摘要:配有内置的队列系统,可帮助您在后台运行任务,并通过简单的来配置系统在不同情况下起作用。您可以在中管理队列配置,默认情况下它有使用不同队列驱动的几个连接,您可以看到项目中可以有多个队列连接,也可以使用多个队列驱动程序。 原文链接https://divinglaravel.com/queue-system/before-the-dive Laravel receives a request...

    pubdreamcc 评论0 收藏0
  • Swoft 源码剖析 - 连接池

    摘要:基于扩展实现真正的数据库连接池这种方案中,项目占用的连接数仅仅为。一种是连接暂时不再使用,其占用状态解除,可以从使用者手中交回到空闲队列中这种我们称为连接的归队。源码剖析系列目录 作者:bromine链接:https://www.jianshu.com/p/1a7...來源:简书著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版。Swoft Github: https:...

    rozbo 评论0 收藏0
  • Laravel5.2队列驱动expire参数设置带来的重复执行问题 数据库驱动

    摘要:已经取消了参数,都用来执行。取数据的过程事物处理已经打开。取得符合条件的队列后程序会更新该条数据,并且更新完后即。 connections => [ .... database => [ driver => database, table => jobs, queue => defaul...

    ysl_unh 评论0 收藏0

发表评论

0条评论

CollinPeng

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<