队列工作人员意外冻结,需要重新启动工作工作

问题描述 投票:0回答:1

我有 3 个 beanstalkd 队列,每个队列都有自己的工作人员。工作人员是用 php 编写的,我没有使用任何框架。为了使用队列,我使用 pheanstalk 来处理队列。

由于某种原因,工作人员突然停止工作,没有任何错误,并且不处理队列中的作业。一旦我重新启动工人,他们就会在接下来的几天里开始工作,并重复相同的循环。有人可以帮我理解可能是什么问题吗?

我的工人

<?php

ini_set('display_errors', 1);
ini_set('display_startup_errors', 1);
error_reporting(E_ALL);

require 'vendor/autoload.php';

use Pheanstalk\Pheanstalk;


function function_name()
{
  $log_file = './log/error.log';
  $tube_name = 'current_tube_name';
  $memoryLimit = 128;
  $dotenv = Dotenv\Dotenv::createImmutable(__DIR__);
  $dotenv->load();
  $beanstalkd_host = $_ENV['BEANSTALKD_HOST'];
  $beanstalkd_port = $_ENV['BEANSTALKD_PORT'];

  while (true) {
    try {
        $pheanstalk = Pheanstalk::create($beanstalkd_host);
        $pheanstalk->watch($tube_name)->ignore('default');
    }
    catch(\Exception $exception) {
        $error_message = "CRITICAL|" . __FILE__ . "|" . __LINE__ . "|" . "Failed to initiate queue worker; could not connect to beanstalkd.Error {$exception->getMessage()}";
        error_log($error_message, 3, $log_file);
        exit;
    }

    $job = $pheanstalk->reserve(10);
    if (!$job) {
        sleep(5);
        continue; //move on to next iteration
    }

    $job_in_queue = $job->getData();//outputting the message
    $job_pay_load = json_decode($job_in_queue, true);
    if(is_array($job_pay_load) == false) {
        $job_pay_load = json_decode(unserialize($job_in_queue), true);
    }

    if (is_null($job_pay_load) || !is_array($job_pay_load) || count($job_pay_load) === 0 || !is_countable($job_pay_load)) {
        $error_message = "CRITICAL|" . __FILE__ . "|" . __LINE__ . "|" . "Job payload structure is not correct. Data: " . print_r($jobPayload, true);
        error_log($error_message, 3, $log_file);
        $pheanstalk->bury($job);
        $job = null;
        $job_pay_load = null;
        continue;
    }

    //do the actual process

    $payload = array(
        'key1' => $val1,
        'key2' => $val2
    );

    $next_tube_name = 'second_tube';
    $json_payload = json_encode($payload);
    $pheanstalk
        ->useTube($next_tube_name)
        ->put($json_payload);

    $pheanstalk->delete($job);

    $job = null;
    $json_payload = null;
    $job_pay_load = [];
    $job_in_queue = null;

    gc_collect_cycles();

    if((memory_get_usage() / 1024 / 1024) >= $memoryLimit) {
        $error_message = "We ran out of memory, restarting... in file ".__FILE__." on line" . __LINE__;
        error_log($error_message, 3, $log_file);
        exit;
    }
  }
 }

 function_name();
php queue worker beanstalkd pheanstalk
1个回答
0
投票

除了查看日志并查看脚本本身如何运行或退出后运行之外 - 我确实想到了一件事。

您确实进行了内存使用检查,但 memory_get_usage 函数并不能说明全部情况 - 有一个可选参数

bool $real_usage
可以设置为 true。这将获取已分配的总金额 - 与 PHP 当前使用的金额相反。

由于您正在检查此代码的内存使用量> 128MB,我认为您还设置了默认的 PHP 设置

memory_limit
也设置为
128M
- 但如果脚本使用的内存量已经是超过该金额,它可能“已经”失败并退出。根据您实际执行的工作量,我会将 $memoryLimit = 128; 减少到工作量的 3/4 - 不超过
92
(兆字节)。 输入最大作业数来确定在重新启动之前允许多少个循环,这将是避免任何潜在内存泄漏导致问题的另一种方法。

另一个潜在原因是无法处理守护进程本身的连接问题 - 如果

reserve

或通过 Pheanstalk 库对 Beanstalkd 的其他调用失败。

最终,如果您在不想实际停止时故意或意外退出,则必须确保脚本自动重新启动。如何执行此操作取决于您首先如何启动脚本运行 - 以及您正在运行的操作系统的版本(因为这决定了可用的工具,例如 

systemd

单元脚本)。

对(几乎)每个操作和更广泛的系统状态进行额外(临时)记录将有助于缩小可能出现问题的范围。使用的内存、已处理的作业数量以及更广泛的系统的繁忙程度等信息都可以提供有用的信息。

© www.soinside.com 2019 - 2024. All rights reserved.