ここに示すように、組み込みの「withRunningKafka」を介して kafka-server からのすべてのメッセージを消費するコードをテストする必要があります: https://github.com/manub/scalatest-embedded-kafka
- 作成された埋め込みプロデューサーを介してトピックにメッセージを送信しようとしました。
- そして、プロジェクトのコードを介して、生成されたメッセージ (組み込みプロデューサーによって作成された) を消費しようとしました。
「カスタム プロデューサーとコンシューマーを使用したテスト」は {
"work" in {
withRunningKafka {
1. val producer: KafkaProducer[String, String] =
aKafkaProducer[String](valueSerializer, config)
val topic = "topic-to-test"
producer.send(new ProducerRecord[String, String](topic, "some message 1"))
producer.send(new ProducerRecord[String, String](topic, "some message 2"))
producer.close()
2. val ok: Future[Done] = Consumer
.committableSource(
consumerSettings,
Subscriptions.topics(topic))
.map(msg => println(msg.record.value()))
.runWith(Sink.ignore)
ok should be (Done)
}
}}
ここに問題があります。「OK」を指定しても結果が「完了」になりません。一般的に、消費者をテストする私のロジックは正しいですか?