37

長すぎる; 読んでいない

warnings.catch_warnings()コンテキスト マネージャはスレッド セーフではありません。並列処理環境で使用するにはどうすればよいですか?

バックグラウンド

multiprocessing以下のコードは、Python のモジュールで並列処理を使用して最大化問題を解決します。(不変の) ウィジェットのリストを取得し、それらをパーティション分割し ( Python 3 での大規模でブルート フォースの最大化の効率的なマルチプロセッシングを参照)、すべてのパーティションの最大値 (「ファイナリスト」) を見つけてから、最大値 (「チャンピオン」) を見つけます。 )それらの「ファイナリスト」の。私が自分のコードを正しく理解していれば (もし理解していたらここにいないでしょう)、すべての子プロセスとメモリを共有して入力ウィジェットを提供しmultiprocessing、オペレーティング システム レベルのパイプとピクルを使用して送信します。ワーカーが完了すると、ファイナリスト ウィジェットがメイン プロセスに戻ります。

問題の原因

ウィジェットがプロセス間パイプから出てくるときに発生するunpickle 後のウィジェットの再インスタンス化によって引き起こされる冗長なウィジェット警告をキャッチしたいと思います。ウィジェット オブジェクトがインスタンス化されると、ウィジェット オブジェクトは自身のデータを検証し、Python 標準warningsモジュールから警告を発行して、ユーザーの入力データに問題があるとウィジェットが疑っていることをアプリのユーザーに伝えます。unpickle はオブジェクトをインスタンス化するため、コードの私の理解では、パイプから出た後にファイナリストである場合にのみ、各ウィジェット オブジェクトが 1 回だけ再インスタンス化されることを意味します。これが正しくない理由については、次のセクションを参照してください。 .

ウィジェットは、フロブニカ化される前にすでに作成されているため、ユーザーは自分が間違った入力を痛感していて、それについて二度と聞きたくない. warningsこれらは、モジュールのcatch_warnings()コンテキスト マネージャー (withステートメント)でキャッチしたい警告です。

失敗したソリューション

私のテストでは、以下でLine ALine Bとしてラベルを付けたものの間のどこかに余分な警告が発せられているときを絞り込みました。驚いたのは、すぐ近く以外の場所で警告が発せられていることoutput_queue.get()です。これmultiprocessingは、ピクルスを使用してウィジェットをワーカーに送信することを意味します。

要するに、 Line AからLine Bwarnings.catch_warnings()までのすべてに均等に作成されたコンテキスト マネージャーを配置し、このコンテキスト内に適切な警告フィルターを設定しても、警告がキャッチされないということです。これは、ワーカー プロセスで警告が発行されていることを意味します。このコンテキスト マネージャーをワーカー コードの周りに置いても、警告はキャッチされません。

コード

この例では、問題のサイズが小さすぎてプロセスのフォーク、マルチプロセッシングのインポート、および と の定義に煩わされないかどうかを判断するためのコードが省略されていmy_frobnal_counterますmy_load_balancer

"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"

def frobnicate_parallel_worker(widgets, output_queue):
    resultant_widget = max(widgets, key=my_frobnal_counter)
    output_queue.put(resultant_widget)

def frobnicate_parallel(widgets):
    output_queue = multiprocessing.Queue()
    # partitions: Generator yielding tuples of sets
    partitions = my_load_balancer(widgets)
    processes = []
    # Line A: Possible start of where the warnings are coming from.
    for partition in partitions:
        p = multiprocessing.Process(
                 target=frobnicate_parallel_worker,
                 args=(partition, output_queue))
        processes.append(p)
        p.start()
    finalists = []
    for p in processes:
        finalists.append(output_queue.get())
    # Avoid deadlocks in Unix by draining queue before joining processes
    for p in processes:
        p.join()
    # Line B: Warnings no longer possible after here.
    return max(finalists, key=my_frobnal_counter)
4

3 に答える 3

2

__init__選択を解除しても、が2回実行されることはありません。Windowsで次のコードを実行しましたが、発生しません(それぞれ__init__が正確に1回実行されます)。

my_load_balancerしたがって、ウィジェットのクラスとの間のコードを提供する必要があります。この時点で、あなたの質問は単に十分な情報を提供していません。

ランダムな推測として、my_load_balancerウィジェットのコピーを作成して、ウィジェットを再度インスタンス化するかどうかを確認できます。

import multiprocessing
import collections

"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"

def my_load_balancer(widgets):
    partitions = tuple(set() for _ in range(8))
    for i, widget in enumerate(widgets):
        partitions[i % 8].add(widget)
    for partition in partitions:
        yield partition

def my_frobnal_counter(widget):
    return widget.id

def frobnicate_parallel_worker(widgets, output_queue):
    resultant_widget = max(widgets, key=my_frobnal_counter)
    output_queue.put(resultant_widget)

def frobnicate_parallel(widgets):
    output_queue = multiprocessing.Queue()
    # partitions: Generator yielding tuples of sets
    partitions = my_load_balancer(widgets)
    processes = []
    # Line A: Possible start of where the warnings are coming from.
    for partition in partitions:
        p = multiprocessing.Process(
                 target=frobnicate_parallel_worker,
                 args=(partition, output_queue))
        processes.append(p)
        p.start()
    finalists = []
    for p in processes:
        finalists.append(output_queue.get())
    # Avoid deadlocks in Unix by draining queue before joining processes
    for p in processes:
        p.join()
    # Line B: Warnings no longer possible after here.
    return max(finalists, key=my_frobnal_counter)

class Widget:
    id = 0
    def __init__(self):
        print('initializing Widget {}'.format(self.id))
        self.id = Widget.id
        Widget.id += 1

    def __str__(self):
        return str(self.id)

    def __repr__(self):
        return str(self)

def main():

    widgets = [Widget() for _ in range(16)]
    result = frobnicate_parallel(widgets)
    print(result.id)


if __name__ == '__main__':
    main()
于 2012-10-12T06:08:34.417 に答える
2

Process.run使用するメソッドをオーバーライドしてみることができますwarnings.catch_warnings

>>> from multiprocessing import Process
>>> 
>>> def yell(text):
...    import warnings
...    print 'about to yell %s' % text
...    warnings.warn(text)
... 
>>> class CustomProcess(Process):
...    def run(self, *args, **kwargs):
...       import warnings
...       with warnings.catch_warnings():
...          warnings.simplefilter("ignore")
...          return Process.run(self, *args, **kwargs)
... 
>>> if __name__ == '__main__':
...    quiet = CustomProcess(target=yell, args=('...not!',))
...    quiet.start()
...    quiet.join()
...    noisy = Process(target=yell, args=('AAAAAAaaa!',))
...    noisy.start()
...    noisy.join()
... 
about to yell ...not!
about to yell AAAAAAaaa!
__main__:4: UserWarning: AAAAAAaaa!
>>> 

または、内部の一部を使用できます... ( __warningregistry__)

>>> from multiprocessing import Process
>>> import exceptions
>>> def yell(text):
...    import warnings
...    print 'about to yell %s' % text
...    warnings.warn(text)
...    # not filtered
...    warnings.warn('complimentary second warning.')
... 
>>> WARNING_TEXT = 'AAAAaaaaa!'
>>> WARNING_TYPE = exceptions.UserWarning
>>> WARNING_LINE = 4
>>> 
>>> class SelectiveProcess(Process):
...    def run(self, *args, **kwargs):
...       registry = globals().setdefault('__warningregistry__', {})
...       registry[(WARNING_TEXT, WARNING_TYPE, WARNING_LINE)] = True
...       return Process.run(self, *args, **kwargs)
... 
>>> if __name__ == '__main__':
...    p = SelectiveProcess(target=yell, args=(WARNING_TEXT,))
...    p.start()
...    p.join()
... 
about to yell AAAAaaaaa!
__main__:6: UserWarning: complimentary second warning.
>>> 
于 2012-10-12T04:53:01.677 に答える