13

Python 2.7.3 を使用しています。サブクラス化multiprocessing.Processされたオブジェクトを使用していくつかのコードを並列化しました。サブクラス化された Process オブジェクトのコードにエラーがなければ、すべて正常に実行されます。しかし、サブクラス化された Process オブジェクトのコードにエラーがある場合、それらは明らかに静かにクラッシュし (スタックトレースが親シェルに出力されません)、CPU 使用率はゼロになります。親コードがクラッシュすることはなく、実行がハングしているように見えます。一方、エラーがどこにあるかについての指示がないため、コードのどこにエラーがあるかを追跡することは非常に困難です。

同じ問題を扱うスタックオーバーフローに関する他の質問は見つかりません。

サブクラス化された Process オブジェクトは、親のシェルにエラー メッセージを出力できないため、黙ってクラッシュしているように見えると思いますが、少なくともより効率的にデバッグできるように (そして他の私のコードのユーザーも、問題が発生したときに教えてくれます)。

編集:私の実際のコードは複雑すぎますが、エラーのあるサブクラス化された Process オブジェクトの簡単な例は次のようになります。

from multiprocessing import Process, Queue

class Worker(Process):

    def __init__(self, inputQueue, outputQueue):

        super(Worker, self).__init__()

        self.inputQueue = inputQueue
        self.outputQueue = outputQueue

    def run(self):

        for i in iter(self.inputQueue.get, 'STOP'):

            # (code that does stuff)

            1 / 0 # Dumb error

            # (more code that does stuff)

            self.outputQueue.put(result)
4

3 に答える 3

18

本当に必要なのは、例外を親プロセスに渡す何らかの方法ですよね? その後、必要に応じてそれらを処理できます。

を使用する場合concurrent.futures.ProcessPoolExecutor、これは自動です。を使えばmultiprocessing.Pool簡単です。明示的なProcessandを使用する場合Queue、少し作業を行う必要がありますが、それほど多くはありません。

例えば:

def run(self):
    try:
        for i in iter(self.inputQueue.get, 'STOP'):
            # (code that does stuff)
            1 / 0 # Dumb error
            # (more code that does stuff)
            self.outputQueue.put(result)
    except Exception as e:
        self.outputQueue.put(e)

次に、呼び出し元のコードは、Exception他のものと同じように、キューから s を読み取ることができます。これの代わりに:

yield outq.pop()

これを行う:

result = outq.pop()
if isinstance(result, Exception):
    raise result
yield result

(実際の親プロセスのキュー読み取りコードが何をするのかはわかりません。これは、最小限のサンプルがキューを無視するだけだからです。ただし、実際のコードが実際にはこのように機能しない場合でも、これでアイデアが説明されることを願っています。)

これは、未処理の例外がrun. 例外を戻し、次の に進みたい場合はi in iter、 を に移動するだけtryですfor

これも、Exceptions が有効な値ではないことを前提としています。(result, exception)それが問題である場合、最も簡単な解決策はタプルをプッシュすることです:

def run(self):
    try:
        for i in iter(self.inputQueue.get, 'STOP'):
            # (code that does stuff)
            1 / 0 # Dumb error
            # (more code that does stuff)
            self.outputQueue.put((result, None))
    except Exception as e:
        self.outputQueue.put((None, e))

次に、ポップコードはこれを行います:

result, exception = outq.pop()
if exception:
    raise exception
yield result

(err, result)これは、すべてのコールバックに渡す node.js コールバック スタイルに似ていることに気付くかもしれません。はい、それは面倒です。そのスタイルでコードをめちゃくちゃにするつもりです。しかし、実際にはラッパー以外では使用していません。キューから値を取得したり、内部で呼び出されたりする「アプリケーションレベル」のコードはすべて、run通常のリターン/イールドと発生した例外を確認するだけです。

ジョブをキューに入れて手動で実行している場合でもFuture、 の仕様に合わせて を構築する (またはそのクラスをそのまま使用する) ことを検討することもできます。concurrent.futuresそれほど難しいことではなく、特にデバッグ用の非常に優れた API を提供します。

最後に、ワーカーとキューを中心に構築されたほとんどのコードは、キューごとに 1 つのワーカーのみが必要であると確信している場合でも、エグゼキューター/プール設計を使用してはるかに単純化できることに注意してください。ボイラープレートをすべて破棄し、Worker.runメソッド内のループを関数に変換します (キューに追加するのではなく、通常どおりreturns またはs を実行します)。raise呼び出し側では、すべてのボイラープレートと、ジョブ関数とそのパラメーターをもう一度破棄submitmapます。

例全体を次のように縮小できます。

def job(i):
    # (code that does stuff)
    1 / 0 # Dumb error
    # (more code that does stuff)
    return result

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    results = executor.map(job, range(10))

また、例外を自動的に適切に処理します。


コメントで述べたように、例外のトレースバックは子プロセスにトレースバックしません。raise result手動呼び出し (または、プールまたはエグゼキューターを使用している場合は、プールまたはエグゼキューターの内臓)までしか進みません。

その理由は、multiprocessing.Queueが の上に構築されておりpickle、例外をピクルしてもトレースバックがピクルされないからです。その理由は、トレースバックをピクルできないからです。その理由は、トレースバックはローカル実行コンテキストへの参照でいっぱいであるため、別のプロセスで機能させるのは非常に難しいからです。

それで…これについて何ができますか?完全に一般的な解決策を探しに行かないでください。代わりに、実際に何が必要かを考えてください。90% の確率で、「トレースバックを使用して例外をログに記録し、続行する」または「トレースバックを使用して例外を出力し、デフォルトの未処理の例外ハンドラーstderrと同様にする」ことが必要です。exit(1)どちらの場合も、例外を渡す必要はまったくありません。子側でフォーマットして文字列を渡すだけです。もっと手の込んだものが必要な場合は、必要なものを正確に計算し、それを手動でまとめるのに十分な情報を渡してください。トレースバックと例外をフォーマットする方法がわからない場合は、tracebackモジュールを参照してください。とてもシンプルです。これは、ピクルスの機械に入る必要がまったくないことを意味します。(そうじゃないよ』copyregピックラーを作成したり、メソッドなどを使用してホルダー クラスを作成したりし__reduce__ますが、その必要がない場合は、そのすべてを学ぶ必要はありません)。

于 2013-03-21T00:08:46.293 に答える
2

プロセスの例外を表示するための回避策を提案します

from multiprocessing import Process
import traceback


run_old = Process.run

def run_new(*args, **kwargs):
    try:
        run_old(*args, **kwargs)
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        traceback.print_exc(file=sys.stdout)

Process.run = run_new
于 2015-09-09T16:51:52.817 に答える
1

これは答えではなく、単なる拡張コメントです。このプログラムを実行して、得られる出力 (ある場合) を教えてください。

from multiprocessing import Process, Queue

class Worker(Process):

    def __init__(self, inputQueue, outputQueue):

        super(Worker, self).__init__()

        self.inputQueue = inputQueue
        self.outputQueue = outputQueue

    def run(self):

        for i in iter(self.inputQueue.get, 'STOP'):

            # (code that does stuff)

            1 / 0 # Dumb error

            # (more code that does stuff)

            self.outputQueue.put(result)

if __name__ == '__main__':
    inq, outq = Queue(), Queue()
    inq.put(1)
    inq.put('STOP')
    w = Worker(inq, outq)
    w.start()

私は得る:

% test.py
Process Worker-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/unutbu/pybin/test.py", line 21, in run
    1 / 0 # Dumb error
ZeroDivisionError: integer division or modulo by zero

あなたが何も得られないなら、私は驚いています。

于 2013-03-20T23:57:30.180 に答える