Structured Streaming with Sparkに出会いました。これには、S3 バケットから継続的に消費し、処理された結果を MySQL DB に書き込む例があります。
// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")
// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
.writeStream.format("jdbc")
.start("jdbc:mysql//...")
これをSpark Kafka Streamingでどのように使用できますか?
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
を使用せずにこれら 2 つの例を組み合わせる方法はありstream.foreachRDD(rdd => {})
ますか?