dataframe
カスタムマネージャーを介してパンダに出力を追加するワーカーのプールを設定する、比較的単純なpythonマルチプロセッシングスクリプトがあります。私が見つけたのは、プールでclose()/ join()を呼び出したときに、apply_asyncによって送信されたすべてのタスクが完了しているわけではないということです。
これは、1000個のジョブを送信するが、半分しか完了していないためにアサーションエラーが発生する単純化された例です。非常に単純なものを見落としたことがありますか、それともこれはおそらくバグですか?
from pandas import DataFrame
from multiprocessing.managers import BaseManager, Pool
class DataFrameResults:
def __init__(self):
self.results = DataFrame(columns=("A", "B"))
def get_count(self):
return self.results["A"].count()
def register_result(self, a, b):
self.results = self.results.append([{"A": a, "B": b}], ignore_index=True)
class MyManager(BaseManager): pass
MyManager.register('DataFrameResults', DataFrameResults)
def f1(results, a, b):
results.register_result(a, b)
def main():
manager = MyManager()
manager.start()
results = manager.DataFrameResults()
pool = Pool(processes=4)
for (i) in range(0, 1000):
pool.apply_async(f1, [results, i, i*i])
pool.close()
pool.join()
print results.get_count()
assert results.get_count() == 1000
if __name__ == "__main__":
main()