2

kafka トピックを読み込んで、spark ストリーミングで kudu テーブルに書き込みたいです。

私の最初のアプローチ

// sessions and contexts
val conf = new SparkConf().setMaster("local[2]").setAppName("TestMain")
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
val sparkContext = sparkSession.sparkContext
val kuduContext = new KuduContext("...", sparkContext);

// structure
val schema: StructType = StructType(
  StructField("userNo", IntegerType, true) ::
  StructField("bandNo", IntegerType, false) ::
  StructField("ipv4", StringType, false) :: Nil);

// kudu - prepare table
kuduContext.deleteTable("test_table");
kuduContext.createTable("test_table", schema, Seq("userNo"), new CreateTableOptions()
  .setNumReplicas(1)
  .addHashPartitions(List("userNo").asJava, 3))

// get stream from kafka
val parsed = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("startingOffsets", "latest")
  .option("subscribe", "feed_api_band_get_popular_post_list")
  .load()
  .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

// write it to kudu
kuduContext.insertRows(parsed.toDF(), "test_table");

今それは不平を言う

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)

私の2番目のアプローチ

従来の KafkaUtils.createDirectStream を使用するようにコードを変更したようです

KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
).foreachRDD(rdd => {
  rdd.foreach(record => {
    // write to kudu.............
    println(record.value());
  })
});

ssc.start();
ssc.awaitTermination();

それで、どちらが正しいアプローチですか?または、最初のアプローチから実行する方法はありますか?

Spark のバージョンは 2.2.0 です。

4

4 に答える 4

1

現時点では、Spark 構造化ストリーミングで KuduContext を使用するための Kudu サポートはないと思います。同様の問題があり、従来の Kudu クライアントを使用して ForeachWriter[Row] クラスを実装する必要がありました。ここで例を使用して、解決策を達成することができました。

于 2018-09-04T19:08:31.720 に答える