数日前に Databricks を使い始めたばかりで、S3/bucket_name/../raw からいくつかのログ ファイルを取得して処理し、ログに作業したい特定の行が含まれているかどうかを確認し、その行を別のフォルダーに保存しようとしています。 「S3/bucket_name/../processed/」と呼ばれる
これは私がこれまでに試したことです。
RAW_LOG_PATH = "dbfs:/mnt/" + AWS_BUCKET_NAME + RAW_FILE_PATH
PROCESSED_LOG_PATH = "dbfs:/mnt/" + AWS_BUCKET_NAME + PROCESSED_FILE_PATH
raw_file_list = dbutils.fs.ls(RAW_LOG_PATH)
processed_file_list = dbutils.fs.ls(PROCESSED_LOG_PATH)
processed_file_names = [file.name for file in processed_file_list]
#Filter log records that contains 'country:France'and save it to my_records
for file in raw_file_list:
if file.name not in processed_file_names:
my_records = []
my_entries = sc.textFile(file.path)
lines = my_entries.collect()
for line in lines:
if 'country:France' in line:
my_records.append(line)
new_file = PROCESSED_LOG_PATH + file.name
if len(mobile_recs_logs) > 0:
dbutils.fs.put(str(new_file), str(my_records))
print('File processed:', new_file, len(my_records))
else:
print ('No records:', file.name, len(my_records))
必要な行を抽出し、S3 の新しい処理済みフォルダーに新しいファイルを出力できます。しかし、そのファイルにアクセスして結果を出力しようとすると、いくつかのエラーが発生します
# Checking the output after filter
FILE_NAME = '2016-10-27.log.gz'
check_new_file = PROCESSED_LOG_PATH + FILE_NAME
new_entries = sc.textFile(check_new_file)
logs = new_entries.take(1)
print (logs)
エラーメッセージ:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 257.0 failed 4 times, most recent failure: Lost task 0.3 in stage 257.0 (TID 275, ip-x-x-x-x.ap-southeast-1.compute.internal): java.io.IOException: incorrect header check
この問題は、出力形式が S3 に保存されていることが原因であると推測しています。ログ ファイルから必要な行を処理して抽出し、その行を別のファイルに保存して S3 に保存し、Databricks の S3 で新しく保存されたファイルを操作する方法はありますか?