基本的に次のことを行うPythonのプログラムがあります。
for j in xrange(200):
# 1) Compute a bunch of data
# 2) Write data to disk
1) 約 2 ~ 5 分かかり
ます 2) 約 1 分かかります
メモリに保持するにはデータが多すぎることに注意してください。
理想的には、CPU のアイドリングを回避する方法でデータをディスクに書き込むことです。これはPythonで可能ですか?ありがとう!
基本的に次のことを行うPythonのプログラムがあります。
for j in xrange(200):
# 1) Compute a bunch of data
# 2) Write data to disk
1) 約 2 ~ 5 分かかり
ます 2) 約 1 分かかります
メモリに保持するにはデータが多すぎることに注意してください。
理想的には、CPU のアイドリングを回避する方法でデータをディスクに書き込むことです。これはPythonで可能ですか?ありがとう!
次のような複数のプロセスを使用してみてください。
import multiprocessing as mp
def compute(j):
# compute a bunch of data
return data
def write(data):
# write data to disk
if __name__ == '__main__':
pool = mp.Pool()
for j in xrange(200):
pool.apply_async(compute, args=(j, ), callback=write)
pool.close()
pool.join()
pool = mp.Pool()
ワーカー プロセスのプールを作成します。デフォルトでは、ワーカーの数はマシンの CPU コアの数と同じです。
各pool.apply_async呼び出しは、ワーカー プロセスのプール内のワーカーによって実行されるタスクをキューに入れます。ワーカーが利用可能になると、それが実行されcompute(j)
ます。ワーカーが値 を返すdata
と、メイン プロセスのスレッドがコールバック関数write(data)
を実行data
し、ワーカーから返されたデータを使用します。
いくつかの注意事項:
j
0 から 199 の範囲に対応していない可能性があります。この問題を回避する 1 つの方法は、データj
のフィールドの 1 つとして sqlite (または他の種類の) データベースにデータを書き込むことです。 . 次に、データを順番に読みたい場合は、SELECT * FROM table ORDER BY j
.複数のプロセスを使用すると、ワーカー プロセスによってデータが生成され、ディスクへの書き込みを待機しているデータがキューに蓄積されるため、必要なメモリ量が増加します。NumPy 配列を使用すると、必要なメモリ量を削減できる場合があります。それが不可能な場合は、プロセスの数を減らす必要があります。
pool = mp.Pool(processes=1)
これにより、(実行する) 1 つのワーカー プロセスが作成されcompute
、メイン プロセスは実行されますwrite
。compute
よりも時間がかかるため
、write
キューはディスクに書き込まれる複数のデータ チャンクでバックアップされません。ただし、別のデータ チャンクをディスクに書き込みながら、1 つのデータ チャンクを計算するには十分なメモリが必要です。
両方を同時に実行するのに十分なメモリがない場合は、選択の余地がありません。元のコードcompute
をwrite
順番に実行するしかありません。
簡単な方法は、スレッドとキューだけを使用することです。一方、コンピューティング部分がグローバル状態に依存せず、複数の CPU コアを持つマシンがある場合、より効率的な方法はプロセス プールを使用することです。
from multiprocessing import Pool
def compute_data(x):
return some_calculation_with(x)
if __name__ == '__main__':
pool = Pool(processes=4) # let's say you have quad-core, so start 4 workers
with open("output_file","w") as outfile:
for calculation_result in pool.imap(compute_data, range(200)):
# pool.imap returns results as they come from process pool
outfile.write(calculation_result)