2

Spark 1.1 の使用

2 つのデータセットがあります。1 つは非常に大きく、もう 1 つは (1:100 のフィルタリングを使用して) はるかに小さい縮尺に縮小されています。小さいリストのアイテムのみを大きいリストの対応するアイテムと結合することにより、大きいデータセットを同じスケールに縮小する必要があります (これらのリストには、相互結合フィールドを持つ要素が含まれています)。

私は次のコードを使用してそれを行っています:

  • 「if(joinKeys != null)」の部分が該当部分
  • 小さいリストは「joinKeys」、大きいリストは「keyedEvents」

    private static JavaRDD<ObjectNode> createOutputType(JavaRDD jsonsList, final String type, String outputPath,JavaPairRDD<String,String> joinKeys) {
    
    outputPath = outputPath + "/" + type;
    
    JavaRDD events = jsonsList.filter(new TypeFilter(type));
    
    
    // This is in case we need to narrow the list to match some other list of ids... Recommendation List, for example... :)
    if(joinKeys != null) {
        JavaPairRDD<String,ObjectNode> keyedEvents = events.mapToPair(new KeyAdder("requestId"));
    
        JavaRDD < ObjectNode > joinedEvents = joinKeys.join(keyedEvents).values().map(new PairToSecond());
    
        events = joinedEvents;
    }
    
    
    JavaPairRDD<String,Iterable<ObjectNode>> groupedEvents = events.mapToPair(new KeyAdder("sliceKey")).groupByKey();
    // Add convert jsons to strings and add "\n" at the end of each
    JavaPairRDD<String, String> groupedStrings = groupedEvents.mapToPair(new JsonsToStrings());
    groupedStrings.saveAsHadoopFile(outputPath, String.class, String.class, KeyBasedMultipleTextOutputFormat.class);
    return events;
    }
    

このジョブを実行すると、常に同じエラーが発生します。

Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2757 in stage 13.0 failed 4 times, most recent failure: Lost task 2757.3 in stage 13.0 (TID 47681, hadoop-w-175.c.taboola-qa-01.internal): java.io.FileNotFoundException: /hadoop/spark/tmp/spark-local-20141201184944-ba09/36/shuffle_6_2757_2762 (Too many open files)
    java.io.FileOutputStream.open(Native Method)
    java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
    org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
    org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
    org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
    scala.collection.Iterator$class.foreach(Iterator.scala:727)
    scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    org.apache.spark.scheduler.Task.run(Task.scala:54)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:745)

すべてのクラスター マシンで次の手順を実行して、ulimits を既に増やしました。

echo "* soft nofile 900000" >> /etc/security/limits.conf
echo "root soft nofile 900000" >> /etc/security/limits.conf
echo "* hard nofile 990000" >> /etc/security/limits.conf
echo "root hard nofile 990000" >> /etc/security/limits.conf
echo "session required pam_limits.so" >> /etc/pam.d/common-session
echo "session required pam_limits.so" >> /etc/pam.d/common-session-noninteractive

しかし、私の問題は解決しません...

4

1 に答える 1

3

bdutil フレームワークは、ユーザー「hadoop」がジョブを実行するように機能します。クラスターをデプロイするスクリプトは、「hadoop」ユーザーの ulimit 設定をオーバーライドするファイル /etc/security/limits.d/hadoop.conf を作成しましたが、これは私が知りませんでした。このファイルを削除するか、必要な ulimits をそこに設定することで、問題を解決できました。

于 2014-12-03T07:33:29.603 に答える