7

MapReduce、MRJobにYelpのPythonAPIを使用する方法を学ぼうとしています。彼らの単純な単語カウンターの例は理にかなっていますが、複数の入力を含むアプリケーションをどのように処理するのか興味があります。たとえば、ドキュメント内の単語を単に数えるのではなく、ベクトルに行列を掛けます。私はこの解決策を思いつきました。これは機能しますが、ばかげていると感じます。

class MatrixVectMultiplyTast(MRJob):
    def multiply(self,key,line):
            line = map(float,line.split(" "))
            v,col = line[-1],line[:-1]

            for i in xrange(len(col)):
                    yield i,col[i]*v

    def sum(self,i,occurrences):
            yield i,sum(occurrences)

    def steps(self):
            return [self.mr (self.multiply,self.sum),]

if __name__=="__main__":
    MatrixVectMultiplyTast.run()

このコードが実行./matrix.py < input.txtされ、それが機能する理由は、行列がinput.txtに列ごとに格納され、対応するベクトル値が行の終わりにあるためです。

したがって、次の行列とベクトル:

ここに画像の説明を入力してください

input.txtとして次のように表されます。

ここに画像の説明を入力してください

要するに、マトリックスとベクトルをより自然に別々のファイルに保存し、両方をMRJobに渡すにはどうすればよいでしょうか。

4

5 に答える 5

3

別の(または同じrow_i、row_j)データセットに対して生データを処理する必要がある場合は、次のいずれかを実行できます。

1)データのコピーを保存するためのS3バケットを作成します。このコピーの場所をタスククラスに渡します(例:以下のコードのself.options.bucketとself.options.my_datafile_copy_location)。警告:残念ながら、ファイル全体を処理する前にタスクマシンに「ダウンロード」する必要があるようです。接続が失敗したり、ロードに時間がかかりすぎる場合、このジョブは失敗する可能性があります。これを行うためのPython/MRJobコードを次に示します。

これをマッパー関数に入れます。

d1 = line1.split('\t', 1)
v1, col1 = d1[0], d1[1]
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
bucket = conn.get_bucket(self.options.bucket)  # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()
### CAVEAT: Needs to get the whole file before processing the rest.
for line2 in data_copy.split('\n'):
    d2 = line2.split('\t', 1)
    v2, col2 = d2[0], d2[1]
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
conn.close()

2)SimpleDBドメインを作成し、そこにすべてのデータを保存します。botoとSimpleDBについてはこちらをご覧ください: http ://code.google.com/p/boto/wiki/SimpleDbIntro

マッパーコードは次のようになります。

dline = dline.strip()
d0 = dline.split('\t', 1)
v1, c1 = d0[0], d0[1]
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)
for item in domain:
    v2, c2 = item.name, item['column']
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
sdb.close()

この2番目のオプションは、大量のデータがある場合にパフォーマンスが向上する可能性があります。これは、一度に全量ではなく、データの各行に対して要求を行うことができるためです。SimpleDBの値の長さは最大1024文字であるため、データ値がそれより長い場合は、何らかの方法で圧縮/解凍する必要がある場合があることに注意してください。

于 2012-06-12T20:01:32.013 に答える
2

あなたの質問に対する実際の答えは、mrjobはhadoopストリーミング結合パターンをまだ完全にはサポートしていないということです。これは、map_input_file環境変数(map.input.fileプロパティを公開)を読み取って、処理しているファイルのタイプを判別することです。そのパスおよび/または名前。

この記事に示されているように、データ自体を読み取るだけで、それが属するタイプを簡単に検出できる場合は、それを実行できる可能性があります。

http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

しかし、それが常に可能であるとは限りません...

そうでなければmyjobは素晴らしく見えます、そして私は彼らが将来これのサポートを追加できることを望みます。それまでは、これは私にとってかなり大きな問題です。

于 2013-09-13T17:59:51.190 に答える
2

これは私が複数の入力を使用する方法であり、ファイル名に基づいてマッパーフェーズで適切な変更を行います。

ランナープログラム:

from mrjob.hadoop import *


#Define all arguments

os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH/lib/hadoop/'
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))
job_running_time = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S')
hadoop_bin = '/usr/bin/hadoop'
mode = 'hadoop'
hs = HadoopFilesystem([hadoop_bin])

input_file_names = ["hdfs:///app/input_file1/","hdfs:///app/input_file2/"]

aargs = ['-r',mode,'--jobconf','mapred.job.name=JobName','--jobconf','mapred.reduce.tasks=3','--no-output','--hadoop-bin',hadoop_bin]
aargs.extend(input_file_names)
aargs.extend(['-o',output_dir])
print aargs
status_file = True

mr_job = MRJob(args=aargs)
with mr_job.make_runner() as runner:
    runner.run()
os.environ['HADOOP_HOME'] = ''
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))

MRJobクラス:

class MR_Job(MRJob):
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value'
    def mapper(self, _, line):
    """
    This function reads lines from file.
    """
    try:
        #Need to clean email.
        input_file_name = get_jobconf_value('map.input.file').split('/')[-2]
                """
                Mapper code
                """
    except Exception, e:
        print e

    def reducer(self, email_id,visitor_id__date_time):
    try:
        """
                Reducer Code
                """
    except:
        pass


if __name__ == '__main__':
    MRV_Email.run()
于 2014-12-24T20:54:31.987 に答える
1

私の理解では、例がローカルファイルでの実行を利用している場合でも、HadoopクラスターまたはAmazonのHadoopサービスを活用したい場合を除いて、MrJobを使用することはありません。

MrJobは、原則として「Hadoopストリーミング」を使用してジョブを送信します。

これは、Hadoopからファイルまたはフォルダーとして指定されたすべての入力がマッパーにストリーミングされ、その後の結果がレデューサーにストリーミングされることを意味します。すべてのマッパーは入力のスライスを取得し、すべての入力が概略的に同じであると見なして、各データスライスのキーと値を均一に解析および処理します。

この理解に基づいて、入力はマッパーと概略的に同じです。2つの異なるスケマティックデータを含めることができる唯一の方法は、マッパーがどちらがベクターデータでどちらがマトリックスデータであるかを理解できるように、それらを同じファイルにインターリーブすることです。

You are actually doing it already.

線が行列データまたはベクトルデータである場合は、指定子を使用することで、これを簡単に改善できます。ベクトルデータが表示されると、前述のマトリックスデータが適用されます。

matrix, 1, 2, ...
matrix, 2, 4, ...
vector, 3, 4, ...
matrix, 1, 2, ...
.....

しかし、あなたが言及したプロセスはうまく機能します。すべての回路図データを1つのファイルにまとめる必要があります。

ただし、これにはまだ問題があります。K、V map reduceは、完全なスキーマが1行に存在し、完全な単一の処理装置を含む場合に、より適切に機能します。

私の理解では、あなたはすでにそれを正しく行っていますが、Map-Reduceはこの種のデータに適したメカニズムではないと思います。誰かが私ができるよりもさらにこれを明らかにすることを願っています。

于 2012-06-12T20:39:37.657 に答える
1

MrJobFundumentalsは次のように述べています。

stdinと混合して(-文字を使用して)複数の入力ファイルを渡すことができます。

$ python my_job.py input1.txt input2.txt - < input3.txt
于 2020-04-15T12:03:17.923 に答える