DataFrame
一連のフィルター クエリを適用する必要があります。たとえば、次のようにロードDataFrame
します。
val df = spark.read.parquet("hdfs://box/some-parquet")
次に、次のような「任意の」フィルターがたくさんあります。
- C0='true' および C1='false'
- C0='false' および C3='true'
- 等々...
私は通常、これらのフィルターを util メソッドを使用して動的に取得します。
val filters: List[String] = getFilters()
これらのフィルタを に適用しDataFrame
てカウントを取得するだけです。例えば。
val counts = filters.map(filter => {
df.where(filter).count
})
フィルターをマッピングするとき、これは並列/分散操作ではないことに気付きました。フィルターを RDD/DataFrame に貼り付けると、ネストされたデータ フレーム操作を実行することになるため、このアプローチも機能しません (SO で読んだように、Spark では許可されていません)。次のようなものは、NullPointerException (NPE) を返します。
val df = spark.read.parquet("hdfs://box/some-parquet")
val filterRDD = spark.sparkContext.parallelize(List("C0='false'", "C1='true'"))
val counts = filterRDD.map(df.filter(_).count).collect
原因: java.lang.NullPointerException org.apache.spark.sql.Dataset.filter(Dataset.scala:1127) で at $anonfun$1.apply(:27) at $anonfun$1.apply(:27) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) scala.collection.Iterator$class.foreach(Iterator.scala:893) で scala.collection.AbstractIterator.foreach(Iterator.scala:1336)で scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) で scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) で scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) で scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) で scala.collection.AbstractIterator.to(Iterator.scala:1336) で scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) で scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) で scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) で scala.collection.AbstractIterator.toArray(Iterator.scala:1336) で org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912) で org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912) で org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) で org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) で org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala:70) で org.apache.spark.scheduler.Task.run(Task.scala:86) で org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) で java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) で java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) で java.lang.Thread.run(Thread.java:745) で
DataFrame
Spark でカウント フィルターを並列化/分散する方法はありますか? ところで、私は Spark v2.0.2 を使用しています。