次のdata.frameがあります:
library(sparklyr)
library(dplyr)
testDF <- data.frame(A = c(1, 2, 3, 4, 5, 6, 7, 8),
B = c(10, 20, 30, 40, 50, 60, 70, 80),
C = c(100, 200, 300, 400, 500, 600, 700, 800),
D = c(1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000))
作成したら、sparklyrを使用して Spark にコピーできます。
testDFCopied <- copy_to(sc, testDF, "testDF", overwrite = TRUE)
作成したらmutate
、関数を使用して別の列を作成して、列を作成できlag
ます。
head(testDFCopied %>% dplyr::arrange(A) %>% dplyr::mutate(E = lag(A)), 10)
Source: query [?? x 5]
Database: spark connection master=yarn app=sparklyr local=FALSE
A B C D E
<dbl> <dbl> <dbl> <dbl> <dbl>
1 1 10 100 1000 NaN
2 2 20 200 2000 1
3 3 30 300 3000 2
4 4 40 400 4000 3
5 5 50 500 5000 4
6 6 60 600 6000 5
7 7 70 700 7000 6
8 8 80 800 8000 7
mutate
関数を使用して複数の列を作成しようとすると、問題が発生しますlag
。たとえば、ここでは、列 A と B の "ラグ" である 2 つの新しい列 E と F を作成します。
head(testDFCopied %>% dplyr::arrange(A) %>% dplyr::mutate(E = lag(A), F = lag(B)), 10)
Source: query [?? x 6]
Database: spark connection master=yarn app=sparklyr local=FALSE
Error: org.apache.spark.sql.AnalysisException: Window Frame RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW must match the required frame ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$29$$anonfun$applyOrElse$10.applyOrElse(Analyzer.scala:1785)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$29$$anonfun$applyOrElse$10.applyOrElse(Analyzer.scala:1781)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278)
ただし、この例外は、2 つの列を作成しても 1 回しか使用しない場合には発生しませんlag
。次に例を示します。
head(testDFCopied %>% dplyr::arrange(A) %>% dplyr::mutate(E = lag(A), F = C - B), 10)
Source: query [?? x 6]
Database: spark connection master=yarn app=sparklyr local=FALSE
A B C D E F
<dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 1 10 100 1000 NaN 90
2 2 20 200 2000 1 180
3 3 30 300 3000 2 270
4 4 40 400 4000 3 360
5 5 50 500 5000 4 450
6 6 60 600 6000 5 540
7 7 70 700 7000 6 630
8 8 80 800 8000 7 720
何らかの理由で、アクションlag()
内で 2 つの呼び出しが実行されたときに例外が発生します。と のさまざまな組み合わせとのさまざまな配置をmutate
(失敗して) 試しました。それらはすべて同じ例外を発生させますが、これは理解できません。Spark コードを見ると、ここで例外が発生していることがわかります。lag()
lead()
mutate
/**
* Check and add proper window frames for all window functions.
*/
object ResolveWindowFrame extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case logical: LogicalPlan => logical transformExpressions {
case WindowExpression(wf: WindowFunction,
WindowSpecDefinition(_, _, f: SpecifiedWindowFrame))
if wf.frame != UnspecifiedFrame && wf.frame != f =>
failAnalysis(s"Window Frame $f must match the required frame ${wf.frame}")
...
チェックに失敗したウィンドウ関数の状態に関連していることは理解していlag
ますが、ここでの根本的な問題はよくわかりません。どんな助け/アイデアも大歓迎です。