7

私が使うorg.apache.kafka:kafka-streams:0.10.0.1

KStream.Process()トリガーする(「句読点」) をトリガーしていないように見える時系列ベースのストリームを操作しようとしています。(参考までにこちらをご覧ください)

構成ではKafkaStreams、このパラメーターを渡しています(特に):

config.put(
  StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
  EventTimeExtractor.class.getName());

ここでは、 JSON データからタイムスタンプ情報を抽出EventTimeExtractorするカスタム タイムスタンプ エクストラクタ ( を実装) を示します。org.apache.kafka.streams.processor.TimestampExtractor

新しいレコードが取り込まれるたびに、これが私のオブジェクト (から派生TimestampExtractor) を呼び出すことを期待します。問題のストリームは 2 * 10^6 レコード/分です。punctuate()60 秒に設定しましたが、起動しません。古い値を引っ張って追いつくため、データがこのスパンを非常に頻繁に通過することはわかっています。

実際、まったく呼び出されません。

  • これは、KStream レコードにタイムスタンプを設定するための間違ったアプローチですか?
  • これは、この構成を宣言する間違った方法ですか?
4

3 に答える 3

8

2017 年 11 月更新: Kafka 1.0 の Kafka ストリームはpunctuate()、ストリーム時間と処理時間 (壁時計時間) の両方の動作をサポートするようになりました。したがって、好みの動作を選択できます。

あなたの設定は私には正しいようです。

注意すべきこと: Kafka 0.10.0 の時点で、punctuate()メソッドはストリーム時間で動作します(デフォルトでは、つまり、デフォルトのタイムスタンプ エクストラクタに基づいて、ストリーム時間はイベント時間を意味します)。また、ストリーム時間は、新しいデータ レコードが入ってくるときにのみ進められ、ストリーム時間の進み方は、これらの新しいレコードに関連付けられたタイムスタンプによって決まります。

例えば:

  • punctuate()1 分ごとに呼び出されるように設定したとします= 60 * 1000(注: stream-timeの 1 分)。ここで、次の 5 分間データが受信されない場合、punctuate()はまったく呼び出されません。5 回呼び出されると予想される場合でも。なんで?繰り返しpunctuate()ますが、ストリーム時間に依存するため、ストリーム時間は新しく受信したデータ レコードに基づいてのみ進められます。

これがあなたが見ている動作を引き起こしている可能性がありますか?

今後の見通し: Kafka プロジェクトでは、より柔軟にする方法について、既に議論が進行punctuate()中です。stream-timeevent-timeprocessing-time

于 2016-09-16T19:37:29.843 に答える
2

あなたのアプローチは正しいようです。http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parametersの段落「Timestamp Extractor (timestamp.extractor):」を比較してください

カスタム タイムスタンプ エクストラクタが使用されない理由がわからない。をご覧くださいorg.apache.kafka.streams.processor.internals.StreamTask。コンストラクターには次のようなものがあるはずです

TimestampExtractor timestampExtractor1 = (TimestampExtractor)config.getConfiguredInstance("timestamp.extractor", TimestampExtractor.class);

カスタムエクストラクターがそこにピックアップされているかどうかを確認してください...

于 2016-09-16T18:37:40.120 に答える