私は、Mongo Change Streamsを使用して変更イベントをリッスンし、elasticsearch の変更をほぼリアルタイムでインデックス化するCDCベースのアプリケーションを構築しています。
これまでのところ、関数を呼び出してイベントをキャプチャし、それらを変換し、1 つの mongo コレクションのストリームを実装するときに問題なく Elasticsearch にインデックスを付けるワーカーを実装しました。
function syncChangeEvents() {
const stream = ModelA.watch()
while (!stream.isClosed()) {
if (await stream.hasNext()) {
const event = stream.next()
// transform event
// index to elasticsearch
}
}
}
無限ループを使用して実装しました (おそらく悪いアプローチです) が、変更ストリームを永久に存続させなければならない場合に、どのような代替手段があるかわかりません。
別のモデルの変更ストリームを実装する必要があるときに問題が発生します。最初の関数にはブロックしている while ループがあるため、ワーカーは 2 番目の関数を呼び出して 2 番目の変更ストリームを開始できません。
x no をトリガーできるワーカーをスピンアップする最善の方法は何だろうと思っています。各変更ストリームのパフォーマンスに影響を与えることなく、変更ストリームの ワーカースレッドは正しい方法でしょうか?