2

python のmrjobを使用して、実行時間の長い python プログラムの一部を MapReduce Hadoop ジョブに変換することから始めています。簡単な単語カウントの例が機能するようになり、「テキスト分類」の例を概念的に理解しました。

ただし、問題を解決するために必要な手順を理解するのに少し苦労しています。

複数のファイル (約 6000) があり、それぞれに 2 ~ 800 行あります。この場合、各行はスペースで区切られた単純な「信号」です。各ファイルの各行と、すべてのファイル (それ自体を含む) の他のすべての行との相関関係を比較する必要があります。次に、相関係数に基づいて結果を出力します。

1 つのファイルの例:

1 2 3 4 2 3 1 2 3 4 1 2
2 2 3 1 3 3 1 2 3 1 4 1
2 3 4 5 3 2 1 3 4 5 2 1
...

このファイルの各行を、他のすべてのファイルのすべての行とペアにする必要があります...または、すべてのファイルを1つのファイルに連結して簡単にすることもできますが、ペアごとの反復が必要です。

計算の方法と、最終的な削減ステップを使用して結果を集計およびフィルタリングする方法を理解しています。私が抱えている問題はyield、単一の setp ですべてのファイルを読み取らずに、すべてのペアワイズ アイテムを連続するステップに移動する方法です。を使用する入力ファイルを事前に準備できると思いますitertools.productが、このファイルは非常に大きくなります。

4

1 に答える 1

1

まあ、誰も答えを出していないので、他の誰かがそれを必要とする場合に備えて、現在の回避策を投稿します. これがどれほど「標準的」または効率的かはわかりませんが、これまでのところ機能しています。

ファイルの各行の最初の項目としてファイル名を配置し、\tその後に残りのデータが続きます。この例では、非常に簡単な例として、各行に 1 つの数値を使用し、それらを平均しています。

次に、 で次の map-reduce ステップを作成しましたmrjob

class MRAvgPairwiseLines(MRJob):

def input_mapper(self, _, value):
    """Takes each input line and converts it to (fnum, num) and a key of 'ALL'"""

    fnum, val = value.split('\t')
    yield 'ALL', (fnum, val)

def input_reducer(self, key, values):

    for (fnum1, val1), (fnum2, val2) in product(values, repeat = 2):
        yield fnum1, (fnum1, fnum2, val1, val2)

def do_avg(self, key, value):

    fnum1, fnum2, val1, val2 = value
    res = (float(val1)+float(val2))/float(2)
    yield key, (fnum2, res)

def get_max_avg(self, key, values):

    max_fnum, max_avg = max(values, key = lambda x: x[1])
    yield key, (max_fnum, max_avg)

def steps(self):
    return [self.mr(mapper=self.input_mapper, reducer=self.input_reducer),
                self.mr(mapper=self.do_avg, reducer=self.get_max_avg)]

このようにして、input_mapper関数からのすべての出力が同じinput_reducerものにグループ化され、それyieldが連続するペアになります。次に、これらは適切な場所に渡され、最終的に最大の平均が返されます (実際には、他のすべてのファイルで最大の項目です)。

それが誰かを助けることを願っています。

于 2011-07-10T23:44:24.157 に答える