問題タブ [confluent-kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
0 に答える
35 参照

python - 指定されたタイムアウトのためにカフカでメッセージを探す方法

私はそのようなテストシナリオを得ました:

  1. いくつかのレコードを DB に挿入します
  2. それらのレコードが Kafka によって処理されるのを待ちます
  3. これらのレコードが Kafka トピックに存在するかどうかを確認します

ステップ 2 には 5 秒かかる場合があります。時には2分かかります。
最初のステップでいくつかの ID が返されます。これは、ステップ 2 (Kafka のトピック) で探したいものです。私が達成したいのは、これらの ID (最初のステップから) が特定のタイムアウト (たとえば 20 秒) で Kafka で処理されているかどうかを確認し、他の手順に進むことです。たとえば
、Kafka からすべてのメッセージを取得し、それらの ID がそこにいます。そうでない場合は、別の例で 20 秒間待ちます。それらのメッセージをフェッチし、ID が新しくフェッチされたメッセージに含まれているかどうかを確認します。すべての ID が見つかった場合は、(タイムアウトに達する前に) それらのメッセージのリストを返すか、タイムアウトに達した後に - 既にフェッチされたメッセージのリストを返します。

少しトリッキーかもしれないので、うまく説明できれば幸いです。それを実現する可能性があるかどうか疑問に思っています。

これまでのところ、私はそのようなことしか理解していませんでした。

それを行うための好ましい方法はありますか?多分スレッド、デコレータで何か?リストに何かが存在するかどうかを確認する方法がわかりません-存在する場合は何かを返し、存在しない場合はもう一度試してください...タイムアウトに達するまで。