2

ログイベントの処理に Kafka を使用しています。単純なコネクタとストリーム変換に関する Kafka Connect と Kafka Streams の基本的な知識があります。

これで、次の構造のログ ファイルが作成されました。

timestamp event_id event

ログ イベントには、event_id で接続された複数のログ行があります (メール ログなど)。

例:

1234 1 START
1235 1 INFO1
1236 1 INFO2
1237 1 END

一般に、複数のイベントがあります。

例:

1234 1 START
1234 2 START
1235 1 INFO1
1236 1 INFO2
1236 2 INFO3
1237 1 END
1237 2 END

時間枠 (START と END の間) は最大 5 分です。

結果として、次のようなトピックが必要です

event_id combined_log

例:

1 START,INFO1,INFO2,END
2 START,INFO2,END

これを達成するための適切なツールは何ですか? Kafka Streams で解決しようとしましたが、方法を理解できます..

4

1 に答える 1

2

あなたのユースケースでは、基本的にメッセージペイロードに基づいてセッションまたはトランザクションを再構築しています。現時点では、そのような機能に対する組み込みのすぐに使用できるサポートはありません。ただし、Kafka の Streams API の Processor API 部分を使用して、この機能を自分で実装できます。状態ストアを使用して、特定のキーについて、セッション/トランザクションがいつ開始され、追加され、終了されるかを追跡するカスタム プロセッサを作成できます。

メーリング リストの何人かのユーザーはその IIRC を行っていますが、私が指摘できる既存のコード例は知りません。

注意する必要があるのは、順不同のデータを適切に処理することです。上記の例では、すべての入力データを適切な順序でリストしました。

1234 1 START
1234 2 START
1235 1 INFO1
1236 1 INFO2
1236 2 INFO3
1237 1 END
1237 2 END

ただし、実際には、メッセージ/レコードは次のように順不同で到着する場合があります (1例を簡単にするために、キー付きのメッセージのみを示します)。

1234 1 START
1237 1 END
1236 1 INFO2
1235 1 INFO1

それが起こったとしても、あなたのユースケースでは、このデータを(無視/ドロップおよび=データ損失)または(間違った順序、おそらくセマンティック制約にも違反している)START -> INFO1 -> INFO2 -> ENDではなく、このデータを解釈したいことを理解しています。START -> ENDINFO1INFO2START -> END -> INFO2 -> INFO1

于 2016-11-09T08:47:56.407 に答える