12

ソースの 1 つに障害が発生した場合に高可用性を確保できるように、2 つの Kafka トピックが異なるソースからまったく同じコンテンツをストリーミングしています。Kafka Streams 0.10.1.0 を使用して 2 つのトピックを 1 つの出力トピックにマージしようとしています。これにより、失敗時にメッセージを見逃さず、すべてのソースが稼働しているときに重複がなくなります。

KStreamのメソッドを使用するleftJoinと、トピックの 1 つ (セカンダリ トピック) は問題なくダウンできますが、プライマリ トピックがダウンすると、出力トピックには何も送信されません。これは、Kafka Streams 開発者ガイドによると、

KStream-KStream leftJoin は、常にプライマリ ストリームから到着するレコードによって駆動されます。

そのため、プライマリ ストリームからのレコードがない場合は、セカンダリ ストリームのレコードが存在しても使用しません。プライマリ ストリームがオンラインに戻ると、出力は正常に再開されます。

また、(重複レコードを追加する)を使用してouterJoinから、重複を取り除くために KTable および groupByKey への変換を試みました。

KStream mergedStream = stream1.outerJoin(stream2, 
    (streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,
    JoinWindows.of(2000L))

mergedStream.groupByKey()
            .reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore))
            .toStream((key,value) -> value)
            .to(outputStream)

しかし、私はまだ時々重複を取得します。commit.interval.ms=200また、KTable を取得して出力ストリームに十分な頻度で送信するためにも使用しています。

複数の同一の入力トピックから正確に 1 回の出力を取得するために、このマージにアプローチする最良の方法は何でしょうか?

4

1 に答える 1

8

どのような種類の結合を使用しても問題は解決しません。常に結果が失われるか (一部のストリームが停止した場合は内部結合)、またはnull両方のストリームがオンラインである場合は左結合または外部結合で「重複」することになるためです。 )。Kafka ストリームの結合セマンティクスの詳細については、https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semanticsを参照してください。

KStream process()したがって、 、transform()、またはを使用して DSL と組み合わせて使用​​できるプロセッサ API を使用することをお勧めしますtransformValues()。詳細については、Kafka Stream DSL を使用してプロセッサでキーと値をフィルタリングする方法を参照してください。

プロセッサにカスタム ストアを追加することもできます (カスタム StateStore を Kafka Streams DSL プロセッサに追加する方法は? )。

于 2016-11-28T17:40:19.230 に答える