はい、残念ながら、Spark (1.x、2.x) では、効率的な方法で Kafka に書き込む方法を簡単に説明することはできません。
次のアプローチをお勧めします。
KafkaProducer
エグゼキューター プロセス/JVM ごとに 1 つのインスタンスを使用 (および再使用) します。
このアプローチの大まかな設定は次のとおりです。
- 最初に、Kafka を「ラップ」する必要があり
KafkaProducer
ます。これは、前述のようにシリアル化できないためです。ラップすると、エグゼキューターに「出荷」できます。ここでの重要なアイデアはlazy val
、最初に使用するまでプロデューサのインスタンス化を遅らせるためにa を使用することですKafkaProducer
。
- ブロードキャスト変数を使用して、ラップされたプロデューサーを各エグゼキューターに「出荷」します。
- 実際の処理ロジック内で、ブロードキャスト変数を介してラップされたプロデューサーにアクセスし、それを使用して処理結果を Kafka に書き戻します。
以下のコード スニペットは、Spark 2.0 の Spark ストリーミングで動作します。
ステップ1:ラッピングKafkaProducer
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
/* This is the key idea that allows us to work around running into
NotSerializableExceptions. */
lazy val producer = createProducer()
def send(topic: String, key: K, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, key, value))
def send(topic: String, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, value))
}
object MySparkKafkaProducer {
import scala.collection.JavaConversions._
def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
val createProducerFunc = () => {
val producer = new KafkaProducer[K, V](config)
sys.addShutdownHook {
// Ensure that, on executor JVM shutdown, the Kafka producer sends
// any buffered messages to Kafka before shutting down.
producer.close()
}
producer
}
new MySparkKafkaProducer(createProducerFunc)
}
def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)
}
ステップ 2: ブロードキャスト変数を使用して、各エグゼキュータに独自のラップされたKafkaProducer
インスタンスを与える
import org.apache.kafka.clients.producer.ProducerConfig
val ssc: StreamingContext = {
val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
new StreamingContext(sparkConf, Seconds(1))
}
ssc.checkpoint("checkpoint-directory")
val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
val kafkaProducerConfig = {
val p = new Properties()
p.setProperty("bootstrap.servers", "broker1:9092")
p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
p
}
ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}
ステップ 3: Spark Streaming から Kafka に書き込み、同じラップされたKafkaProducer
インスタンスを再利用します (エグゼキューターごとに)
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
kafkaProducer.value.send("my-output-topic", record)
}.toStream
metadata.foreach { metadata => metadata.get() }
}
}
お役に立てれば。