0

数日前に Databricks を使い始めたばかりで、S3/bucket_name/../raw からいくつかのログ ファイルを取得して処理し、ログに作業したい特定の行が含まれているかどうかを確認し、その行を別のフォルダーに保存しようとしています。 「S3/bucket_name/../processed/」と呼ばれる

これは私がこれまでに試したことです。

RAW_LOG_PATH = "dbfs:/mnt/" + AWS_BUCKET_NAME + RAW_FILE_PATH
PROCESSED_LOG_PATH = "dbfs:/mnt/" + AWS_BUCKET_NAME + PROCESSED_FILE_PATH

raw_file_list = dbutils.fs.ls(RAW_LOG_PATH)
processed_file_list = dbutils.fs.ls(PROCESSED_LOG_PATH)    
processed_file_names = [file.name for file in processed_file_list]

#Filter log records that contains 'country:France'and save it to my_records 
for file in raw_file_list:
  if file.name not in processed_file_names:
    my_records = []
    my_entries = sc.textFile(file.path)
    lines = my_entries.collect()
    for line in lines:
      if 'country:France' in line: 
        my_records.append(line)
    new_file = PROCESSED_LOG_PATH + file.name
    if len(mobile_recs_logs) > 0:
      dbutils.fs.put(str(new_file), str(my_records))
      print('File processed:', new_file, len(my_records))
    else:
      print ('No records:', file.name, len(my_records))

必要な行を抽出し、S3 の新しい処理済みフォルダーに新しいファイルを出力できます。しかし、そのファイルにアクセスして結果を出力しようとすると、いくつかのエラーが発生します

# Checking the output after filter
FILE_NAME = '2016-10-27.log.gz'
check_new_file = PROCESSED_LOG_PATH + FILE_NAME

new_entries = sc.textFile(check_new_file)
logs = new_entries.take(1)
  print (logs)

エラーメッセージ:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 257.0 failed 4 times, most recent failure: Lost task 0.3 in stage 257.0 (TID 275, ip-x-x-x-x.ap-southeast-1.compute.internal): java.io.IOException: incorrect header check

この問題は、出力形式が S3 に保存されていることが原因であると推測しています。ログ ファイルから必要な行を処理して抽出し、その行を別のファイルに保存して S3 に保存し、Databricks の S3 で新しく保存されたファイルを操作する方法はありますか?

4

1 に答える 1

1

sc.textFileこの特定のケースでは、入力ファイルが gzip 圧縮されており、そのファイルの解凍が によって自動的に処理されたが、ファイルをフォルダーに保存したときに/processed新しいファイルが圧縮されていないことが問題だと思います。新しいファイルの名前は で終わるため.gz、Spark はそのファイルを解凍しようとしますが、解凍は失敗します。

この問題を回避するには、書き込む出力を圧縮するか、出力ファイルの名前を変更して.gz拡張子を省略することを検討してください。

于 2016-10-28T20:01:53.310 に答える