3

dataframeカスタムマネージャーを介してパンダに出力を追加するワーカーのプールを設定する、比較的単純なpythonマルチプロセッシングスクリプトがあります。私が見つけたのは、プールでclose()/ join()を呼び出したときに、apply_asyncによって送信されたすべてのタスクが完了しているわけではないということです。

これは、1000個のジョブを送信するが、半分しか完了していないためにアサーションエラーが発生する単純化された例です。非常に単純なものを見落としたことがありますか、それともこれはおそらくバグですか?

from pandas import DataFrame
from multiprocessing.managers import BaseManager, Pool

class DataFrameResults:
    def __init__(self):
        self.results = DataFrame(columns=("A", "B")) 

    def get_count(self):
        return self.results["A"].count()

    def register_result(self, a, b):
        self.results = self.results.append([{"A": a, "B": b}], ignore_index=True)

class MyManager(BaseManager): pass

MyManager.register('DataFrameResults', DataFrameResults)

def f1(results, a, b):
    results.register_result(a, b)

def main():
    manager = MyManager()
    manager.start()
    results = manager.DataFrameResults()

    pool = Pool(processes=4)

    for (i) in range(0, 1000):
        pool.apply_async(f1, [results, i, i*i])
    pool.close()
    pool.join()

    print results.get_count()
    assert results.get_count() == 1000

if __name__ == "__main__":
    main()
4

1 に答える 1

3

[編集]あなたが見ている問題は、このコードが原因です:

self.results = self.results.append(...)

これはアトミックではありません。そのため、場合によっては、読み取り後self.results(または追加中)にスレッドが中断されますが、新しいフレームをself.results->に割り当てる前に、このインスタンスは失われます。

正しい解決策は、結果オブジェクトを使用して結果を取得するのを待ってから、それらすべてをメインスレッドに追加することです。

于 2012-08-15T14:26:08.017 に答える