私はそのようなテストシナリオを得ました:
- いくつかのレコードを DB に挿入します
- それらのレコードが Kafka によって処理されるのを待ちます
- これらのレコードが Kafka トピックに存在するかどうかを確認します
ステップ 2 には 5 秒かかる場合があります。時には2分かかります。
最初のステップでいくつかの ID が返されます。これは、ステップ 2 (Kafka のトピック) で探したいものです。私が達成したいのは、これらの ID (最初のステップから) が特定のタイムアウト (たとえば 20 秒) で Kafka で処理されているかどうかを確認し、他の手順に進むことです。たとえば
、Kafka からすべてのメッセージを取得し、それらの ID がそこにいます。そうでない場合は、別の例で 20 秒間待ちます。それらのメッセージをフェッチし、ID が新しくフェッチされたメッセージに含まれているかどうかを確認します。すべての ID が見つかった場合は、(タイムアウトに達する前に) それらのメッセージのリストを返すか、タイムアウトに達した後に - 既にフェッチされたメッセージのリストを返します。
少しトリッキーかもしれないので、うまく説明できれば幸いです。それを実現する可能性があるかどうか疑問に思っています。
これまでのところ、私はそのようなことしか理解していませんでした。
def fetch_all_kafka_records_from_topic(self, items_to_verify: List, timeout: int = 30) -> List:
counter = 0
result = []
while counter <= timeout:
res = self._get_all_kafka_KM_values() # at the very bottom it's super(AvroConsumer,self).poll(timeout) from confluent_kafka
result.extend(res)
if all(item in res for item in items_to_verify):
break
time.sleep(1)
counter += 1
return result
それを行うための好ましい方法はありますか?多分スレッド、デコレータで何か?リストに何かが存在するかどうかを確認する方法がわかりません-存在する場合は何かを返し、存在しない場合はもう一度試してください...タイムアウトに達するまで。