3

DataFrame一連のフィルター クエリを適用する必要があります。たとえば、次のようにロードDataFrameします。

val df = spark.read.parquet("hdfs://box/some-parquet")

次に、次のような「任意の」フィルターがたくさんあります。

  • C0='true' および C1='false'
  • C0='false' および C3='true'
  • 等々...

私は通常、これらのフィルターを util メソッドを使用して動的に取得します。

val filters: List[String] = getFilters()

これらのフィルタを に適用しDataFrameてカウントを取得するだけです。例えば。

val counts = filters.map(filter => {
 df.where(filter).count
})

フィルターをマッピングするとき、これは並列/分散操作ではないことに気付きました。フィルターを RDD/DataFrame に貼り付けると、ネストされたデータ フレーム操作を実行することになるため、このアプローチも機能しません (SO で読んだように、Spark では許可されていません)。次のようなものは、NullPointerException (NPE) を返します。

val df = spark.read.parquet("hdfs://box/some-parquet")
val filterRDD = spark.sparkContext.parallelize(List("C0='false'", "C1='true'"))
val counts = filterRDD.map(df.filter(_).count).collect
原因: java.lang.NullPointerException
  org.apache.spark.sql.Dataset.filter(Dataset.scala:1127) で
  at $anonfun$1.apply(:27)
  at $anonfun$1.apply(:27)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  scala.collection.Iterator$class.foreach(Ite​​rator.scala:893) で
  scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1336)で
  scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) で
  scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) で
  scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) で
  scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) で
  scala.collection.AbstractIterator.to(Iterator.scala:1336) で
  scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) で
  scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) で
  scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) で
  scala.collection.AbstractIterator.toArray(Iterator.scala:1336) で
  org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912) で
  org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912) で
  org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) で
  org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) で
  org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala:70) で
  org.apache.spark.scheduler.Task.run(Task.scala:86) で
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:27​​4) で
  java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) で
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) で
  java.lang.Thread.run(Thread.java:745) で

DataFrameSpark でカウント フィルターを並列化/分散する方法はありますか? ところで、私は Spark v2.0.2 を使用しています。

4

1 に答える 1