1

私は基本的に、Hadoop でスケールアップしてレコメンダー システムを実装しようとしています。

最初のステップでは、入力ファイル内のアイテムのすべてのペア間の類似性を計算しようとしています。

{アイテムA、アイテムB、類似性}

出力ファイルのサイズが非常に大きくなります (60kb の入力の場合、出力ファイルのサイズは 6mb になります)。

したがって、結果を python dict に保存し、map reduce プログラム全体の終了後に dict を 1 回だけ出力する方がよいのではないかと考えました。

私のpythonコードは次のとおりです。

#!/usr/bin/env python
from mrjob.job import MRJob
from math import sqrt

from itertools import combinations

PRIOR_COUNT = 10

PRIOR_CORRELATION = 0

    prefs={}

    def correlation(size, dot_product, rating_sum, \
        rating2sum, rating_norm_squared, rating2_norm_squared):
'''
  The correlation between two vectors A, B is
      [n * dotProduct(A, B) - sum(A) * sum(B)] /
    sqrt{ [n * norm(A)^2 - sum(A)^2] [n * norm(B)^2 - sum(B)^2] }

'''
numerator = size * dot_product - rating_sum * rating2sum
denominator = sqrt(size * rating_norm_squared - rating_sum * rating_sum) * \
                sqrt(size * rating2_norm_squared - rating2sum * rating2sum)

return (numerator / (float(denominator))) if denominator else 0.0


def regularized_correlation(size, dot_product, rating_sum, \
        rating2sum, rating_norm_squared, rating2_norm_squared,
        virtual_cont, prior_correlation):
    '''
    The Regularized Correlation between two vectors A, B

    RegularizedCorrelation = w * ActualCorrelation + (1 - w) * PriorCorrelation
        where w = # actualPairs / (# actualPairs + # virtualPairs).
    '''
    unregularizedCorrelation = correlation(size, dot_product, rating_sum, \
            rating2sum, rating_norm_squared, rating2_norm_squared)

    w = size / float(size + virtual_cont)

    return w * unregularizedCorrelation + (1.0 - w) * prior_correlation

class SemicolonValueProtocol(object):

  # don't need to implement read() since we aren't using it

  def write(self, key, values):
      return ';'.join(str(v) for v in values)

class BooksSimilarities(MRJob):

#OUTPUT_PROTOCOL = SemicolonValueProtocol

def steps(self):
    return [
        self.mr(mapper=self.group_by_user_rating,
                reducer=self.count_ratings_users_freq),
        self.mr(mapper=self.pairwise_items,
                reducer=self.calculate_similarity),
        self.mr(mapper=self.calculate_ranking,
                reducer=self.top_similar_items)]

def group_by_user_rating(self, key, line):
    '''
    Emit the user_id and group by their ratings (item and rating)

    17  70,3
    35  21,1
    49  19,2
    49  21,1
    49  70,4
    87  19,1
    87  21,2
    98  19,2

    '''
    line=line.replace("\"","");
    user_id, item_id, rating = line.split(',')

    yield  user_id, (item_id, float(rating))

def count_ratings_users_freq(self, user_id, values):
    '''
    For each user, emit a row containing their "postings"
    (item,rating pairs)
    Also emit user rating sum and count for use later steps.

    17    1,3,(70,3)
    35    1,1,(21,1)
    49    3,7,(19,2 21,1 70,4)
    87    2,3,(19,1 21,2)
    98    1,2,(19,2)

    '''
    item_count = 0
    item_sum = 0
    final = []
    for item_id, rating in values:
        item_count += 1
        item_sum += rating
        final.append((item_id, rating))

    yield user_id, (item_count, item_sum, final)

def pairwise_items(self, user_id, values):
    '''
    The output drops the user from the key entirely, instead it emits
    the pair of items as the key:

    19,21  2,1
    19,70  2,4
    21,70  1,4
    19,21  1,2

    '''
    item_count, item_sum, ratings = values
    for item1, item2 in combinations(ratings, 2):
        yield (item1[0], item2[0]), (item1[1], item2[1])

def calculate_similarity(self, pair_key, lines):
    '''
    Sum components of each corating pair across all users who rated both
    item x and item y, then calculate pairwise pearson similarity and
    corating counts.  The similarities are normalized to the [0,1] scale
    because we do a numerical sort.

    19,21   0.4,2
    21,19   0.4,2
    19,70   0.6,1
    70,19   0.6,1
    21,70   0.1,1
    70,21   0.1,1

    '''
    sum_xx, sum_xy, sum_yy, sum_x, sum_y, n = (0.0, 0.0, 0.0, 0.0, 0.0, 0)
    item_pair, co_ratings = pair_key, lines
    item_xname, item_yname = item_pair
    for item_x, item_y in lines:
        sum_xy += item_x * item_y
        sum_y += item_y
        sum_x += item_x
        sum_xx += item_x * item_x
        sum_yy += item_y * item_y
        n += 1

    reg_corr_sim = regularized_correlation(n, sum_xy, sum_x, \
            sum_y, sum_xx, sum_yy, PRIOR_COUNT, PRIOR_CORRELATION)

    yield (item_xname, item_yname), (reg_corr_sim, n)


def calculate_ranking(self, item_keys, values):
    '''
    Emit items with similarity in key for ranking:

    19,0.4    70,1
    19,0.6    21,2
    21,0.6    19,2
    21,0.9    70,1
    70,0.4    19,1
    70,0.9    21,1

    '''
    reg_corr_sim, n = values
    item_x, item_y = item_keys
    if int(n) > 0:
        yield (item_x, reg_corr_sim),(item_y, n)

def top_similar_items(self, key_sim, similar_ns):
    '''
    For each item emit K closest items in comma separated file:

    De La Soul;A Tribe Called Quest;0.6;1
    De La Soul;2Pac;0.4;2

    '''
    item_x, reg_corr_sim = key_sim
    for item_y, n in similar_ns:
           #yield None, (item_x, item_y, reg_corr_sim, n)
       prefs.setdefault(item_x,{})
       prefs[item_x][item_y] = float(reg_corr_sim)
       prefs.setdefault(item_y,{})
       prefs[item_y][item_x] = float(reg_corr_sim) 
    print "exiting"

if __name__ == '__main__':
   BooksSimilarities.run()

だから私は実行後に欲しいもの

python thisfile.py < input.csv -r hadoop > output.txt

繰り返しがなく、辞書が 1 つある比較的小さな出力ファイルです。

要するに、

現在、このプログラムは終了をn 回印刷しますが、一度だけ印刷したいのです。

これらすべてとは別に、より良い方法で Hadoop をスケールアップすることにより、協調フィルタリングを実装するより良い方法があります。

事前に感謝します。

4

1 に答える 1

0

同じキーを持つ値が同じレデューサーに送られるという保証しかありません。したがって、クラスターで複数のレデューサーを実行している場合、作業は分割され、レデューサーが実行されてすべてのキーでタスクを完了するため、多くの「終了」が発生します。

ローカルで実行して、動作するかどうかを検証してみてください:python thisfile.py <input.csv> output.txt

たぶん、steps()で「reducer_final」を定義して、最後のステップのレデューサー出力をすべて取得し、必要に応じて管理することができます。

チェック: http: //pythonhosted.org/mrjob/job.html#mrjob.job.MRJob.steps

敬具、

于 2013-03-05T14:37:47.083 に答える