私は 72 コア (実際には 36 個のマルチスレッド CPU で、72 コアとして表示されていますmultiprocessing.cpu_count()
) のワークステーションを持っています。
何百万もの小さなファイルのバッチで、同時処理の両方を試しmultiprocessing
ましray
たが、その処理中にいくつかの出力ファイルを同時に書きたいと思います。
たとえば(in .get()
)および.apply_async()
multiprocessing
ray.get()
には、ループ内でデータのグループを並列に処理ray
するリモート関数 ( ) があります。process_group()
以下では、モジュールを使用するコードのバージョンmultiprocessing
もコメントとして示します。
import ray
import pandas as pd
# from multiprocessing import Pool
ray.init(num_cpus=60)
# with Pool(processes=n_workers) as pool:
for data_list in many_data_lists:
##-----------------------
## With ray :
df_list = ray.get([process_group.remote(data) for data in data_list])
##-----------------------
## With multiprocessing :
#f_list = pool.map(process_group, list_of_indices_into_data_list)
##
## data are both known from the parent process
## and I use copy-on-write semantic to avoid having 60 copies.
## All the function needs are a list of indices
## of where to fetch slices of the read-only data.
##
very_big_df = pd.concatenate(df_list)
##-----------------------
## Write to file :
very_big_df.to_parquet(outputfile)
したがって、各ループ反復では、1 つのより大きなデータフレームに連結するためprocess_group()
のデータフレームのリストとして、同時に計算された many の出力を収集する必要があります。後者はディスクに書き込む必要があります (通常、サイズは ~1 から ~3 GB です)。このようなファイルを 1 つ書き込むと、リモートが処理されるまでに時間がかかります。何千ものループ反復があります。そのため、完了するまでに数日かかります。df_list
very_big_df
10-30 [s]
180 [s]
process_group
時間の約 10% を節約するためにループを継続しながら、ファイルを非ブロッキング方式でディスクに書き込むことは可能ですか (計算時間を約 1 日節約できます)。
次のループ反復の並行プロセスが終了するまでに、前の反復からの出力を書き込むのに十分な時間があります。ここに含まれるコアはすべてほぼ 100% で動作しているように見えるため、このThreading
モジュールもおそらく推奨されません。multiprocessing.apply_async()
選択できない出力データフレームが必要ないため、さらにイライラしますvery_big_df
。これは、節約しようとしている時間を犠牲にする可能性のある、より洗練されたものと共有する必要があり、そのray
ようなものを効率的に処理することを望んでいました.
[更新] 簡単にするために、すべてのプロセスに大きな共有変数があることについては言及しませんでした (これが、ファイルの同時書き込みと同様に、並列プロセスと呼んだ理由です)。その結果、タイトルの質問が編集されました。実際には、レイ並列ジョブの前に次のコードがあります。
shared_array_id = ray.put(shared_array)
df_list = ray.get([process_group.remote(shared_array, data) for data in data_list])
ただし、それが同時操作だけでなく、「並列」実行のようになるかどうかはわかりません。
[UPDATE 2] 共有配列はルックアップ テーブルです。つまり、並列ワーカーに関する限り、読み取り専用です。
[更新 3] 提案された両方の解決策を試しました: Threading と Ray / compute()ブロックインになる.get()を介して。
したがって、Ray では、これは両方のソリューションを示しています。
@ray.remote
def write_to_parquet(df_list, filename):
df = pd.concat(df_list)
df.to_parquet(filename, engine='pyarrow', compression=None)
# Share array created outside the loop, read-only (big lookup table).
# About 600 MB
shared_array_id = ray.put(shared_array)
for data_list in many_data_lists:
new_df_list = ray.get([process_group.remote(shared_array_id, data) for data in data_list])
write_to_parquet.remote(df_list, my_filename)
## Using threading, one would remove the ray decorator:
# write_thread = threading.Thread(target=write_to_parquet, args=(new_df_list, tinterval.left))
# write_thread.start()
RAY ソリューションの場合、これは object_store_memory を増やすために必要でしたが、デフォルトでは十分ではありませんでした: ノード メモリの 10% ~ 37 GB (私は 376 GB の RAM を持っています) で、20 GB で上限があり、保存されているオブジェクトのみが合計で約 22 GB です:データフレームの 1 つのリストdf_list
(約 11 GB)、および書き込み関数内でのそれらの連結の結果 (約 11 GB)。連結中にコピーがあると仮定します。そうでない場合、このメモリの問題は意味がなく、デフォルトで発生していると思っていた numpy ビューを渡すことができるかどうか疑問に思います。これは、それぞれがどれだけのメモリdf_list
になるかを実際に予測できないため、RAY のかなり苛立たしい側面です。1 倍から 3 倍まで変化する可能性があります...
最終multiprocessing
的に、処理部分 (I/O なし) が高速になるため、スレッド化に固執することが最も効率的なソリューションになります。
from multiprocessing import Pool
# Create the shared array in the parent process & exploit copy-on-write (fork) semantics
shared_array = create_lookup_table(my_inputs)
def process_group(my_data):
# Process a new dataframe here using my_data and some other data inside shared_array
...
return my_df
n_workers = 60
with Pool(processes=n_workers) as pool:
for data_list in many_data_lists:
# data_list contains thousands of elements. I choose a chunksize of 10
df_list = pool.map(process_group, data_list, 10)
write_thread = threading.Thread(target=write_to_parquet, args=(group_df_list, tinterval.left))
write_thread.start()
各ループ反復では、通常len(many_data_lists) = 7000
、各リストにはサイズ (3, 9092) の 7 つの numpy 配列が含まれます。したがって、これらの 7000 個のリストが 60 個のワーカーに送信されます。
process_group
ループ反復ごとのすべての並列の時間:
レイ:250 [s]
マルチプロセッシング:233 [s]
I/O: 5 GB の寄木細工ファイルが外部 USB 3 回転ディスクに書き込まれるのに約 35 秒かかります。内部回転ディスクで約 10 秒。
Raywrite_to_parquet.remote()
:ループをブロックする未来を作成するための最大 5 秒のオーバーヘッド。これは、回転するディスクに書き込むのにかかる時間の 50% です。これは理想的ではありません。
multiprocessing : 0 秒のオーバーヘッドが測定されました。
総経過時間:
レイ:486 [s]
マルチプロセッシング:436 [s]
これを数回繰り返しましたが、レイとマルチプロセッシングの違いは、一貫してマルチプロセッシングが最大50 秒高速であることを示しています。これは大きな違いであり、 Rayがより高い効率性を宣伝しているため、不可解でもあります。
これをより長い反復回数実行し、安定性 (メモリ、ガベージ コレクションの潜在的な問題など) について報告します。