MRjob を使用して、HBase インスタンスで Hadoop ストリーミング ジョブを実行しています。私の人生では、パラメーターをレデューサーに渡す方法がわかりません。ジョブの実行時にレデューサーに渡したい 2 つのパラメーターがあります: startDate と endDate です。現在のレデューサーは次のようになります。
def reducer(self, groupId, meterList):
"""
Print bucket.
"""
sys.stderr.write("Working on group = " + str(groupId) + "\n")
#print "Opening connection..."
conn = open_connection(hostname)
#print "Getting table..."
table = get_table(conn, tableName)
compositeDf = DataFrame()
for meterId in meterList:
sys.stderr.write("Querying: " + str(meterId) + "\n")
df = extract_meter_data(table, meterId, startDate, endDate)
startDate と endDate をパラメーターとしてレデューサーに渡すことができないようです。パラメーターを取得するジョブを取得できる唯一の方法は、クラスの上部にあるグローバル変数を使用することです。
startDate = datetime.datetime(2012, 6, 10)
endDate = datetime.datetime(2012, 6, 11)
class MRDataQuality(MRJob):
"""
MapReduce job that does a data quality check on the meter data in HBase.
"""
しかし、それは汚いです。ジョブの呼び出しから渡したい。私は多くの方法を試しました。インスタンス変数として設定し、静的クラス変数として設定し、MRDataQualityJob のオーバーロードされたコンストラクターを作成します....何も機能していないようです。次のように、トップレベルのスクリプトからプログラムで呼び出しています。
if args.hadoop:
mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', meterFile])
else:
mrdq_job = MRDataQuality(args=[meterFile])
with mrdq_job.make_runner() as runner:
runner.run()
mrdq_job インスタンスに何をしても、 runner.run() は、インスタンスまたは静的変数が定義されていないクラスの新しい新しいインスタンスを使用しているようです。パラメーターをレデューサーに渡すにはどうすればよいですか???? 文字列「--reducer reducer.py arg1 arg2」を渡すことで、通常の Hadoop ストリーミングでそれを行うことができます。MRjobに相当するものはありますか?