3

Amazon S3 に保存するように構成されたシンク タップでカスケードを実行しており、FileAlreadyExistsException に直面していました ([1] を参照)。これは時々発生するだけで (約 100 回に 1 回)、再現性がありませんでした。

カスケード コードを掘り下げたところ、Hfs.deleteResource() が (特に) BaseFlow.deleteSinksIfNotUpdate() によって呼び出されていることがわかりました。ところで、私たちはサイレントNPEに非常に興味をそそられました(「fsがルートディレクトリに到達したときにスローされるnpeを回避するためのハック」というコメント付き)。

そこから、独自の Tap を使用して Hfs タップを拡張し、getFileSystem(conf).delete を直接呼び出す再試行メカニズムを使用して、deleteResource() メソッド ([2] を参照) にさらにアクションを追加しました。

再試行メカニズムは改善をもたらすように見えましたが、まだ時々失敗に直面しています ([3] の例を参照): HDFS が isDeleted=true を返すように聞こえますが、フォルダーが存在するかどうかを直接尋ねると、exists=true を受け取ります。起こらない。ログには、フローが成功したときにランダムに isDeleted true または false が表示されます。これは、返された値が無関係であるか、信頼できないように聞こえます。

「フォルダーを削除する必要がありますが、そうではありません」というような動作で、自分の S3 エクスペリエンスを実現できる人はいますか? S3 の問題が疑われますが、Cascading または HDFS にも問題がある可能性はありますか?

Hadoop Cloudera-cdh3u5 と Cascading 2.0.1-wip-dev で実行しています。

[1]

org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3n://... already exists
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
    at com.twitter.elephantbird.mapred.output.DeprecatedOutputFormatWrapper.checkOutputSpecs(DeprecatedOutputFormatWrapper.java:75)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:923)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:882)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1278)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:882)
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:856)
    at cascading.flow.hadoop.planner.HadoopFlowStepJob.internalNonBlockingStart(HadoopFlowStepJob.java:104)
    at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:174)
    at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:137)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:122)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:42)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.j

[2]

  @Override
  public boolean deleteResource(JobConf conf) throws IOException {
    LOGGER.info("Deleting resource {}", getIdentifier());

    boolean isDeleted = super.deleteResource(conf);
    LOGGER.info("Hfs Sink Tap isDeleted is {} for {}", isDeleted,
        getIdentifier());

    Path path = new Path(getIdentifier());

    int retryCount = 0;
    int cumulativeSleepTime = 0;
    int sleepTime = 1000;

    while (getFileSystem(conf).exists(path)) {
      LOGGER
          .info(
              "Resource {} still exists, it should not... - I will continue to wait patiently...",
              getIdentifier());
      try {
        LOGGER.info("Now I will sleep " + sleepTime / 1000
            + " seconds while trying to delete {} - attempt: {}",
            getIdentifier(), retryCount + 1);
        Thread.sleep(sleepTime);
        cumulativeSleepTime += sleepTime;
        sleepTime *= 2;
      } catch (InterruptedException e) {
        e.printStackTrace();
        LOGGER
            .error(
                "Interrupted while sleeping trying to delete {} with message {}...",
                getIdentifier(), e.getMessage());
        throw new RuntimeException(e);
      }

      if (retryCount == 0) {
        getFileSystem(conf).delete(getPath(), true);
      }

      retryCount++;

      if (cumulativeSleepTime > MAXIMUM_TIME_TO_WAIT_TO_DELETE_MS) {
        break;
      }
    }

    if (getFileSystem(conf).exists(path)) {
      LOGGER
          .error(
              "We didn't succeed to delete the resource {}. Throwing now a runtime exception.",
              getIdentifier());
      throw new RuntimeException(
          "Although we waited to delete the resource for "
              + getIdentifier()
              + ' '
              + retryCount
              + " iterations, it still exists - This must be an issue in the underlying storage system.");
    }

    return isDeleted;

  }

[3]

INFO [pool-2-thread-15] (BaseFlow.java:1287) - [...] at least one sink is marked for delete
 INFO [pool-2-thread-15] (BaseFlow.java:1287) - [...] sink oldest modified date: Wed Dec 31 23:59:59 UTC 1969
 INFO [pool-2-thread-15] (HiveSinkTap.java:148) - Now I will sleep 1 seconds while trying to delete s3n://... - attempt: 1
 INFO [pool-2-thread-15] (HiveSinkTap.java:130) - Deleting resource s3n://...
 INFO [pool-2-thread-15] (HiveSinkTap.java:133) - Hfs Sink Tap isDeleted is true for s3n://...
 ERROR [pool-2-thread-15] (HiveSinkTap.java:175) - We didn't succeed to delete the resource s3n://... Throwing now a runtime exception.
 WARN [pool-2-thread-15] (Cascade.java:706) - [...] flow failed: ...
 java.lang.RuntimeException: Although we waited to delete the resource for s3n://... 0 iterations, it still exists - This must be an issue in the underlying storage system.
    at com.qubit.hive.tap.HiveSinkTap.deleteResource(HiveSinkTap.java:179)
    at com.qubit.hive.tap.HiveSinkTap.deleteResource(HiveSinkTap.java:40)
    at cascading.flow.BaseFlow.deleteSinksIfNotUpdate(BaseFlow.java:971)
    at cascading.flow.BaseFlow.prepare(BaseFlow.java:733)
    at cascading.cascade.Cascade$CascadeJob.call(Cascade.java:761)
    at cascading.cascade.Cascade$CascadeJob.call(Cascade.java:710)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:619)
4

2 に答える 2

2

まず、サポートされているディストリビューションについてカスケーディング互換性ページを再確認してください。

http://www.cascading.org/support/compatibility/

Amazon EMR は定期的に互換性テストを実行し、結果を報告しているため、一覧に表示されます。

次に、S3 は最終的に一貫性のあるファイルシステムです。HDFS ではありません。そのため、HDFS の動作に関する仮定は、S3 に対するデータの保存には引き継がれません。たとえば、名前の変更は実際にはコピーと削除です。コピーに何時間もかかる場所。Amazon は、内部ディストリビューションにパッチを適用して、多くの違いに対応しています。

第三に、S3 にはディレクトリがありません。これはハックであり、異なる S3 インターフェース (jets3t 対 s3cmd 対 ...) によって異なる方法でサポートされています。これは、前の点を考慮すると問題になるに違いありません。

第 4 に、特に S3 と通信する場合は、ネットワークのレイテンシと信頼性が重要です。歴史的に、Amazon ネットワークは、EMR と標準の EC2 インスタンスを使用する場合、S3 で大量のデータセットを操作する場合の動作が優れていることがわかりました。また、EMR のパッチで、ここでも問題が改善されると思います。

したがって、EMR Apache Hadoop ディストリビューションを実行して、問題が解決するかどうかを確認することをお勧めします。

于 2012-11-17T16:21:07.233 に答える
1

S3 のファイルを使用する Hadoop でジョブを実行する場合、結果整合性のニュアンスを念頭に置く必要があります。

カスケード、Hadoop ストリーミング、または直接 Java で記述されているかどうかにかかわらず、削除の競合状態が根本的な問題であることが判明した多くのアプリのトラブルシューティングを支援してきました。

特定のキーと値のペアが完全に削除された後、S3 から通知を受け取ることについて、ある時点で議論がありました。私はその機能がどこに立っているかを把握していません。それ以外の場合は、カスケードまたは S3 を使用する他のアプリのいずれであっても、バッチ ワークフローによって消費または生成されるデータが HDFS または HBase またはキー/値フレームワーク (たとえば、 、これには Redis を使用しています)。次に、S3 は耐久性のあるストレージに使用されますが、中間データには使用されません。

于 2012-11-18T16:56:33.050 に答える