1

次のdata.frameがあります:

library(sparklyr)
library(dplyr)
testDF <- data.frame(A = c(1, 2, 3, 4, 5, 6, 7, 8), 
B = c(10, 20, 30, 40, 50, 60, 70, 80), 
C = c(100, 200, 300, 400, 500, 600, 700, 800), 
D = c(1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000))

作成したら、sparklyrを使用して Spark にコピーできます。

testDFCopied <- copy_to(sc, testDF, "testDF", overwrite = TRUE)

作成したらmutate、関数を使用して別の列を作成して、列を作成できlagます。

head(testDFCopied %>% dplyr::arrange(A) %>% dplyr::mutate(E = lag(A)), 10)
Source:   query [?? x 5]
Database: spark connection master=yarn app=sparklyr local=FALSE

      A     B     C     D     E
  <dbl> <dbl> <dbl> <dbl> <dbl>
1     1    10   100  1000   NaN
2     2    20   200  2000     1
3     3    30   300  3000     2
4     4    40   400  4000     3
5     5    50   500  5000     4
6     6    60   600  6000     5
7     7    70   700  7000     6
8     8    80   800  8000     7

mutate関数を使用して複数の列を作成しようとすると、問題が発生しますlag。たとえば、ここでは、列 A と B の "ラグ" である 2 つの新しい列 E と F を作成します。

head(testDFCopied %>% dplyr::arrange(A) %>% dplyr::mutate(E = lag(A), F = lag(B)), 10)
Source:   query [?? x 6]
Database: spark connection master=yarn app=sparklyr local=FALSE

Error: org.apache.spark.sql.AnalysisException: Window Frame RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW must match the required frame ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$29$$anonfun$applyOrElse$10.applyOrElse(Analyzer.scala:1785)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$29$$anonfun$applyOrElse$10.applyOrElse(Analyzer.scala:1781)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278)

ただし、この例外は、2 つの列を作成しても 1 回しか使用しない場合には発生しませんlag。次に例を示します。

head(testDFCopied %>% dplyr::arrange(A) %>% dplyr::mutate(E = lag(A), F = C - B), 10)
Source:   query [?? x 6]
Database: spark connection master=yarn app=sparklyr local=FALSE

      A     B     C     D     E     F
  <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1     1    10   100  1000   NaN    90
2     2    20   200  2000     1   180
3     3    30   300  3000     2   270
4     4    40   400  4000     3   360
5     5    50   500  5000     4   450
6     6    60   600  6000     5   540
7     7    70   700  7000     6   630
8     8    80   800  8000     7   720

何らかの理由で、アクションlag()内で 2 つの呼び出しが実行されたときに例外が発生します。と のさまざまな組み合わせとのさまざまな配置をmutate(失敗して) 試しました。それらはすべて同じ例外を発生させますが、これは理解できません。Spark コードを見ると、ここで例外が発生していることがわかります。lag()lead()mutate

  /**
   * Check and add proper window frames for all window functions.
   */
  object ResolveWindowFrame extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case logical: LogicalPlan => logical transformExpressions {
        case WindowExpression(wf: WindowFunction,
        WindowSpecDefinition(_, _, f: SpecifiedWindowFrame))
          if wf.frame != UnspecifiedFrame && wf.frame != f =>
          failAnalysis(s"Window Frame $f must match the required frame ${wf.frame}")
...

チェックに失敗したウィンドウ関数の状態に関連していることは理解していlagますが、ここでの根本的な問題はよくわかりません。どんな助け/アイデアも大歓迎です。

4

0 に答える 0