5

私は、400 万の会社名の干し草の山から 30,000 個の針を探すファジー文字列マッチング スクリプトを持っています。スクリプトは問題なく動作しますが、AWS h1.xlarge での並列処理による高速化の試みは、メモリが不足しているため失敗しました。

以前の質問に答えて説明したように、より多くのメモリを取得しようとするのではなく、ワークフローを最適化する方法を見つけたいと思います - 私はこれにかなり慣れていないので、十分な余地があるはずです. ところで、私はすでにキューを試しました(これも機能しましたが、同じMemoryError.

コードの中で最も関連性が高いと思われるものを次に示します。ロジックが十分に明確になることを願っています-必要に応じてさらに情報を提供してください:

def getHayStack():
    ## loads a few million company names into id: name dict
    return hayCompanies

def getNeedles(*args):
    ## loads subset of 30K companies into id: name dict (for allocation to workers)
    return needleCompanies

def findNeedle(needle, haystack):
    """ Identify best match and return results with score """
    results = {}
    for hayID, hayCompany in haystack.iteritems():
        if not isnull(haystack[hayID]):
            results[hayID] = levi.setratio(needle.split(' '), 
                                           hayCompany.split(' '))
    scores = list(results.values())
    resultIDs = list(results.keys())
    needleID = resultIDs[scores.index(max(scores))]
    return [needleID, haystack[needleID], max(scores)]

def runMatch(args):
    """ Execute findNeedle and process results for poolWorker batch"""
    batch, first = args
    last = first + batch
    hayCompanies = getHayStack()
    needleCompanies = getTargets(first, last)
    needles = defaultdict(list)
    current = first
    for needleID, needleCompany in needleCompanies.iteritems():
        current += 1
        needles[targetID] = findNeedle(needleCompany, hayCompanies)
    ## Then store results

if __name__ == '__main__':
    pool = Pool(processes = numProcesses)
    totalTargets = len(getTargets('all'))
    targetsPerBatch = totalTargets / numProcesses
    pool.map_async(runMatch, 
                   itertools.izip(itertools.repeat(targetsPerBatch),
                                  xrange(0, 
                                         totalTargets,
                                         targetsPerBatch))).get(99999999)
    pool.close()
    pool.join()

質問は次のとおりだと思います: どうすればすべてのワーカーの干し草の山をロードしないようにできますか? たとえば、データを共有したり、はるかに大きな干し草の山を針ではなくワーカー間で分割するなどの別のアプローチを採用したりできますか? クラッタを回避または排除してメモリ使用量を改善するにはどうすればよいですか?

4

1 に答える 1

4

あなたのデザインは少し混乱しています。N 個のワーカーのプールを使用しており、M 個のジョブをサイズ M/N の N 個のタスクに分割します。言い換えれば、これがすべて正しければ、ワーカー プロセスの上に構築されたプールの上でワーカー プロセスをシミュレートしていることになります。なぜそれを気にするのですか?プロセスを使用したい場合は、それらを直接使用してください。または、プールをプールとして使用し、各ジョブを独自のタスクとして送信し、バッチ機能を使用して適切な (そして微調整可能な) 方法でそれらをバッチ処理します。

つまりrunMatch、needleID と needleCompany を 1 つだけ取り、それを呼び出して、その部分がfindNeedle何であれ実行するだけです。# Then store resultsそして、メインプログラムはずっと単純になります:

if __name__ == '__main__':
    with Pool(processes=numProcesses) as pool:
        results = pool.map_async(runMatch, needleCompanies.iteritems(), 
                                 chunkSize=NUMBER_TWEAKED_IN_TESTING).get()

または、結果が小さい場合は、すべてのプロセスが (おそらく) 結果を保存する共有のものをめぐって争うのではなく、それらを返すだけです。次に、まったく必要ありませんrunMatch

if __name__ == '__main__':
    with Pool(processes=numProcesses) as pool:
        for result in pool.imap_unordered(findNeedle, needleCompanies.iteritems(), 
                                          chunkSize=NUMBER_TWEAKED_IN_TESTING):
            # Store result

または、正確に N 個のバッチを実行したい場合は、それぞれに対して Process を作成します。

if __name__ == '__main__':
    totalTargets = len(getTargets('all'))
    targetsPerBatch = totalTargets / numProcesses
    processes = [Process(target=runMatch, 
                         args=(targetsPerBatch,
                               xrange(0, 
                                      totalTargets,
                                      targetsPerBatch))) 
                 for _ in range(numProcesses)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

getHayStack()また、タスクごとに 1 回 (およびgetNeedles同様に)呼び出しているようです。このライブの複数のコピーを同時に作成することがどれほど簡単かはわかりませんが、これまでで最大のデータ構造であることを考えると、それを最初に除外しようとします. 実際、メモリ使用量の問題でなくても、getHayStack何らかのキャッシュを既に行っていない限り (たとえば、最初にグローバルまたは変更可能なデフォルト パラメータ値に明示的に格納するなど)、パフォーマンスに大きな打撃を与える可能性があります。そしてそれを使用するだけです)、とにかく修正する価値があるかもしれません.

両方の潜在的な問題を一度に修正する 1 つの方法は、Poolコンストラクターで初期化子を使用することです。

def initPool():
    global _haystack
    _haystack = getHayStack()

def runMatch(args):
    global _haystack
    # ...
    hayCompanies = _haystack
    # ...

if __name__ == '__main__':
    pool = Pool(processes=numProcesses, initializer=initPool)
    # ...

次に、実際には必要のない複数の場所でリストを明示的に生成していることに気付きました。例えば:

scores = list(results.values())
resultIDs = list(results.keys())
needleID = resultIDs[scores.index(max(scores))]
return [needleID, haystack[needleID], max(scores)]

結果が一握り以上ある場合、これは無駄です。results.values()iterable を直接使用するだけです。(実際、Python 2.x を使用しているように見えます。この場合keys、 とvalues既にリストになっているため、正当な理由もなく追加のコピーを作成しているだけです。)

しかし、この場合、全体をさらに単純化できます。最高スコアのキー (resultID) と値 (スコア) を探しているだけですよね? そう:

needleID, score = max(results.items(), key=operator.itemgetter(1))
return [needleID, haystack[needleID], score]

これにより、 で繰り返されるすべての検索も排除されscore、CPU がいくらか節約されます。


これでメモリの問題が直接解決されるわけではありませんが、デバッグや微調整が容易になるはずです。

最初に試すことは、input_size/cpu_count の代わりに、はるかに小さいバッチを使用することです。1 を試してください。メモリ使用量は減りますか? そうでない場合は、その部分を除外しました。

次に、sys.getsizeof(_haystack)それが何を言っているかを試してみてください。たとえば、1.6GB の場合、他のすべてを 0.4GB に詰め込もうとしてかなり細かく切り詰めているので、それが攻撃方法です。たとえば、shelveプレーンな の代わりにデータベースを使用しますdict

また、初期化関数の開始時と終了時にメモリ使用量を (resourceモジュールで)ダンプしてみてください。getrusage(RUSAGE_SELF)最終的な干し草の山がたとえば 0.3GB しかないのに、さらに 1.3GB を割り当ててそれを構築する場合、それが攻撃の問題です。たとえば、単一の子プロセスをスピンオフして dict を構築およびピクルし、プール初期化子にそれを開いてピクル解除させることができます。または、2 つを組み合わせて、最初の子でデータベースを構築shelveし、初期化子で読み取り専用で開きます。いずれにせよ、これは、CSV 解析/辞書作成作業を 8 回ではなく 1 回だけ行うことも意味します。

一方、VM の合計使用率がまだ低い場合 ( getrusageVM の合計サイズを直接確認する方法がないことに注意してください。<code>ru_maxrss は、多くの場合、特にru_nswap0 の場合に有用な概算値です)、最初のタスクの時点で問題はタスク自体にあります。

まず、getsizeofタスク関数への引数と戻り値。それらが大きい場合、特にタスクごとに大きくなり続けるか、大きく変化する場合は、データのピクルとアンピクルがメモリを大量に消費し、最終的にそれらの 8 つが一緒になって限界に達するほど大きくなる可能性があります。

それ以外の場合、問題はタスク関数自体にある可能性が最も高くなります。メモリ リークが発生したか (バグのある C 拡張モジュールを使用した場合にのみ実際のctypesリークが発生する可能性があります)、または呼び出し間で参照を保持している場合、たとえばグローバルで、不必要に物事を永久に保持している可能性があります。 )、または一部のタスク自体が大量のメモリを消費しています。いずれにせよ、これは、マルチプロセッシングを引き出してタスクを直接実行するだけで、より簡単にテストできるはずです。これにより、デバッグがはるかに簡単になります。

于 2013-09-10T00:29:08.343 に答える