0

私はそのようなテストシナリオを得ました:

  1. いくつかのレコードを DB に挿入します
  2. それらのレコードが Kafka によって処理されるのを待ちます
  3. これらのレコードが Kafka トピックに存在するかどうかを確認します

ステップ 2 には 5 秒かかる場合があります。時には2分かかります。
最初のステップでいくつかの ID が返されます。これは、ステップ 2 (Kafka のトピック) で探したいものです。私が達成したいのは、これらの ID (最初のステップから) が特定のタイムアウト (たとえば 20 秒) で Kafka で処理されているかどうかを確認し、他の手順に進むことです。たとえば
、Kafka からすべてのメッセージを取得し、それらの ID がそこにいます。そうでない場合は、別の例で 20 秒間待ちます。それらのメッセージをフェッチし、ID が新しくフェッチされたメッセージに含まれているかどうかを確認します。すべての ID が見つかった場合は、(タイムアウトに達する前に) それらのメッセージのリストを返すか、タイムアウトに達した後に - 既にフェッチされたメッセージのリストを返します。

少しトリッキーかもしれないので、うまく説明できれば幸いです。それを実現する可能性があるかどうか疑問に思っています。

これまでのところ、私はそのようなことしか理解していませんでした。

    def fetch_all_kafka_records_from_topic(self, items_to_verify: List, timeout: int = 30) -> List:
        counter = 0
        result = []
        while counter <= timeout:
            res = self._get_all_kafka_KM_values() # at the very bottom  it's super(AvroConsumer,self).poll(timeout) from confluent_kafka
            result.extend(res)
            if all(item in res for item in items_to_verify):
                break
            time.sleep(1)
            counter += 1
        return result

それを行うための好ましい方法はありますか?多分スレッド、デコレータで何か?リストに何かが存在するかどうかを確認する方法がわかりません-存在する場合は何かを返し、存在しない場合はもう一度試してください...タイムアウトに達するまで。

4

0 に答える 0