ソースの 1 つに障害が発生した場合に高可用性を確保できるように、2 つの Kafka トピックが異なるソースからまったく同じコンテンツをストリーミングしています。Kafka Streams 0.10.1.0 を使用して 2 つのトピックを 1 つの出力トピックにマージしようとしています。これにより、失敗時にメッセージを見逃さず、すべてのソースが稼働しているときに重複がなくなります。
KStreamのメソッドを使用するleftJoin
と、トピックの 1 つ (セカンダリ トピック) は問題なくダウンできますが、プライマリ トピックがダウンすると、出力トピックには何も送信されません。これは、Kafka Streams 開発者ガイドによると、
KStream-KStream leftJoin は、常にプライマリ ストリームから到着するレコードによって駆動されます。
そのため、プライマリ ストリームからのレコードがない場合は、セカンダリ ストリームのレコードが存在しても使用しません。プライマリ ストリームがオンラインに戻ると、出力は正常に再開されます。
また、(重複レコードを追加する)を使用してouterJoin
から、重複を取り除くために KTable および groupByKey への変換を試みました。
KStream mergedStream = stream1.outerJoin(stream2,
(streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,
JoinWindows.of(2000L))
mergedStream.groupByKey()
.reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore))
.toStream((key,value) -> value)
.to(outputStream)
しかし、私はまだ時々重複を取得します。commit.interval.ms=200
また、KTable を取得して出力ストリームに十分な頻度で送信するためにも使用しています。
複数の同一の入力トピックから正確に 1 回の出力を取得するために、このマージにアプローチする最良の方法は何でしょうか?