1

aws s3 とローカル hdfs の間でファイルをコピーする必要があります。distcp Java API を使用しようとしましたが、問題は distcp の最後にあり、System.exit() と呼ばれ、アプリも停止しました。コピーするフォルダー/ファイルと私は複数のスレッドを使用しました。各スレッドはdistcpコマンドを実行し、distcpを終了した最初のスレッドがアプリを停止し、残りのdistcpを停止します。これを回避する他の方法はありますか、私はできることを知っていますコピーを行うために独自の MR ジョブを作成するが、他のオプションがあるかどうかを知りたい

私のコード:

List<Future<Void>> calls = new ArrayList<Future<Void>>();       
for (String dir : s3Dirs) {
    final String[] args = new String[4];
    args[0] = "-log";   
    args[1] = LOG_DIR;
    args[2] = S3_DIR;
    args[3] = LOCAL_HDFS_DIR

    calls.add(_exec.submit(new Callable<Void>() {
       @Override
       public Void call() throws Exception {                
         try {
        DistCp.main(args);      <-- Distcp command          
         } catch (Exception e) {
        System.out.println("Failed to copy files from " + args[2] + " to " + args[3]);
         }
         return null;
    }
    }));            
}

for (Future<Void> f : calls) {
    try {
        f.get();
    } catch (Exception e) {
        LOGGER.error("Error while distcp", e);
    }   
}

Distcp main()

public static void main(String argv[]) {

        int exitCode;
        try {
          DistCp distCp = new DistCp();
          Cleanup CLEANUP = new Cleanup(distCp);

          ShutdownHookManager.get().addShutdownHook(CLEANUP,
            SHUTDOWN_HOOK_PRIORITY);
          exitCode = ToolRunner.run(getDefaultConf(), distCp, argv);
        }
        catch (Exception e) {
          LOG.error("Couldn't complete DistCp operation: ", e);
          exitCode = DistCpConstants.UNKNOWN_ERROR;
        }
        System.exit(exitCode);        <--- exit here
      }
4

1 に答える 1

1

以前に distcp を使用したことがありますが、複数のスレッドであっても System.exit() 問題に直面したことはありません。そのような Distcp を使用する代わりに、ToolRunner を使用して distcp 呼び出しを呼び出してみてください ( hadoop ツール パッケージの Distcp テスト ケースで使用されているように)。Distcp テスト ケースでは、ToolRunner を使用して distcp を実行し、複数のスレッドで実行できます。上記のリンクからコード スニペットをコピーします。

public void testCopyFromLocalToLocal() throws Exception {
  Configuration conf = new Configuration();
  FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
  MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
  ToolRunner.run(new DistCp(new Configuration()),
                         new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
                                       "file:///"+TEST_ROOT_DIR+"/destdat"});
  assertTrue("Source and destination directories do not match.",
             checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files));
  deldir(localfs, TEST_ROOT_DIR+"/destdat");
  deldir(localfs, TEST_ROOT_DIR+"/srcdat");
}
于 2014-08-13T17:08:00.800 に答える