5

Symfony2 とRabbitMqBundleを使用して、ElasticSearchにドキュメントを送信するワーカーを作成しました。ドキュメントを 1 つずつインデックス化するのは、ElasticSearch バルク API を使用するよりもはるかに遅くなります。したがって、文書を数千のグループで ES にフラッシュするバッファーを作成しました。コードは次のようになります (少し単純化されています)。

class SearchIndexator
{
    protected $elasticaService;
    protected $buffer = [];
    protected $bufferSize = 0;

    // The maximum number of documents to keep in the buffer.
    // If the buffer reaches this amount of documents, then the buffers content
    // is send to elasticsearch for indexation.
    const MAX_BUFFER_SIZE = 1000;

    public function __construct(ElasticaService $elasticaService)
    {
        $this->elasticaService = $elasticaService;
    }

    /**
     * Destructor
     *
     * Flush any documents that remain in the buffer.
     */
    public function __destruct()
    {
        $this->flush();
    }

    /**
     * Add a document to the indexation buffer.
     */
    public function onMessage(array $document)
    {
        // Prepare the document for indexation.
        $this->doHeavyWeightStuff($document);

        // Create an Elastica document
        $document = new \Elastica\Document(
            $document['key'],
            $document
        );

        // Add the document to the buffer.
        $this->buffer[] = $document;

        // Flush the buffer when max buffersize has been reached.
        if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
            $this->flush();
        }
    }

    /**
     * Send the current buffer to ElasticSearch for indexation.
     */
    public function flush()
    {
        // Send documents to ElasticSearch for indexation.
        if (1 <= $this->bufferSize) {
            $this->elasticaService->addDocuments($this->buffer);
        }

        // Clear buffer
        $this->buffer = [];
        $this->bufferSize = 0;
    }
}

これはすべて非常にうまく機能しますが、わずかな問題があります。キューは予測できない速度でメッセージでいっぱいになります。5 分間で 100000 の場合もあれば、数時間で 1 でない場合もあります。たとえば、82671 個のドキュメントがキューに入れられている場合、最後の 671 個のドキュメントは、数時間かかる可能性がある別の 329 個のドキュメントを受信する前にインデックス化されません。私は次のことができるようにしたいと思います:

警告: SF コード! これは明らかにうまくいきません:

class SearchIndexator
{
    protected $elasticaService;
    protected $buffer = [];
    protected $bufferSize = 0;
    protected $flushTimer;

    // The maximum number of documents to keep in the buffer.
    // If the buffer reaches this amount of documents, then the buffers content
    // is send to elasticsearch for indexation.
    const MAX_BUFFER_SIZE = 1000;

    public function __construct(ElasticaService $elasticaService)
    {
        $this->elasticaService = $elasticaService;

        // Highly Sci-fi code
        $this->flushTimer = new Timer();
        // Flush buffer after 5 minutes of inactivity.
        $this->flushTimer->setTimeout(5 * 60);
        $this->flushTimer->setCallback([$this, 'flush']);
    }

    /**
     * Destructor
     *
     * Flush any documents that remain in the buffer.
     */
    public function __destruct()
    {
        $this->flush();
    }

    /**
     * Add a document to the indexation buffer.
     */
    public function onMessage(array $document)
    {
        // Prepare the document for indexation.
        $this->doHeavyWeightStuff($document);

        // Create an Elastica document
        $document = new \Elastica\Document(
            $document['key'],
            $document
        );

        // Add the document to the buffer.
        $this->buffer[] = $document;

        // Flush the buffer when max buffersize has been reached.
        if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
            $this->flush();
        } else {
            // Start a timer that will flush the buffer after a timeout.
            $this->initTimer();
        }
    }

    /**
     * Send the current buffer to ElasticSearch for indexation.
     */
    public function flush()
    {
        // Send documents to ElasticSearch for indexation.
        if (1 <= $this->bufferSize) {
            $this->elasticaService->addDocuments($this->buffer);
        }

        // Clear buffer
        $this->buffer = [];
        $this->bufferSize = 0;

        // There are no longer messages to be send, stop the timer.
        $this->flushTimer->stop();
    }

    protected function initTimer()
    {
        // Start or restart timer
        $this->flushTimer->isRunning()
          ? $this->flushTimer->reset()
          : $this->flushTimer->start();
    }
}

これで、イベント駆動型ではない PHP の制限について理解できました。しかし、これは 2015 年で、ReactPHP のようなソリューションがあるので、これは可能なはずですよね? ØMQ には、この関数があります。RabbitMQ で機能する、またはメッセージ キュー拡張機能とは無関係に機能するソリューションは何でしょうか?

私が懐疑的な解決策:

  1. crysalead/codeがあります。を使用してタイマーをシミュレートしdeclare(ticks = 1);ます。これがパフォーマンスと堅実なアプローチであるかどうかはわかりません。何か案は?
  2. 「FLUSH」メッセージを 5 分ごとに同じキューに発行する cronjob を実行し、このメッセージを受信したときに明示的にバッファをフラッシュすることはできますが、それはごまかしになります。
4

1 に答える 1

0

コメントで述べたように、シグナルを使用できます。PHP では、シグナル ハンドラーをスクリプト シグナル (SIGINT、SIGKILL など) に登録できます。

ユースケースでは、SIGALRM シグナルを使用できます。このシグナルは、特定の時間 (設定可能) が経過した後にスクリプトに警告を発します。これらのシグナルの良い面は、ノンブロッキングであることです。つまり、スクリプトの通常の操作が妨げられることはありません。

調整されたソリューション (目盛りは PHP 5.3 以降非推奨です):

function signal_handler($signal) {
    // You would flush here
    print "Caught SIGALRM\n";
    // Set the SIGALRM timer again or it won't trigger again
    pcntl_alarm(300);
}

// register your handler with the SIGALRM signal
pcntl_signal(SIGALRM, "signal_handler", true);
// set the timeout for the SIGALRM signal to 300 seconds
pcntl_alarm(300);

// start loop and check for pending signals
while(pcntl_signal_dispatch() && your_loop_condition) {
    //Execute your code here
}

注: スクリプトで使用できる SIGALRM シグナルは 1 つだけですpcntl_alarm。アラームのタイマーでシグナルの時間を設定すると、(シグナルを発火せずに) 新しく設定された値にリセットされます。

于 2015-12-16T17:58:57.307 に答える