メッセージを消費するためにkafkaスパウトを使用しています。しかし、トポロジを変更してアップロードする必要がある場合、古いメッセージから再開するか、新しいメッセージから開始しますか? Kafka スパウトは、消費する場所からのタイムスタンプを指定しますが、タイムスタンプを知るにはどうすればよいでしょうか?
3 に答える
spoutConfig.forceStartOffsetTime(-1);
そのタイムスタンプの前後に書き込まれた最新のオフセットを選択して、消費を開始します。-1 を渡すことでスパウトを常に最新のオフセットから開始するように強制でき、-2 を渡すことで最も古いオフセットから開始するように強制できます。
基本的に、一連のイベントは次のようになります。
以下のプロパティを最初から読み取って、トポロジを初めて開始します。
forceFromStart = true startOffsetTime = -2
上記の小道具は、トピックの最初から開始するように強制します。forceFromStart
ストームにstartOffsetTime
プロパティを読み取り、設定されている値を使用して読み取りを開始する場所を決定し、飼育係のオフセットを無視するように指示するため、両方のプロパティを忘れずに持ってください。
これからは、トポロジが実行され、zookeeper がオフセットを維持します。ワーカーが死亡した場合、スーパーバイザーによって開始され、zookeeper のオフセットから読み取りが開始されます。
トポロジを再起動し、シャットダウン前に中断された場所から読み取りたい場合は、以下のプロパティを使用してトポロジを再起動します。
forceFromStart = false
上記のプロパティにより、ストームにstartOffsetTime
値を読み取るのではなく、トポロジをシャットダウンする前に維持されている Zookeeper オフセットを使用するように指示します。
これ以降、トポロジを再起動するたびに、残った場所から読み取られます。
トポロジを再起動し、トピックの先頭/先頭から読みたい場合は、以下のプロパティを使用してトポロジを再起動します。
forceFromStart = true startOffsetTime = -1
上記のプロパティによって、zookeeper オフセットを無視し、トピックの先端である最新のオフセットから開始するように Storm に指示しています。