EMR を使用して、テキスト ファイルの内容を 2 つの異なるファイルに分割したいと考えています。入力ファイル、マッパーおよびリデューサー スクリプトはすべて AWS の S3 に保存されます。現在、私のマッパーは、ファイル全体の各フィールドをタブで区切ることにより、stdin の入力を再フォーマットしています。
import sys
import time
first_line = True
for line in sys.stdin:
if first_line == True:
first_line = False
continue
line= line.strip()
data=line.split('|')
d = data[0]
for i in range(1,len(data)):
d = d + '\t' +str(data[i])
d = d+ '\n'
print d
私の減速機は魔法が起こる場所です。レデューサーで、特定のフィールドの値に基づいて、このテキスト ファイルを 2 つの異なるファイルに分割したいと考えています。これが私の現在の reducer.py コードです
mobile_inquiries = open("reducer_output/mob_inq.txt", "a")
transactions = open("reducer_output/transactions.txt", "a")
mob_merchant_id='"99031479997"'
mob_response_code = '"0"'
mob_request_codes = ['"400"','"401"','"402"','"403"','"450"','"408"','"2400"','"2401"','"2402"','"2408"','"6400"','"6405"','"6450"']
for line in sys.stdin:
line= line.strip()
data=line.split('\t')
d = data[0]
merchant_id = data[4]
request_code = data[10]
response_code = data[19]
# Writes to mobile inquiry file
if (merchant_id == mob_merchant_id) and (response_code == mob_response_code) and (request_code in mob_request_codes):
d = d + '\t' +str(data[9])+ '\t' + str(data[28])+'\n'
mobile_inquiries.write(d)
# Writes to transaction file
else:
d = d + '\t' +str(data[9])+ '\t' + str(data[6])+ '\t' + str(data[4])+ '\t' + str(data[26])+ '\t' + str(data[10])+ '\t' + str(data[19])+ '\t' + str(data[28])+ '\n'
transactions.write(d)
mobile_inquiries.close()
transactions.close()
この EMR ジョブは失敗し、次のエラー メッセージが返されます: ステップが失敗したためシャットダウンします。各行で fileReaders を使用して、これらのスクリプトの両方をローカルでテストしましたが、動作します。タスクを EMR にインポートすると問題が発生します。私の質問は次のとおりです: - EMR を使用してファイルを 2 つ以上のファイルに分割することは可能ですか? - もしそうなら、S3 が動的に新しいファイルを作成することを妨げているので、EMR ジョブは失敗していますか? - または、コードの動作が間違っていますか?
すべてのフィードバックに感謝します。
ありがとうございました。