0

関連: java.util.concurrent.Future の scala.concurrent.Future ラッパー

これは私の他の質問から来ました:

akkaストリームkafka(reactive-kafka)をakka httpアプリケーションに統合するには?

AKKA HTTP アプリケーションがあり、次のように、ルートの onComplete 関数でメッセージ/ProducerRecord を Kafka に送信したいと考えています。

val producer : KafkaProducer = new KafkaProducer(producerSettings)

val routes : Route = 
  post {
    entity(as[User]) { user =>
      val createUser = userService.create(user)
      onSuccess(createUser) {
        case Invalid(y: NonEmptyList[Err]) =>  
          complete(BadRequest -> "invalid user")
        case Valid(u: User) => { 
          val producerRecord = 
            new ProducerRecord[Array[Byte], String]("topic1","some message")

          onComplete(producer.send(producerRecord)) { _ =>
            complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
        }
      }
    }
  }

ただし、onComplete(producer send producerRecord)は次のタイプの不一致エラーを生成しています:

[エラー] が見つかりました: Future[org.apache.kafka.clients.producer.RecordMetadata] (java.util.concurrent 内) [エラー] 必須: Future[org.apache.kafka.clients.producer.RecordMetadata] (scala.同時) [エラー] onCompleteRecordMetadata { _ =>

これを回避する方法はありますか?おそらくプロデューサーをシンクとして使用することで ( http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-as-a-sink ) Javaのプロデューサー.送信機能?

4

2 に答える 2