-2

DataFrame1つと 2 つのアキュムレータを返す関数があります。spark-shell(jarから関数を呼び出して)手動で実行すると、期待どおりに機能します。を実行すると、アキュムレータ.countに値が入力されます。DataFrame

spark-submitしかし、アキュムレータから関数を呼び出すと、常に空のままになります。同じ奇妙な動作で2 を返そうとDataFrameしました: Works in spark-shell, do not from spark-submit.

これは、おそらく機能していない私のコードのスケルトンです。

import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext, Row}
import scala.collection._
...

def process(
    sc:SparkContext,
    sqlContext:SQLContext,
    filepaths : RDD[String]
    ): ( 

    val logMetadataAccumulator = sc.accumulableCollection(mutable.ArrayBuffer[( String, String, Long, String, Long, Long)]())
    val logFailedRowAccumulator = sc.accumulableCollection(mutable.ArrayBuffer[( String)]())
    ...
    ...
    val logRecordsPre = logRawFlow.map(
        entry => {
            val date = """(\d\d\d\d)-(\d\d)-(\d\d)""".r
            if ( fields.length == 23 && date.findFirstMatchIn(fields(2)).nonEmpty && fields(22).forall(_.isDigit) && fields(21).forall(_.isDigit) ) {
                ...
                Row( 1L, "blah" "blah", 0L )
            }
            else ( fields(0) == logMetaDataPrefix ) {
                ...
                logMetadataAccumulator += (fileName, logType, numLines, logSource, startTime, endTime)
                Row( 0L, "blah" "blah", 0L )
            }
            else {
                try { 
                    val fileName = fields(0)
                    logFailedRowAccumulator += (fileName)
                    Row( 0L, "blah" "blah", 0L )
                }
                catch {
                    case e: Exception => {
                        logFailedRowAccumulator += ("unknown")
                        Row( 0L, "blah" "blah", 0L )
                    }
                }
            }
        }
    )

    val logRecords = logRecordsPre.filter( _.getLong(0) != 0L)

    val logDF = sqlContext.createDataFrame(logRecords, logSchema)

    ( logDF, logMetadataAccumulator, logFailedRowAccumulator )
)
4

1 に答える 1

0

私の悪い点、詳しく調べてみると、シェルで関数を手動で呼び出しているとき.countに、アキュムレータをデータフレームに変換する前に行っていたことがわかりましたが、データフレームを保存しspark-submit、アキュムレータから作成されたデータフレームからランナーによって呼び出される関数では返されたデータフレームがアクションを実行する前に作成されます..

例えば:

val ( df, acc1, acc2 ) = process( sc, sqlContext, filePaths )
df.count
val acc1Array = acc1.value.toArray
val acc2Array = acc2.value.toArray

ない (私がランナーでやっていたこと:)

val ( df, acc1, acc2 ) = process( sc, sqlContext, filePaths )
val acc1Array = acc1.value.toArray
val acc2Array = acc2.value.toArray
df.count
于 2015-09-14T02:05:43.993 に答える