2

バージョン 1.2.0 のスタンドアロン モードで Spark でジョブを実行しています。

私が行っている最初の操作は、フォルダー パスの RDD を取得し、各フォルダーに存在するファイルで構成されるファイル名の RDD を生成することです。

JavaRDD<String> filePaths = paths.mapPartitions(new FoldersToFiles()).repartition(defaultPartitions);

FoldersToFiles クラスの内部実装は次のとおりです。

@Override
public Iterable<String> call(Iterator<String> pathsIterator) throws Exception {
    List<String> filesPath = new ArrayList<String>();
    if (pathsIterator != null) {
        while (pathsIterator.hasNext()) {
            try {
                String currFolder = pathsIterator.next();
                Path currPath = new Path(currFolder);
                FileSystem fs = FileSystem.get(currPath.toUri(), new Configuration(true));
                FileStatus[] files = fs.listStatus(currPath);
                List<FileStatus> filesList = Arrays.asList(files);
                List<String> filesPathsStr = new Utils().convertFileStatusToPath(filesList);
                filesPath.addAll(filesPathsStr);
            } catch(Exception e) {
                log.error("Error during file names extraction: " + e.getMessage());
            }
        }
    }
    if(filesPath == null || filesPath.isEmpty()) {
        log.error("Warning: files path list is null or empty!! Given Path Iterator is: " + pathsIterator.toString());
    }
    return filesPath;
}

クラスターでジョブを実行すると、次のエラーが発生します。

520983 [task-result-getter-1] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 33.0 in stage 1.0 (TID 1033, hadoop-w-8.c.taboola-qa-01.internal): java.lang.NullPointerException
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:140)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:140)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

したがって、エラーは私のコード内に直接ありません。ただし、Spark コードの関連する行を見ると、次のようになります。

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
  def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
    def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
    JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
  }

(例外が発生する140行目が1行目)

おそらく、上記のコードに関連しています (そして、これは実際には私の仕事の最初の mapPartitions であるため、理にかなっています) が、その理由はわかりません。

4

1 に答える 1

0

直感: FoldersToFiles クラスを静的に宣言する必要があるのではないでしょうか (それがプライベート クラスの場合)。

于 2015-01-15T12:55:23.137 に答える