0

EMR を使用して、テキスト ファイルの内容を 2 つの異なるファイルに分割したいと考えています。入力ファイル、マッパーおよびリデューサー スクリプトはすべて AWS の S3 に保存されます。現在、私のマッパーは、ファイル全体の各フィールドをタブで区切ることにより、stdin の入力を再フォーマットしています。

import sys
import time

first_line = True

for line in sys.stdin:
    if first_line == True:
            first_line = False
            continue
    line= line.strip()
    data=line.split('|')
    d = data[0]
    for i in range(1,len(data)):
            d = d + '\t' +str(data[i])
    d = d+ '\n'
    print d

私の減速機は魔法が起こる場所です。レデューサーで、特定のフィールドの値に基づいて、このテキスト ファイルを 2 つの異なるファイルに分割したいと考えています。これが私の現在の reducer.py コードです

mobile_inquiries = open("reducer_output/mob_inq.txt", "a")
transactions = open("reducer_output/transactions.txt", "a")
mob_merchant_id='"99031479997"'
mob_response_code = '"0"'
mob_request_codes = ['"400"','"401"','"402"','"403"','"450"','"408"','"2400"','"2401"','"2402"','"2408"','"6400"','"6405"','"6450"']

for line in sys.stdin:          
    line= line.strip()
    data=line.split('\t')
    d = data[0]
    merchant_id = data[4]
    request_code = data[10]
    response_code = data[19]

# Writes to mobile inquiry file
    if (merchant_id == mob_merchant_id) and (response_code == mob_response_code) and (request_code in mob_request_codes):
        d = d + '\t' +str(data[9])+ '\t' + str(data[28])+'\n'               
        mobile_inquiries.write(d)
# Writes to transaction file
    else:
        d = d + '\t' +str(data[9])+ '\t' + str(data[6])+ '\t' + str(data[4])+ '\t' + str(data[26])+ '\t' + str(data[10])+ '\t' + str(data[19])+ '\t' + str(data[28])+ '\n'
        transactions.write(d)
mobile_inquiries.close()
transactions.close()

この EMR ジョブは失敗し、次のエラー メッセージが返されます: ステップが失敗したためシャットダウンします。各行で fileReaders を使用して、これらのスクリプトの両方をローカルでテストしましたが、動作します。タスクを EMR にインポートすると問題が発生します。私の質問は次のとおりです: - EMR を使用してファイルを 2 つ以上のファイルに分割することは可能ですか? - もしそうなら、S3 が動的に新しいファイルを作成することを妨げているので、EMR ジョブは失敗していますか? - または、コードの動作が間違っていますか?

すべてのフィードバックに感謝します。

ありがとうございました。

4

1 に答える 1

0

これを行おうとしている方法は機能しません。ジョブが成功したとしても、Hadoop クラスター内の各ノードのローカル ファイル システムにファイルを書き込むことができただけです。ほとんどの場合、これらのファイルはジョブが完了すると破棄されます。

奇妙なことに、マッパーは key\tvalue 構造体を発行しますが、リデューサーは特定のキーの値のコレクションに対して何もしていないようです。では、マップ出力を data[0] で分割する必要があるのか​​どうかは明らかではありません。(文脈が分からないのかもしれませんが)

可能であれば、これらはより良い代替手段になります:

  • 最初にマップのみのジョブを使用して、入力データを 2 つのデータ セット (mobile_inquiries と transactions) に分割します。Hive を使用する場合 - 単一のテーブルを選択し、述語 (Python コードの場合と同様) に基づいて (HDFS または S3 の) 2 つのディレクトリに挿入できます。
  • 入力が分割されたので、出力ごとに 1 つの map-reduce ジョブを実行します。これは、任意の map/reduce 関数を実行できます。FWIW - ここでエンコードされた map-reduce 関数は実際には Python を必要としません - 標準の Hive SQL で直接表現できます。
于 2013-04-24T19:43:28.027 に答える