関連: java.util.concurrent.Future の scala.concurrent.Future ラッパー
これは私の他の質問から来ました:
akkaストリームkafka(reactive-kafka)をakka httpアプリケーションに統合するには?
AKKA HTTP アプリケーションがあり、次のように、ルートの onComplete 関数でメッセージ/ProducerRecord を Kafka に送信したいと考えています。
val producer : KafkaProducer = new KafkaProducer(producerSettings)
val routes : Route =
post {
entity(as[User]) { user =>
val createUser = userService.create(user)
onSuccess(createUser) {
case Invalid(y: NonEmptyList[Err]) =>
complete(BadRequest -> "invalid user")
case Valid(u: User) => {
val producerRecord =
new ProducerRecord[Array[Byte], String]("topic1","some message")
onComplete(producer.send(producerRecord)) { _ =>
complete(ToResponseMarshallable((StatusCodes.Created, u)))
}
}
}
}
}
ただし、onComplete(producer send producerRecord)は次のタイプの不一致エラーを生成しています:
[エラー] が見つかりました: Future[org.apache.kafka.clients.producer.RecordMetadata] (java.util.concurrent 内) [エラー] 必須: Future[org.apache.kafka.clients.producer.RecordMetadata] (scala.同時) [エラー] onCompleteRecordMetadata { _ =>
これを回避する方法はありますか?おそらくプロデューサーをシンクとして使用することで ( http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-as-a-sink ) Javaのプロデューサー.送信機能?