0

に基づいて入力ファイルを分割しようとしていますがaccountId、この分割は、dataFrames に 1000 を超えるレコードが含まれている場合にのみ行われます。はaccountId不明ではない動的な整数です。以下のコードを検討してください

    val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.textFileStream("input")
lines.print()

lines.foreachRDD { rdd =>
  val count = rdd.count()
  if (count > 0) {
    val df = sqlContext.read.json(rdd)
    val filteredDF = df.filter(df("accountId")==="3")
    if (filteredDF.count() > 1000) {
      df.write.partitionBy("accountId").format("json").save("output")
    }
  }
}

ssc.start()
ssc.awaitTermination()

しかし、上記のコードは必要のないすべての accountId を分割します。

  1. データフレーム内のそれぞれの数を見つけたいですaccountId
  2. 各 accountId のレコードが 1000 を超える場合は、分割された情報を出力ソースに書き込みます。

たとえば、入力ファイルに accountId=1 の 1500 レコードと accountId=2 の 10 レコードがある場合、accountId=1 に基づいてフィルター処理されたデータフレームを出力ソースに分割し、accountId=2 レコードをメモリに保持します。

スパークストリーミングを使用してこれを達成する方法は?

4

1 に答える 1

1

やるべきだった

filteredDF.write.partitionBy("accountId").format("json").save("output")

それ以外の

df.write.partitionBy("accountId").format("json").save("output")
于 2016-07-29T18:15:32.660 に答える