0

私は、Mongo Change Streamsを使用して変更イベントをリッスンし、elasticsearch の変更をほぼリアルタイムでインデックス化するCDCベースのアプリケーションを構築しています。

これまでのところ、関数を呼び出してイベントをキャプチャし、それらを変換し、1 つの mongo コレクションのストリームを実装するときに問題なく Elasticsearch にインデックスを付けるワーカーを実装しました。

function syncChangeEvents() {
  const stream = ModelA.watch()
  while (!stream.isClosed()) {
    if (await stream.hasNext()) {
      const event = stream.next()
      // transform event
      // index to elasticsearch
    }
  }
}

無限ループを使用して実装しました (おそらく悪いアプローチです) が、変更ストリームを永久に存続させなければならない場合に、どのような代替手段があるかわかりません。

別のモデルの変更ストリームを実装する必要があるときに問題が発生します。最初の関数にはブロックしている while ループがあるため、ワーカーは 2 番目の関数を呼び出して 2 番目の変更ストリームを開始できません。

x no をトリガーできるワーカーをスピンアップする最善の方法は何だろうと思っています。各変更ストリームのパフォーマンスに影響を与えることなく、変更ストリームの ワーカースレッドは正しい方法でしょうか?

4

1 に答える 1