7

pyspark 1.4.1 でSpark 1.4ウィンドウ関数を使用しようとしています

しかし、ほとんどの場合、エラーまたは予期しない結果が得られます。これはうまくいくはずだと思う非常に簡単な例です:

from pyspark.sql.window import Window
import pyspark.sql.functions as func

l = [(1,101),(2,202),(3,303),(4,404),(5,505)]
df = sqlContext.createDataFrame(l,["a","b"])

wSpec = Window.orderBy(df.a).rowsBetween(-1,1)

df.select(df.a, func.rank().over(wSpec).alias("rank"))  
    ==> Failure org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.

df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next"))  
    ===>  org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;


wSpec = Window.orderBy(df.a)

df.select(df.a, func.rank().over(wSpec).alias("rank"))
    ===> org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException: One or more arguments are expected.

df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next")).collect()

    [Row(a=1, prev=None, b=101, next=None), Row(a=2, prev=None, b=202, next=None), Row(a=3, prev=None, b=303, next=None)]

ご覧のとおり、rowsBetweenフレーム指定を追加すると、ウィンドウ関数rank()lag/lead()それを認識しません:「ウィンドウ関数はフレーム指定を取りません」。

rowsBetweenフレームの仕様を少なくとも省略した場合、lag/lead()例外はスローされませんが、(私にとっては) 予期しない結果が返されます: always None。そして、rank()まだ別の例外で機能しません。

ウィンドウ関数を正しく取得するのを手伝ってくれる人はいますか?

アップデート

よし、これは pyspark のバグのように見え始めます。純粋な Spark (Scala、spark-shell) で同じテストを用意しました。

import sqlContext.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val l: List[Tuple2[Int,Int]] = List((1,101),(2,202),(3,303),(4,404),(5,505))
val rdd = sc.parallelize(l).map(i => Row(i._1,i._2))
val schemaString = "a b"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, IntegerType, true)))
val df = sqlContext.createDataFrame(rdd, schema)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val wSpec = Window.orderBy("a").rowsBetween(-1,1)
df.select(df("a"), rank().over(wSpec).alias("rank"))
    ==> org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.;

df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next"))
    ===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;


val wSpec = Window.orderBy("a")
df.select(df("a"), rank().over(wSpec).alias("rank")).collect()
    ====> res10: Array[org.apache.spark.sql.Row] = Array([1,1], [2,2], [3,3], [4,4], [5,5])

df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next"))
    ====> res12: Array[org.apache.spark.sql.Row] = Array([1,null,101,202], [2,101,202,303], [3,202,303,404], [4,303,404,505], [5,404,505,null])

rowsBetweenScala では を適用できませんが、 と を省略した場合は両方とも期待どおりに動作しrank()ます。lag()/lead()rowsBetween

4

1 に答える 1

4

私が知る限り、2つの異なる問題があります。ウィンドウ フレームの定義は単に HiveGenericUDAFRankでサポートされていないため、表示GenericUDAFLagGenericUDAFLeadれるエラーは予期された動作です。

以下の PySpark コードの問題について

wSpec = Window.orderBy(df.a)
df.select(df.a, func.rank().over(wSpec).alias("rank"))

私の質問https://stackoverflow.com/q/31948194/1560062に関連しているようで、 SPARK-9978で対処する必要があります。これまでのところ、ウィンドウ定義を次のように変更することで機能させることができます。

wSpec = Window.partitionBy().orderBy(df.a)
于 2015-09-03T15:11:40.527 に答える