37

Spark Streaming を使用して 2 つの Kafka キュー間でデータを処理していますが、Spark から Kafka に書き込む良い方法が見つからないようです。私はこれを試しました:

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)

意図したとおりに機能しますが、すべてのメッセージに対して新しい KafkaProducer をインスタンス化することは、実際のコンテキストでは明らかに実行不可能であり、私はそれを回避しようとしています。

プロセスごとに単一のインスタンスへの参照を保持し、メッセージを送信する必要があるときにアクセスしたいと考えています。Spark Streaming から Kafka に書き込むにはどうすればよいですか?

4

8 に答える 8

34

はい、残念ながら、Spark (1.x、2.x) では、効率的な方法で Kafka に書き込む方法を簡単に説明することはできません。

次のアプローチをお勧めします。

  • KafkaProducerエグゼキューター プロセス/JVM ごとに 1 つのインスタンスを使用 (および再使用) します。

このアプローチの大まかな設定は次のとおりです。

  1. 最初に、Kafka を「ラップ」する必要がありKafkaProducerます。これは、前述のようにシリアル化できないためです。ラップすると、エグゼキューターに「出荷」できます。ここでの重要なアイデアはlazy val、最初に使用するまでプロデューサのインスタンス化を遅らせるためにa を使用することですKafkaProducer
  2. ブロードキャスト変数を使用して、ラップされたプロデューサーを各エグゼキューターに「出荷」します。
  3. 実際の処理ロジック内で、ブロードキャスト変数を介してラップされたプロデューサーにアクセスし、それを使用して処理結果を Kafka に書き戻します。

以下のコード スニペットは、Spark 2.0 の Spark ストリーミングで動作します。

ステップ1:ラッピングKafkaProducer

import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}

object MySparkKafkaProducer {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }

      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

}

ステップ 2: ブロードキャスト変数を使用して、各エグゼキュータに独自のラップされたKafkaProducerインスタンスを与える

import org.apache.kafka.clients.producer.ProducerConfig

val ssc: StreamingContext = {
  val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
  new StreamingContext(sparkConf, Seconds(1))
}

ssc.checkpoint("checkpoint-directory")

val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "broker1:9092")
    p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}

ステップ 3: Spark Streaming から Kafka に書き込み、同じラップされたKafkaProducerインスタンスを再利用します (エグゼキューターごとに)

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata

val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
    }.toStream
    metadata.foreach { metadata => metadata.get() }
  }
}

お役に立てれば。

于 2016-09-16T19:56:38.927 に答える
23

私の最初のアドバイスは、 foreachPartition で新しいインスタンスを作成し、それがニーズに十分に対応できるかどうかを測定することです ( foreachPartition で重いオブジェクトをインスタンス化することは、公式ドキュメントが示唆していることです)。

もう 1 つのオプションは、次の例に示すように、オブジェクト プールを使用することです。

https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala

ただし、チェックポイントを使用する場合、実装が難しいことがわかりました。

私にとってうまく機能している別のバージョンは、次のブログ投稿で説明されているファクトリです。ニーズに十分な並列処理が提供されるかどうかを確認する必要があります (コメント セクションを確認してください)。

http://allegro.tech/2015/08/spark-kafka-integration.html

于 2015-07-23T15:19:57.120 に答える
8

Cloudera によって管理されている Streaming Kafka Writer があります (実際には Spark JIRA からスピンオフされました[1] )。基本的に、パーティションごとにプロデューサを作成します。これは、(願わくば大きい) 要素のコレクションに対して「重い」オブジェクトを作成するために費やされた時間を償却します。

ライターはここにあります: https://github.com/cloudera/spark-kafka-writer

于 2015-07-23T23:31:58.967 に答える
8

私は同じ問題を抱えていて、この投稿を見つけました。

作成者は、executor ごとに 1 つのプロデューサーを作成することで問題を解決します。プロデューサー自体を送信する代わりに、エグゼキューターでプロデューサーをブロードキャストして作成する方法の「レシピ」のみを送信します。

    val kafkaSink = sparkContext.broadcast(KafkaSink(conf))

彼は、遅延してプロデューサーを作成するラッパーを使用します。

    class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

      lazy val producer = createProducer()

      def send(topic: String, value: String): Unit = producer.send(new     ProducerRecord(topic, value))
    }


    object KafkaSink {
      def apply(config: Map[String, Object]): KafkaSink = {
        val f = () => {
          val producer = new KafkaProducer[String, String](config)

          sys.addShutdownHook {
            producer.close()
          }

          producer
        }
        new KafkaSink(f)
      }
    }

Kafka プロデューサーはエグゼキューターで最初に使用する直前に初期化されるため、ラッパーはシリアライズ可能です。ドライバーはラッパーへの参照を保持し、ラッパーは各エグゼキューターのプロデューサーを使用してメッセージを送信します。

    dstream.foreachRDD { rdd =>
      rdd.foreach { message =>
        kafkaSink.value.send("topicName", message)
      }
    }
于 2016-09-07T08:50:27.760 に答える
3

なぜそれは実行不可能なのですか?基本的に、各 RDD の各パーティションは独立して実行されるため (別のクラスター ノードで実行される場合もあります)、各パーティションのタスクの開始時に接続 (および同期) をやり直す必要があります。そのオーバーヘッドが高すぎる場合は、StreamingContext許容できるようになるまでバッチ サイズを増やす必要があります (明らかに、これを行うにはレイテンシ コストがかかります)。

(各パーティションで何千ものメッセージを処理していない場合、spark-streaming がまったく必要ですか? スタンドアロン アプリケーションを使用した方がよいでしょうか?)

于 2015-07-23T15:53:38.043 に答える
2

これはあなたがやりたいことかもしれません。基本的に、レコードのパーティションごとに 1 つのプロデューサーを作成します。

input.foreachRDD(rdd =>
      rdd.foreachPartition(
          partitionOfRecords =>
            {
                val props = new HashMap[String, Object]()
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
                val producer = new KafkaProducer[String,String](props)

                partitionOfRecords.foreach
                {
                    case x:String=>{
                        println(x)

                        val message=new ProducerRecord[String, String]("output",null,x)
                        producer.send(message)
                    }
                }
          })
) 

それが役立つことを願っています

于 2015-07-24T03:59:57.910 に答える