1

ここに示すように、組み込みの「withRunningKafka」を介して kafka-server からのすべてのメッセージを消費するコードをテストする必要があります: https://github.com/manub/scalatest-embedded-kafka

  1. 作成された埋め込みプロデューサーを介してトピックにメッセージを送信しようとしました。
  2. そして、プロジェクトのコードを介して、生成されたメッセージ (組み込みプロデューサーによって作成された) を消費しようとしました。

「カスタム プロデューサーとコンシューマーを使用したテスト」は {

"work" in {

    withRunningKafka {

      1. val producer: KafkaProducer[String, String] =
               aKafkaProducer[String](valueSerializer, config)

         val topic = "topic-to-test"

         producer.send(new ProducerRecord[String, String](topic, "some message 1"))
         producer.send(new ProducerRecord[String, String](topic, "some message 2"))
         producer.close()

      2. val ok: Future[Done] = Consumer
        .committableSource(
            consumerSettings,
            Subscriptions.topics(topic))
        .map(msg => println(msg.record.value()))
        .runWith(Sink.ignore)

       ok should be (Done)
    }
}}

ここに問題があります。「OK」を指定しても結果が「完了」になりません。一般的に、消費者をテストする私のロジックは正しいですか?

4

2 に答える 2

1

スタックオーバーフローへようこそ!

okソースが可能なさらなるメッセージを待っているため、理由が結果で完了することはありません。マップの前に追加.take(2)すると、2 つの要素の後でソースが停止し、okfuture が完成します。

于 2019-01-31T06:47:41.293 に答える
0

同時に2つの問題に直面していると思います:

  1. Kafka コンシューマーは要素を無限に待機するため (@dvim が述べているように)、実際に終了するには .take() が必要です。

  2. デフォルトでは、kafka コンシューマー グループは現在のトピックの開始点ではなく終了点から開始されるため、スピンされる前に投稿されたメッセージは消費されません。トピックの最後ではなく、最初から開始するように設定する必要があります。

于 2019-01-31T08:37:15.393 に答える