0

私は以下のケースクラスを持っています:

case class Alpakka(id:Int,name:String,animal_type:String)

次のコードを使用して、これらのケース クラスのリストを kafka のプロデューサーに接続しようとしています。

  def connectEntriesToProducer(seq: Seq[Alpakka]) = {


    val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

seq.map(alpakka => new ProducerRecord[String, String]("alpakkas", alpakka.asJson.noSpaces))   
      .runWith(Producer.plainSink(producerSettings))
  }

ケースクラスをjsonに変換するためにcirceを使用しています。ただし、次のようなコンパイラ エラーが発生し続けます。

Error:(87, 34) type mismatch;
 found   : akka.stream.scaladsl.Sink[org.apache.kafka.clients.producer.ProducerRecord[String,String],scala.concurrent.Future[akka.Done]]
 required: org.apache.kafka.clients.producer.ProducerRecord[String,String] => ?
      .runWith(Producer.plainSink(producerSettings))

何が起こっているのかわからない!

4

1 に答える 1

0

ではなくGraphからを構築しようとしています。SeqSource

メソッドconnectEntriesToProducerは次のようになります

def connectEntriesToProducer(seq: Source[Alpakka]) = {

Sourceの代わりに注意してくださいSeq

別の方法として、 からソースを作成することもできますが、 は不変の iterable しか取得できないため、使用Seqする必要があります。immutable.SeqSource.apply

def connectEntriesToProducer(seq: scala.collection.immutable.Seq[Alpakka]) = {
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
  .withBootstrapServers("localhost:9092")

Source(seq).
  map(alpakka => new ProducerRecord[String, String]("alpakkas", alpakka.asJson.noSpaces))
  .runWith(Producer.plainSink(producerSettings))
}
于 2019-04-08T22:11:22.973 に答える