0

同じ IP で異なるポートで実行されている 3 つの beanstalkd キュー プロセスがあります。キューを処理するために、並行して (beanstalkd ポートごとに 20) PHP ワーカーを生成するスーパーバイザーを実行する別のサーバーがあります。私の問題は、2 つのプロセスが同じサーバーで同じジョブ ID を同時に予約できるように見えることです。

私のログからの出力例を次に示します。

2017-02-23 09:59:56 --> START JOB (port: 11301 | u: 0.45138600 1487861996 | jid:1695074 | pid:30019 | j:leads_to_tags_add | tr:1)
2017-02-23 09:59:57 --> START JOB (port: 11301 | u: 0.55024800 1487861997 | jid:1695074 | pid:30157 | j:leads_to_tags_add | tr:2)
2017-02-23 09:59:58 --> DEL   JOB (port: 11301 | u: 0.54731000 1487861998 | jid:1695074 | pid:30019 | j:leads_to_tags_add)
2017-02-23 09:59:58 --> DEL   JOB (port: 11301 | u: 0.58927900 1487861998 | jid:1695074 | pid:30157 | j:leads_to_tags_add)

2 つの予約が次々に行われ、最初のプロセスが終了してジョブが削除される前に 2 番目の予約が行われるようです。

jobid ごとに redis にカウンターを追加しましたが、2 回目に予約するまでに、カウンターが 1 回上昇したことは明らかです (tr)。TTRR は 3600 に設定されているため、最初のプロセスが完了する前に期限切れになることはありません。

これは、2 番目のプロセス予約の直後のジョブ ステータスです。

Pheanstalk\Response\ArrayResponse::__set_state(array(
   'id' => '1695074',
   'tube' => 'action-medium',
   'state' => 'reserved',
   'pri' => '0',
   'age' => '1',
   'delay' => '0',
   'ttr' => '3600',
   'time-left' => '3599',
   'file' => '385',
   'reserves' => '2',
   'timeouts' => '0',
   'releases' => '0',
   'buries' => '0',
   'kicks' => '0',
))

この動作は非常にランダムで、ジョブがロックされるまで 1 つのプロセスしか予約できない場合もあれば、2 つ、場合によっては 4 つ以上 (まれに) しか予約できないこともあります。もちろん、これにより、実行される重複ジョブの数に一貫性がなくなります。

コードの短いバージョン:

$this->job = $this->pheanstalk->watch($tube)->reserve($timeout);


set_error_handler(function($errno, $errstr, $errfile, $errline, array $errcontext) {
    // error was suppressed with the @-operator
    if (0 === error_reporting()) {
        return false;
    }

    throw new ErrorException($errstr, 0, $errno, $errfile, $errline);
});

$this->log_message('info', __METHOD__ . ": START JOB (" . $this->_logDetails() . " | tr:{$tries})");


if ($this->_process_job()) {
    $this->log_message('info', __METHOD__ . ": FINISHED JOB (" . $this->_logDetails() . ")");
    $this->_delete_job();
} else {
    $this->log_message('error', __METHOD__ . ": FAILED JOB (" . $this->_logDetails() . ")");
}

restore_error_handler();

protected function _delete_job()
{
    $this->pheanstalk->delete($this->job);
    $this->log_message('info', __METHOD__ . ": DELETED JOB (" . $this->_logDetails() . ")");
}
4

1 に答える 1