1

リンクをたどるのに疲れた

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

SimpleConsumer を使用してメッセージを消費しますが、使用中に次のような突然の動作が見つかりました。

コンシューマは、特定のパーティションからメッセージを消費しています。しかし、問題は、コンシューマーが実行されていて、プロデューサーを使用してメッセージをトピックにプッシュすると、そのパーティションからのメッセージが消費されることです。しかし、コンシューマーが現在実行されておらず、いくつかのメッセージをトピックにプッシュして再度コンシューマーを起動すると、プロデューサーによってプッシュされたメッセージは消費されませんが、今すぐプッシュされるメッセージを消費する準備ができています。未処理のメッセージのみを消費したいので、EarliestTime() の代わりに LatestTime() を使用しています。

例えば

ケース-1

コンシューマーは実行中です:

プロデューサーは M1、M2、M3 メッセージをトピック 1 のパーティション 1 にプッシュしました

結果: コンシューマーは 3 つのメッセージすべてを消費します。

ケース - 2

コンシューマが実行されていません

プロデューサーは、m4、m5、m6 メッセージをトピック 1 のパーティション 1 にプッシュします。

消費者は今呼び出されます

結果:消費者はメッセージm4、m5、m6を消費しませんが、オフセットを確認すると7に設定されています。これは、プロデューサーがメッセージの生成中にオフセットを7に進めたことを意味します。その結果、コンシューマーは現在オフセット7からメッセージを消費します

コンシューマーが再び起動したときに m4 からメッセージを読み取る必要がある場合に、理想的に助けてください。

4

1 に答える 1