1

私はSparkR(および一般的な並列化)に非常に慣れていません。私はSparkRをローカルで実行しています(これはsparkの正しい使用法ではないことはわかっていますが、始めたばかりです)。コードの一部をsparkRで書き直そうとしましたが、collectの数を増やすと次のエラーが発生しますサンプルとして (サンプル数が少ない場合はエラーなし):

Error in unserialize(obj) : 
ReadItem: unknown type 0, perhaps written by later version of R
Calls: assetForecast ... convertJListToRList -> lapply -> lapply -> FUN   -> unserialize
Execution halted

おそらく私のメモリ不足が原因である他のエラーは次のとおりです。

heap memory error (trying increasing JVM memory & driver memory did not help) 

FIRST エラーに関するヘルプをいただければ幸いです (並列化で numSlices に異なる値を設定することでエラーが発生したにもかかわらず、それらが何らかの形で関連している可能性があると考えたため、2 番目のエラーを投稿しました)。最初のものは、このシリアライゼーションの問題を引き起こす、spark、sparkR、および R の間のバージョンの非互換性である可能性があると思います。別のバージョンをインストールしようとしましたが、すぐに依存関係の解決に行き詰まりました。

これは、SparkR で行っていることをシミュレートするサンプル スクリプトです (input.len > 950 でエラーが生成されます)。

library(SparkR) # load sparkR library
sc <- sparkR.init() ## initialize the sparkR
input.len <- 8000 # size of the input
num.slice <- 2 # number of slices for parallelize function

## Define a few functions to simulate actual calculations
latemail <- function(N, st="2012/01/01", et="2015/12/31") {
   ## create random date of length N
   st <- as.POSIXct(as.Date(st))
   et <- as.POSIXct(as.Date(et))
   dt <- as.numeric(difftime(et,st,unit="sec"))
   ev <- sort(runif(N, 0, dt))
   rt <- st + ev
}

encode <- function(ele1, ele2) {
   ## concatenate ele1 and ele2, seperated by %
   return (paste(toString(ele1), toString(ele2), sep = "%"))
}

decode <- function(coded) {
   ## separate input string by %
   idx <- regexpr("%", coded)[1]
   ele1 <- as.numeric(substr(coded, 1, idx-1))
   ele2 <- substr(coded, idx + 1, nchar(coded))
   return (list(ele1, ele2))
 }

 fakeFun <- function(asset.age, asset.year) {
    ## fake function to simulate my actual function
    return (as.list(rep(asset.age, 10)))
 }

  wrapperFun <- function(x) {
     asset.age <- decode(x)[[1]]
     asset.y <- decode(x)[[1]]
     df <- fakeFun(asset.age, asset.y)
     return (df)
  }

  ## Start of calculations with SparkR
  calc.ts <- latemail(input.len) ## create fake years
  asset.ages <- runif(input.len) * 10 ## create fake ages

  paired <- list()
  for (i in 1:length(asset.ages)) {
     ## keep information of both years and ages in one vector
     ## using encode function
     paired[[length(paired) + 1]] <- encode(asset.ages[[i]], calc.ts[[i]])
  }

  rdd.paired <- parallelize(sc, paired, numSlices = num.slice)
  rdd.df <- lapply(rdd.paired, wrapperFun)
  rdd.list <- collect(rdd.df)
  print(rdd.list)
  sparkR.stop()

エラーの完全なレポートは次のとおりです。

for numSlice = 5 in parallelize function:
    > rdd.list <- collect(rdd.df)
    15/07/22 17:20:40 INFO RRDD: Times: boot = 0.434 s, init = 0.015 s, broadcast = 0.000 s, read-input = 0.003 s, compute = 0.200 s, write-output = 0.004 s, total = 0.656 s
    15/07/22 17:20:41 INFO RRDD: Times: boot = 0.010 s, init = 0.017 s, broadcast = 0.000 s, read-input = 0.003 s, compute = 0.193 s, write-output = 0.004 s, total = 0.227 s
    15/07/22 17:20:41 INFO RRDD: Times: boot = 0.010 s, init = 0.013 s, broadcast = 0.001 s, read-input = 0.002 s, compute = 0.191 s, write-output = 0.003 s, total = 0.220 s
    15/07/22 17:20:41 INFO RRDD: Times: boot = 0.010 s, init = 0.011 s, broadcast = 0.000 s, read-input = 0.002 s, compute = 0.191 s, write-output = 0.004 s, total = 0.218 s
    15/07/22 17:20:41 INFO RRDD: Times: boot = 0.014 s, init = 0.015 s, broadcast = 0.000 s, read-input = 0.003 s, compute = 0.213 s, write-output = 0.004 s, total = 0.249 s
    Error in unserialize(obj) : 
      ReadItem: unknown type 0, perhaps written by later version of R
    Calls: collect ... convertJListToRList -> lapply -> lapply -> FUN -> unserialize
    Execution halted

for numSlice = 6 in parallelize function
15/07/22 17:18:52 WARN TaskSetManager: Lost task 2.0 in stage 0.0 (TID 2, localhost): java.lang.OutOfMemoryError: Java heap space
        edu.berkeley.cs.amplab.sparkr.RRDD.readData(RRDD.scala:258)
        edu.berkeley.cs.amplab.sparkr.RRDD.readData(RRDD.scala:243)
        edu.berkeley.cs.amplab.sparkr.BaseRRDD.read(RRDD.scala:200)
        edu.berkeley.cs.amplab.sparkr.BaseRRDD$$anon$1.next(RRDD.scala:70)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        edu.berkeley.cs.amplab.sparkr.BaseRRDD$$anon$1.foreach(RRDD.scala:66)
        scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        edu.berkeley.cs.amplab.sparkr.BaseRRDD$$anon$1.to(RRDD.scala:66)
        scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        edu.berkeley.cs.amplab.sparkr.BaseRRDD$$anon$1.toBuffer(RRDD.scala:66)
        scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        edu.berkeley.cs.amplab.sparkr.BaseRRDD$$anon$1.toArray(RRDD.scala:66)
        org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:774)
        org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:774)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
15/07/22 17:18:52 ERROR TaskSetManager: Task 2 in stage 0.0 failed 1 times; aborting job
Error in readTypedObject(con, type) : 
  Unsupported type for deserialization 
Calls: collect ... callJMethod -> invokeJava -> readObject -> readTypedObject
Execution halted

SparkR のインストールに本当に問題がありますか? はいの場合、少数のサンプルに対してどのように実行されますか?

どうもありがとう

4

1 に答える 1

0

次の答えは、それがどのように機能するかです (または、Spark-1.4.0 で機能するはずです)。最初に sqlContext も初期化します。

sqlContext <- sparkRSQL.init(sc)

そして、コードを次から変更するよりも

paired <- list()

# Create a vector instead of a list
paired <- c()
for (i in 1:length(asset.ages)) {
  ## keep information of both years and ages in one vector
  ## using encode function
  paired[length(paired) + 1] <- encode(asset.ages[[i]], calc.ts[[i]])
}

# What you actually need is a data.frame or SparkR DataFrame
paired.data.frame <- data.frame(paired=paired)
paired.DataFrame  <- createDataFrame(sqlContext, paired.data.frame)
# Map function returns an RDD which you can not collect yet
# Therefor convert it to a DataFrame again
paired.df         <- createDataFrame(sqlContext, map(paired.DataFrame,wrapperFun))
# This DataFrame you can collect
paired.result     <- collect(paired.df)

なぜ最初の文でうまくいくべきだと言ったのですか? ラップトップで実行すると機能しますが、SparkR ソース コードを変更してマップを使用できるようにしました。ただし、SparkR 1.2 でこれを修正する方法はわかりませんが、SparkR はそれ以降 Spark に統合されているため、とにかく Spark-1.4.0 に変更することをお勧めします。

于 2015-07-23T09:53:31.447 に答える