3

MRjob を使用して、HBase インスタンスで Hadoop ストリーミング ジョブを実行しています。私の人生では、パラメーターをレデューサーに渡す方法がわかりません。ジョブの実行時にレデューサーに渡したい 2 つのパラメーターがあります: startDate と endDate です。現在のレデューサーは次のようになります。

def reducer(self, groupId, meterList):
    """
    Print bucket.
    """
    sys.stderr.write("Working on group = " + str(groupId) + "\n")
    #print "Opening connection..."
    conn = open_connection(hostname)
    #print "Getting table..."
    table = get_table(conn, tableName)

    compositeDf = DataFrame()

    for meterId in meterList:
        sys.stderr.write("Querying: " + str(meterId) + "\n")
        df = extract_meter_data(table, meterId, startDate, endDate)

startDate と endDate をパラメーターとしてレデューサーに渡すことができないようです。パラメーターを取得するジョブを取得できる唯一の方法は、クラスの上部にあるグローバル変数を使用することです。

startDate = datetime.datetime(2012, 6, 10)
endDate = datetime.datetime(2012, 6, 11)

class MRDataQuality(MRJob):
    """
    MapReduce job that does a data quality check on the meter data in HBase.
    """

しかし、それは汚いです。ジョブの呼び出しから渡したい。私は多くの方法を試しました。インスタンス変数として設定し、静的クラス変数として設定し、MRDataQualityJob のオーバーロードされたコンストラクターを作成します....何も機能していないようです。次のように、トップレベルのスクリプトからプログラムで呼び出しています。

if args.hadoop:
    mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', meterFile])
else:
    mrdq_job = MRDataQuality(args=[meterFile])

with mrdq_job.make_runner() as runner:
    runner.run()

mrdq_job インスタンスに何をしても、 runner.run() は、インスタンスまたは静的変数が定義されていないクラスの新しい新しいインスタンスを使用しているようです。パラメーターをレデューサーに渡すにはどうすればよいですか???? 文字列「--reducer reducer.py arg1 arg2」を渡すことで、通常の Hadoop ストリーミングでそれを行うことができます。MRjobに相当するものはありますか?

4

2 に答える 2

3

パラメータをジョブ構成に渡してから、get_jobconf_value で読み取るのはどうですか?

このようなもの:

from mrjob.compat import get_jobconf_value

class MRDataQuality(MRJob):

  def reducer(self, groupId, meterList):
    ...
    startDate = get_jobconf_value("my.job.settings.startdate")
    endDate = get_jobconf_value("my.job.settings.enddate")

    for meterId in meterList:
      sys.stderr.write("Querying: " + str(meterId) + "\n")
      df = extract_meter_data(table, meterId, startDate, endDate)    

そして、上記のようにコードでパラメーターを設定します

mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', '--jobconf', 'my.job.settings.startdate=2013-06-10', '--jobconf', 'my.job.settings.enddate=2013-06-11', meterFile])
于 2013-08-23T15:40:41.163 に答える
1

パラメーターをジョブ構成に渡してから、reducer_init 内の get_jobconf_value で読み取るのはどうですか? この方法では、パラメータを 1 回だけ読み取る必要があります。

このようなもの:

from mrjob.compat import get_jobconf_value

class MRDataQuality(MRJob):

  def reducer_init(self):
    ...
    self.startDate = get_jobconf_value("my.job.settings.startdate")
    self.endDate = get_jobconf_value("my.job.settings.enddate")

  def reducer(self, groupId, meterList):
    for meterId in meterList:
      sys.stderr.write("Querying: " + str(meterId) + "\n")
      df = extract_meter_data(table, meterId, self.startDate, self.endDate)    

そして、上記のようにコードでパラメーターを設定します

mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', '--jobconf', 'my.job.settings.startdate=2013-06-10', '--jobconf', 'my.job.settings.enddate=2013-06-11', meterFile])
于 2013-10-22T19:16:05.620 に答える