DataFrame
1つと 2 つのアキュムレータを返す関数があります。spark-shell
(jarから関数を呼び出して)手動で実行すると、期待どおりに機能します。を実行すると、アキュムレータ.count
に値が入力されます。DataFrame
spark-submit
しかし、アキュムレータから関数を呼び出すと、常に空のままになります。同じ奇妙な動作で2 を返そうとDataFrame
しました: Works in spark-shell
, do not from spark-submit
.
これは、おそらく機能していない私のコードのスケルトンです。
import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext, Row}
import scala.collection._
...
def process(
sc:SparkContext,
sqlContext:SQLContext,
filepaths : RDD[String]
): (
val logMetadataAccumulator = sc.accumulableCollection(mutable.ArrayBuffer[( String, String, Long, String, Long, Long)]())
val logFailedRowAccumulator = sc.accumulableCollection(mutable.ArrayBuffer[( String)]())
...
...
val logRecordsPre = logRawFlow.map(
entry => {
val date = """(\d\d\d\d)-(\d\d)-(\d\d)""".r
if ( fields.length == 23 && date.findFirstMatchIn(fields(2)).nonEmpty && fields(22).forall(_.isDigit) && fields(21).forall(_.isDigit) ) {
...
Row( 1L, "blah" "blah", 0L )
}
else ( fields(0) == logMetaDataPrefix ) {
...
logMetadataAccumulator += (fileName, logType, numLines, logSource, startTime, endTime)
Row( 0L, "blah" "blah", 0L )
}
else {
try {
val fileName = fields(0)
logFailedRowAccumulator += (fileName)
Row( 0L, "blah" "blah", 0L )
}
catch {
case e: Exception => {
logFailedRowAccumulator += ("unknown")
Row( 0L, "blah" "blah", 0L )
}
}
}
}
)
val logRecords = logRecordsPre.filter( _.getLong(0) != 0L)
val logDF = sqlContext.createDataFrame(logRecords, logSchema)
( logDF, logMetadataAccumulator, logFailedRowAccumulator )
)