11

Structured Streaming with Sparkに出会いました。これには、S3 バケットから継続的に消費し、処理された結果を MySQL DB に書き込む例があります。

// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")

// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
       .writeStream.format("jdbc")
       .start("jdbc:mysql//...")

これをSpark Kafka Streamingでどのように使用できますか?

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

を使用せずにこれら 2 つの例を組み合わせる方法はありstream.foreachRDD(rdd => {})ますか?

4

2 に答える 2

12

を使用せずにこれら 2 つの例を組み合わせる方法はあり stream.foreachRDD(rdd => {})ますか?

まだ。Spark 2.0.0 には、構造化ストリーミングの Kafka シンク サポートがありません。これは、 Spark ストリーミングの作成者の 1 人である Tathagata Das 氏によると、Spark 2.1.0 で提供されるはずの機能です。関連する JIRA の問題は次のとおりです。

編集: (29/11/2018)

はい、Spark バージョン 2.2 以降で可能です。

stream
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .start()

詳細については、このSO 投稿 (Spark ストリーミングを使用した Kafka トピックの読み書き) を確認してください。

編集: (2016 年 6 月 12 日)

構造化ストリーミングの Kafka 0.10 統合は、Spark 2.0.2 で実験的にサポートされるようになりました。

val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

ds1
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
于 2016-09-01T17:48:53.187 に答える
3

Kafkaソースからの読み取りとCassandraシンクへの書き込みに関して、同様の問題がありました。ここで簡単なプロジェクトを作成しましたkafka2spark2cassandra、誰にとっても役立つ場合に備えて共有します。

于 2017-01-05T20:25:52.270 に答える