27

別のプロセスを実行して長い時間がかかる作業を実行し、メインモジュールからこれらの束を起動して、すべてが完了するのを待つことができるクラスを作成しようとしています。プロセスを一度起動してから、プロセスを作成して破棄するのではなく、やるべきことを与え続けたいと思っています。たとえば、dd コマンドを実行している 10 台のサーバーがあり、それらすべてでファイルを scp したいなどです。

私の最終的な目標は、IP アドレス、ログ、ランタイムなど、関連付けられているシステムの情報を追跡する各システムのクラスを作成することです。ただし、そのクラスは、システム コマンドを起動してから戻ることができる必要があります。そのシステム コマンドの実行中に実行を呼び出し元に戻して、後でシステム コマンドの結果をフォローアップします。

クラスのインスタンス メソッドをパイプ経由でピクル経由でサブプロセスに送信できないため、試行が失敗しています。それらは酸洗いできません。そのため、さまざまな方法で修正しようとしましたが、わかりません。これを行うためにコードにパッチを適用するにはどうすればよいですか? 有用なものを送信できない場合、マルチプロセッシングは何の役に立つでしょうか?

クラスインスタンスで使用されているマルチプロセッシングの適切なドキュメントはありますか? multiprocessing モジュールを機能させる唯一の方法は、単純な関数を使用することです。クラス インスタンス内でそれを使用する試みはすべて失敗しました。たぶん、代わりにイベントを渡す必要がありますか?その方法はまだわかりません。

import multiprocessing
import sys
import re

class ProcessWorker(multiprocessing.Process):
    """
    This class runs as a separate process to execute worker's commands in parallel
    Once launched, it remains running, monitoring the task queue, until "None" is sent
    """

    def __init__(self, task_q, result_q):
        multiprocessing.Process.__init__(self)
        self.task_q = task_q
        self.result_q = result_q
        return

    def run(self):
        """
        Overloaded function provided by multiprocessing.Process.  Called upon start() signal
        """
        proc_name = self.name
        print '%s: Launched' % (proc_name)
        while True:
            next_task_list = self.task_q.get()
            if next_task is None:
                # Poison pill means shutdown
                print '%s: Exiting' % (proc_name)
                self.task_q.task_done()
                break
            next_task = next_task_list[0]
            print '%s: %s' % (proc_name, next_task)
            args = next_task_list[1]
            kwargs = next_task_list[2]
            answer = next_task(*args, **kwargs)
            self.task_q.task_done()
            self.result_q.put(answer)
        return
# End of ProcessWorker class

class Worker(object):
    """
    Launches a child process to run commands from derived classes in separate processes,
    which sit and listen for something to do
    This base class is called by each derived worker
    """
    def __init__(self, config, index=None):
        self.config = config
        self.index = index

        # Launce the ProcessWorker for anything that has an index value
        if self.index is not None:
            self.task_q = multiprocessing.JoinableQueue()
            self.result_q = multiprocessing.Queue()

            self.process_worker = ProcessWorker(self.task_q, self.result_q)
            self.process_worker.start()
            print "Got here"
            # Process should be running and listening for functions to execute
        return

    def enqueue_process(target):  # No self, since it is a decorator
        """
        Used to place an command target from this class object into the task_q
        NOTE: Any function decorated with this must use fetch_results() to get the
        target task's result value
        """
        def wrapper(self, *args, **kwargs):
            self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled!
        return wrapper

    def fetch_results(self):
        """
        After all processes have been spawned by multiple modules, this command
        is called on each one to retreive the results of the call.
        This blocks until the execution of the item in the queue is complete
        """
        self.task_q.join()                          # Wait for it to to finish
        return self.result_q.get()                  # Return the result

    @enqueue_process
    def run_long_command(self, command):
        print "I am running number % as process "%number, self.name

        # In here, I will launch a subprocess to run a  long-running system command
        # p = Popen(command), etc
        # p.wait(), etc
        return 

    def close(self):
        self.task_q.put(None)
        self.task_q.join()

if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(5):
        worker = Worker(config, index)
        worker.run_long_command("ls /")
        workers.append(worker)
    for worker in workers:
        worker.fetch_results()

    # Do more work... (this would actually be done in a distributor in another class)

    for worker in workers:
        worker.close() 

編集:ProcessWorkerクラスとマルチプロセッシングキューの作成をWorkerクラスの外に移動しようとした後、ワーカーインスタンスを手動でピクルしようとしました。それでもうまくいかず、エラーが発生します

RuntimeError: キュー オブジェクトは、継承によってプロセス間でのみ共有する必要があります

. しかし、これらのキューの参照をワーカー インスタンスに渡すだけですか?? 基本的なものが欠けています。メインセクションから変更されたコードは次のとおりです。

if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(1):
        task_q = multiprocessing.JoinableQueue()
        result_q = multiprocessing.Queue()
        process_worker = ProcessWorker(task_q, result_q)
        worker = Worker(config, index, process_worker, task_q, result_q)
        something_to_look_at = pickle.dumps(worker) # FAIL:  Doesn't like queues??
        process_worker.start()
        worker.run_long_command("ls /")
4

3 に答える 3

27

つまり、問題は、Python が C++/fork() の動作とは何らかの形で異なる何らかの魔法を行っていると想定していたことです。どういうわけか、Python はプログラム全体ではなく、クラスだけを別のプロセスにコピーしたと思いました。pickle のシリアライゼーションに関するすべての話を聞いて、実際にはすべてがパイプ経由で送信されていると思ったので、これを機能させるために何日も無駄にしました。特定のものをパイプ経由で送信できないことはわかっていましたが、問題は適切にパッケージ化していないことにあると思いました。

Python のドキュメントで、このモジュールが使用されたときに何が起こるかについて 10,000 フィートのビューが提供されていれば、これはすべて回避できたはずです。確かに、マルチプロセス モジュールのメソッドが何をするかを教えてくれ、いくつかの基本的な例を示してくれますが、私が知りたいのは、舞台裏の「操作の理論」です! これが私が使用できた種類の情報です。私の答えがずれている場合は、チャイムを鳴らしてください。それは私が学ぶのに役立ちます。

このモジュールを使用してプロセスの開始を実行すると、プログラム全体が別のプロセスにコピーされます。しかし、それは "__main__" プロセスと私のコードはそれをチェックしていましたが、さらに別のプロセスを無限に起動することはありません。ただ停止し、ゾンビのように何かをするのを待っているだけです。呼び出し時に親で初期化されたすべてのものmultiprocess.Process() はすべてセットアップされ、準備ができています. multiprocess.Queue または共有メモリ、またはパイプなどに何かを入れると (どのように通信しているかに関係なく)、別のプロセスがそれを受け取り、作業を開始します. It can draw on all import modules and setup just as it was the parent. ただし、一部の内部状態変数が親プロセスまたは別のプロセスで変更されると、それらの変更は分離されます. プロセスが生成されると、保持するのはあなたの仕事になります.必要に応じて、キュー、パイプ、共有メモリなどを介してそれらを同期します。

私はコードを捨てて最初からやり直しましたが、今はProcessWorker、コマンドラインを実行する「実行」メソッドである . ものすごく単純。この方法で一連のプロセスを起動してから閉じることを心配する必要はありません。これにより、過去に C++ であらゆる種類の不安定性とパフォーマンスの問題が発生しました。最初にプロセスを起動し、待機中のプロセスにメッセージを渡すように切り替えたところ、パフォーマンスが向上し、非常に安定しました。

ところで、私は助けを得るためにこのリンクを見ましたが、例がメソッドがキューを越えて転送されていると思わせたので、私を失望させました: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html 2番目の例最初のセクションの "next_task()" は、(私には) キュー経由で受け取ったタスクを実行しているように見えました。

于 2013-01-06T06:03:54.613 に答える
8

メソッド自体を送信しようとする (これは非現実的です) 代わりに、実行するメソッドの名前を送信してみてください。

各ワーカーが同じコードを実行するとすれば、単純なgetattr(self, task_name).

tuples を渡します(task_name, task_args)。ここでtask_args、タスク メソッドに直接供給される dict がありました。

next_task_name, next_task_args = self.task_q.get()
if next_task_name:
  task = getattr(self, next_task_name)
  answer = task(**next_task_args)
  ...
else:
  # poison pill, shut down
  break
于 2013-01-05T07:44:33.817 に答える
0

参照: https://stackoverflow.com/a/14179779

David Lynch による 1 月 6 日の 6:03 の回答は、 http://www.doughellmann.com/PyMOTW/multiprocessing/communication.htmlに惑わされたと彼が言うとき、事実として正しくありません 。

提供されているコードと例は正しく、宣伝どおりに機能します。next_task() キューを介して受け取ったタスクをTask.__call__()実行しています --メソッドが何をしているのかを理解してみてください。

私の場合、私をつまずかせたのは、の実装での構文エラーでしrun()た。サブプロセスはこれを報告せず、黙って失敗するようです - 物事が奇妙なループに陥ったままになります! Emacs の Flymake/Pyflakes など、ある種の構文チェッカーが実行されていることを確認してください。

F を介したデバッグmultiprocessing.log_to_stderr()は、問題を絞り込むのに役立ちました。

于 2013-11-28T04:06:44.393 に答える