Symfony2 とRabbitMqBundleを使用して、ElasticSearchにドキュメントを送信するワーカーを作成しました。ドキュメントを 1 つずつインデックス化するのは、ElasticSearch バルク API を使用するよりもはるかに遅くなります。したがって、文書を数千のグループで ES にフラッシュするバッファーを作成しました。コードは次のようになります (少し単純化されています)。
class SearchIndexator
{
protected $elasticaService;
protected $buffer = [];
protected $bufferSize = 0;
// The maximum number of documents to keep in the buffer.
// If the buffer reaches this amount of documents, then the buffers content
// is send to elasticsearch for indexation.
const MAX_BUFFER_SIZE = 1000;
public function __construct(ElasticaService $elasticaService)
{
$this->elasticaService = $elasticaService;
}
/**
* Destructor
*
* Flush any documents that remain in the buffer.
*/
public function __destruct()
{
$this->flush();
}
/**
* Add a document to the indexation buffer.
*/
public function onMessage(array $document)
{
// Prepare the document for indexation.
$this->doHeavyWeightStuff($document);
// Create an Elastica document
$document = new \Elastica\Document(
$document['key'],
$document
);
// Add the document to the buffer.
$this->buffer[] = $document;
// Flush the buffer when max buffersize has been reached.
if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
$this->flush();
}
}
/**
* Send the current buffer to ElasticSearch for indexation.
*/
public function flush()
{
// Send documents to ElasticSearch for indexation.
if (1 <= $this->bufferSize) {
$this->elasticaService->addDocuments($this->buffer);
}
// Clear buffer
$this->buffer = [];
$this->bufferSize = 0;
}
}
これはすべて非常にうまく機能しますが、わずかな問題があります。キューは予測できない速度でメッセージでいっぱいになります。5 分間で 100000 の場合もあれば、数時間で 1 でない場合もあります。たとえば、82671 個のドキュメントがキューに入れられている場合、最後の 671 個のドキュメントは、数時間かかる可能性がある別の 329 個のドキュメントを受信する前にインデックス化されません。私は次のことができるようにしたいと思います:
警告: SF コード! これは明らかにうまくいきません:
class SearchIndexator
{
protected $elasticaService;
protected $buffer = [];
protected $bufferSize = 0;
protected $flushTimer;
// The maximum number of documents to keep in the buffer.
// If the buffer reaches this amount of documents, then the buffers content
// is send to elasticsearch for indexation.
const MAX_BUFFER_SIZE = 1000;
public function __construct(ElasticaService $elasticaService)
{
$this->elasticaService = $elasticaService;
// Highly Sci-fi code
$this->flushTimer = new Timer();
// Flush buffer after 5 minutes of inactivity.
$this->flushTimer->setTimeout(5 * 60);
$this->flushTimer->setCallback([$this, 'flush']);
}
/**
* Destructor
*
* Flush any documents that remain in the buffer.
*/
public function __destruct()
{
$this->flush();
}
/**
* Add a document to the indexation buffer.
*/
public function onMessage(array $document)
{
// Prepare the document for indexation.
$this->doHeavyWeightStuff($document);
// Create an Elastica document
$document = new \Elastica\Document(
$document['key'],
$document
);
// Add the document to the buffer.
$this->buffer[] = $document;
// Flush the buffer when max buffersize has been reached.
if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
$this->flush();
} else {
// Start a timer that will flush the buffer after a timeout.
$this->initTimer();
}
}
/**
* Send the current buffer to ElasticSearch for indexation.
*/
public function flush()
{
// Send documents to ElasticSearch for indexation.
if (1 <= $this->bufferSize) {
$this->elasticaService->addDocuments($this->buffer);
}
// Clear buffer
$this->buffer = [];
$this->bufferSize = 0;
// There are no longer messages to be send, stop the timer.
$this->flushTimer->stop();
}
protected function initTimer()
{
// Start or restart timer
$this->flushTimer->isRunning()
? $this->flushTimer->reset()
: $this->flushTimer->start();
}
}
これで、イベント駆動型ではない PHP の制限について理解できました。しかし、これは 2015 年で、ReactPHP のようなソリューションがあるので、これは可能なはずですよね? ØMQ には、この関数があります。RabbitMQ で機能する、またはメッセージ キュー拡張機能とは無関係に機能するソリューションは何でしょうか?
私が懐疑的な解決策:
- crysalead/codeがあります。を使用してタイマーをシミュレートし
declare(ticks = 1);
ます。これがパフォーマンスと堅実なアプローチであるかどうかはわかりません。何か案は? - 「FLUSH」メッセージを 5 分ごとに同じキューに発行する cronjob を実行し、このメッセージを受信したときに明示的にバッファをフラッシュすることはできますが、それはごまかしになります。