長すぎる; 読んでいない
warnings.catch_warnings()
コンテキスト マネージャはスレッド セーフではありません。並列処理環境で使用するにはどうすればよいですか?
バックグラウンド
multiprocessing
以下のコードは、Python のモジュールで並列処理を使用して最大化問題を解決します。(不変の) ウィジェットのリストを取得し、それらをパーティション分割し ( Python 3 での大規模でブルート フォースの最大化の効率的なマルチプロセッシングを参照)、すべてのパーティションの最大値 (「ファイナリスト」) を見つけてから、最大値 (「チャンピオン」) を見つけます。 )それらの「ファイナリスト」の。私が自分のコードを正しく理解していれば (もし理解していたらここにいないでしょう)、すべての子プロセスとメモリを共有して入力ウィジェットを提供しmultiprocessing
、オペレーティング システム レベルのパイプとピクルを使用して送信します。ワーカーが完了すると、ファイナリスト ウィジェットがメイン プロセスに戻ります。
問題の原因
ウィジェットがプロセス間パイプから出てくるときに発生するunpickle 後のウィジェットの再インスタンス化によって引き起こされる冗長なウィジェット警告をキャッチしたいと思います。ウィジェット オブジェクトがインスタンス化されると、ウィジェット オブジェクトは自身のデータを検証し、Python 標準warnings
モジュールから警告を発行して、ユーザーの入力データに問題があるとウィジェットが疑っていることをアプリのユーザーに伝えます。unpickle はオブジェクトをインスタンス化するため、コードの私の理解では、パイプから出た後にファイナリストである場合にのみ、各ウィジェット オブジェクトが 1 回だけ再インスタンス化されることを意味します。これが正しくない理由については、次のセクションを参照してください。 .
ウィジェットは、フロブニカ化される前にすでに作成されているため、ユーザーは自分が間違った入力を痛感していて、それについて二度と聞きたくない. warnings
これらは、モジュールのcatch_warnings()
コンテキスト マネージャー (with
ステートメント)でキャッチしたい警告です。
失敗したソリューション
私のテストでは、以下でLine AとLine Bとしてラベルを付けたものの間のどこかに余分な警告が発せられているときを絞り込みました。驚いたのは、すぐ近く以外の場所で警告が発せられていることoutput_queue.get()
です。これmultiprocessing
は、ピクルスを使用してウィジェットをワーカーに送信することを意味します。
要するに、 Line AからLine Bwarnings.catch_warnings()
までのすべてに均等に作成されたコンテキスト マネージャーを配置し、このコンテキスト内に適切な警告フィルターを設定しても、警告がキャッチされないということです。これは、ワーカー プロセスで警告が発行されていることを意味します。このコンテキスト マネージャーをワーカー コードの周りに置いても、警告はキャッチされません。
コード
この例では、問題のサイズが小さすぎてプロセスのフォーク、マルチプロセッシングのインポート、および と の定義に煩わされないかどうかを判断するためのコードが省略されていmy_frobnal_counter
ますmy_load_balancer
。
"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"
def frobnicate_parallel_worker(widgets, output_queue):
resultant_widget = max(widgets, key=my_frobnal_counter)
output_queue.put(resultant_widget)
def frobnicate_parallel(widgets):
output_queue = multiprocessing.Queue()
# partitions: Generator yielding tuples of sets
partitions = my_load_balancer(widgets)
processes = []
# Line A: Possible start of where the warnings are coming from.
for partition in partitions:
p = multiprocessing.Process(
target=frobnicate_parallel_worker,
args=(partition, output_queue))
processes.append(p)
p.start()
finalists = []
for p in processes:
finalists.append(output_queue.get())
# Avoid deadlocks in Unix by draining queue before joining processes
for p in processes:
p.join()
# Line B: Warnings no longer possible after here.
return max(finalists, key=my_frobnal_counter)