19

モジュールと並列化したい「恥ずかしいほど並列な」プロジェクトがたくさんありmultiprocessingます。ただし、多くの場合、巨大なファイル (2 GB を超える) の読み取り、行ごとの処理、基本的な計算の実行、および結果の書き込みが必要になります。ファイルを分割し、Python の multiprocessing モジュールを使用して処理する最良の方法は何ですか? 使用する必要がありますQueueか?それともモジュール自体?または、を使用してプロセスのプールに反復可能なファイルをマップする必要がありますか? これらのアプローチを試してみましたが、行ごとにデータを分散させるとオーバーヘッドが膨大になります。私は、最初のプロセスの特定の割合を通過するを使用して、軽量のパイプ フィルター設計に落ち着きました。JoinableQueuemultiprocessingQueuemultiprocessingcat file | process1 --out-file out1 --num-processes 2 | process2 --out-file out2)、しかし、Pythonに完全に含まれるソリューションが欲しい.

驚いたことに、Python のドキュメントでは、これを行う標準的な方法は提案されていません (multiprocessingドキュメントのプログラミング ガイドラインに関する長いセクションにもかかわらず)。

ありがとう、ヴィンス

追加情報: 1 行あたりの処理時間は異なります。高速で I/O バウンドがほとんどない問題もあれば、CPU バウンドの問題もあります。CPU バウンドで非依存のタスクは、並列化からポストを獲得するため、データを処理関数に割り当てる非効率的な方法であっても、ウォール クロック時間に関しては依然として有益です。

典型的な例は、行からフィールドを抽出し、さまざまなビット単位のフラグをチェックし、特定のフラグを持つ行をまったく新しい形式で新しいファイルに書き込むスクリプトです。これは I/O バウンドの問題のように思えますが、パイプを使用した安価な並行バージョンで実行したところ、約 20% 高速でした。プールとマップ、またはキューで実行すると、multiprocessing常に100%以上遅くなります。

4

7 に答える 7

9

最高のアーキテクチャの 1 つは、すでに Linux OS の一部です。特別なライブラリは必要ありません。

「ファンアウト」設計が必要です。

  1. 「メイン」プログラムは、パイプで接続された多数のサブプロセスを作成します。

  2. メイン プログラムはファイルを読み取り、パイプに行を書き込み、行を適切なサブプロセスに処理するために必要な最小限のフィルタリングを行います。

各サブプロセスは、おそらく stdin から読み書きする個別のプロセスのパイプラインである必要があります。

キュー データ構造は必要ありません。これがまさにインメモリ パイプラインです。つまり、2 つの並行プロセス間のバイトのキューです。

于 2009-12-01T01:45:05.833 に答える
6

1 つの戦略は、各ワーカーにオフセットを割り当てることです。そのため、8 つのワーカー プロセスがある場合は、0 から 7 の番号を割り当てます。ワーカー番号 0 は最初のレコード プロセスを読み取り、7 をスキップして 8 番目のレコードを処理します。ワーカー番号 1 などです。 2 番目のレコードを読み取り、7 をスキップして 9 番目のレコードを処理します。

このスキームには多くの利点があります。ファイルがどれほど大きくても、作業は常に均等に分割されます。同じマシン上のプロセスはほぼ同じ速度で処理され、同じバッファー領域を使用するため、過度の I/O オーバーヘッドが発生することはありません。ファイルが更新されていない限り、個々のスレッドを再実行して障害から回復できます。

于 2009-12-01T01:18:39.100 に答える
4

行の処理方法については言及していません。おそらく最も重要な情報です。

各行は独立していますか? 計算は、次の行の前にある行に依存していますか? ブロックで処理する必要がありますか? 各行の処理にはどのくらいの時間がかかりますか? 最後に「すべて」のデータを組み込む必要がある処理ステップはありますか? それとも、中間結果を捨てて現在の合計だけを維持することはできますか? ファイルサイズをスレッド数で割ることによって、ファイルを最初に分割できますか? それとも、処理するにつれて成長しますか?

行が独立していてファイルが大きくならない場合、必要な唯一の調整は、各ワーカーに「開始アドレス」と「長さ」をファームアウトすることです。それらは独立してファイルを開いてシークすることができ、結果を調整するだけで済みます。おそらく、N 個の結果がキューに返されるのを待つことによって。

行が独立していない場合、答えはファイルの構造に大きく依存します。

于 2009-12-01T00:28:40.277 に答える
1

Fredrik Lundh のSome Notes on Tim Bray's Wide Finder Benchmarkは興味深い読み物であり、非常によく似た使用例について、多くの適切なアドバイスがあります。他のさまざまな作成者も同じことを実装しており、一部は記事からリンクされていますが、「python wide finder」などをグーグルで検索してみてください。(モジュールに基づくソリューションもどこかにありましたmultiprocessingが、それはもう利用できないようです)

于 2009-12-01T11:03:09.787 に答える
1

ファイルの形式に大きく依存します。

それをどこかで分割するのは理にかなっていますか?それとも、新しい行で分割する必要がありますか? それとも、オブジェクト定義の最後で分割する必要がありますか?

ファイルを分割する代わりに、同じファイルに対して複数のリーダーを使用し、 を使用os.lseekしてファイルの適切な部分にジャンプする必要があります。

更新: ポスターは、彼が新しい行で分割したいと追加しました。次に、次のことを提案します。

4 つのプロセスがあるとします。次に、簡単な解決策は、ファイルの 0%、25%、50%、75% まで os.lseek を実行し、最初の新しい行に到達するまでバイトを読み取ることです。それが各プロセスの出発点です。これを行うためにファイルを分割する必要はありません。各プロセスで大きなファイル内の適切な場所を探し、そこから読み取りを開始するだけです。

于 2009-12-01T00:28:31.010 に答える
1

Python について具体的に質問されたことは承知していますが、Hadoop ( http://hadoop.apache.org/ ) を見ることをお勧めします。これは、この種の問題に対処するために特別に設計された Map and Reduce アルゴリズムを実装しています。

幸運を

于 2009-12-01T00:31:40.943 に答える
0

実行時間が長い場合は、各プロセスが を介して次の行を読み取るのではなくQueue、プロセスに行のバッチを読み取らせます。このようにして、オーバーヘッドは数行 (たとえば、数千以上) にわたって償却されます。

于 2009-12-01T01:11:36.697 に答える