バックグラウンド:
データベースを更新するパイプラインに沿った複数のソース、複数のシンク、および複数のオペレーターで構成される Flink パイプラインがあります。
質問のために、そして簡単にするために、次のようなパイプラインがあると仮定しましょう。
Source -> KeyBy -> FlatMap -> Filter -> Sink
このパイプラインにより、一部のデータの変更に関する通知をリッスンできるはずです。(各通知には ID が含まれます) 通知ごとに、DB からデータを読み取り、アルゴリズムを実行して、同じ DB 行を更新します。その後、データの変化の大きさも出力します。データ変更の大きさが十分に大きい場合にのみ、別の Kafka トピックに通知を送信します。
- Source は Kafka トピックにサブスクライブして、変更されたデータ ID に関する通知をリッスンします。
- KeyBy は ID によってキーイングされ、同じ ID がオペレーターの 2 つのインスタンスによって同時に処理されないようにします。
- ID を指定すると、FlatMap は DB からデータを読み取り、アルゴリズムを実行して、同じ DB 行を更新します。変化の大きさを発します。これは Map ではなく FlatMap です。これは、特定のエラーが発生した場合など、変更の大きさを発行したくない場合があるためです。
- Filter は、ある閾値よりも小さいマグニチュードでストリームをフィルタリングします
- シンクは、フィルタリングされた通知を別の Kafka トピックに送信しています。
質問:
1 回限りのセマンティクスでパイプラインを実行したいと考えています。私たちが見たところ、Flink は、Kafka ソース、Kafka シンク、および中間のステートフルまたはステートレス オペレーターに対して、1 回限りのセマンティクスをサポートしています。パイプラインに沿って更新するリソースを 1 回だけ実行する方法を説明している場所が見つかりませんでした。1 回限りのセマンティクスを許可するシンク関数を作成できるTwoPhaseCommitSinkFunctionがあります。
データベースを更新した後、Kafka に変更通知を発行するため、使用できません。2 つの別々のシンクで実行すると、DB が実際に更新される前にマグニチュード通知を受信できる競合状態が発生します。
何か不足していますか?Map/FlatMap オペレーターで 2 フェーズ コミットを実装する方法はありますか? 別の解決策はありますか?
ありがとう!