に基づいて入力ファイルを分割しようとしていますがaccountId
、この分割は、dataFrames に 1000 を超えるレコードが含まれている場合にのみ行われます。はaccountId
不明ではない動的な整数です。以下のコードを検討してください
val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.textFileStream("input")
lines.print()
lines.foreachRDD { rdd =>
val count = rdd.count()
if (count > 0) {
val df = sqlContext.read.json(rdd)
val filteredDF = df.filter(df("accountId")==="3")
if (filteredDF.count() > 1000) {
df.write.partitionBy("accountId").format("json").save("output")
}
}
}
ssc.start()
ssc.awaitTermination()
しかし、上記のコードは必要のないすべての accountId を分割します。
- データフレーム内のそれぞれの数を見つけたいです
accountId
。 - 各 accountId のレコードが 1000 を超える場合は、分割された情報を出力ソースに書き込みます。
たとえば、入力ファイルに accountId=1 の 1500 レコードと accountId=2 の 10 レコードがある場合、accountId=1 に基づいてフィルター処理されたデータフレームを出力ソースに分割し、accountId=2 レコードをメモリに保持します。
スパークストリーミングを使用してこれを達成する方法は?