別のプロセスを実行して長い時間がかかる作業を実行し、メインモジュールからこれらの束を起動して、すべてが完了するのを待つことができるクラスを作成しようとしています。プロセスを一度起動してから、プロセスを作成して破棄するのではなく、やるべきことを与え続けたいと思っています。たとえば、dd コマンドを実行している 10 台のサーバーがあり、それらすべてでファイルを scp したいなどです。
私の最終的な目標は、IP アドレス、ログ、ランタイムなど、関連付けられているシステムの情報を追跡する各システムのクラスを作成することです。ただし、そのクラスは、システム コマンドを起動してから戻ることができる必要があります。そのシステム コマンドの実行中に実行を呼び出し元に戻して、後でシステム コマンドの結果をフォローアップします。
クラスのインスタンス メソッドをパイプ経由でピクル経由でサブプロセスに送信できないため、試行が失敗しています。それらは酸洗いできません。そのため、さまざまな方法で修正しようとしましたが、わかりません。これを行うためにコードにパッチを適用するにはどうすればよいですか? 有用なものを送信できない場合、マルチプロセッシングは何の役に立つでしょうか?
クラスインスタンスで使用されているマルチプロセッシングの適切なドキュメントはありますか? multiprocessing モジュールを機能させる唯一の方法は、単純な関数を使用することです。クラス インスタンス内でそれを使用する試みはすべて失敗しました。たぶん、代わりにイベントを渡す必要がありますか?その方法はまだわかりません。
import multiprocessing
import sys
import re
class ProcessWorker(multiprocessing.Process):
"""
This class runs as a separate process to execute worker's commands in parallel
Once launched, it remains running, monitoring the task queue, until "None" is sent
"""
def __init__(self, task_q, result_q):
multiprocessing.Process.__init__(self)
self.task_q = task_q
self.result_q = result_q
return
def run(self):
"""
Overloaded function provided by multiprocessing.Process. Called upon start() signal
"""
proc_name = self.name
print '%s: Launched' % (proc_name)
while True:
next_task_list = self.task_q.get()
if next_task is None:
# Poison pill means shutdown
print '%s: Exiting' % (proc_name)
self.task_q.task_done()
break
next_task = next_task_list[0]
print '%s: %s' % (proc_name, next_task)
args = next_task_list[1]
kwargs = next_task_list[2]
answer = next_task(*args, **kwargs)
self.task_q.task_done()
self.result_q.put(answer)
return
# End of ProcessWorker class
class Worker(object):
"""
Launches a child process to run commands from derived classes in separate processes,
which sit and listen for something to do
This base class is called by each derived worker
"""
def __init__(self, config, index=None):
self.config = config
self.index = index
# Launce the ProcessWorker for anything that has an index value
if self.index is not None:
self.task_q = multiprocessing.JoinableQueue()
self.result_q = multiprocessing.Queue()
self.process_worker = ProcessWorker(self.task_q, self.result_q)
self.process_worker.start()
print "Got here"
# Process should be running and listening for functions to execute
return
def enqueue_process(target): # No self, since it is a decorator
"""
Used to place an command target from this class object into the task_q
NOTE: Any function decorated with this must use fetch_results() to get the
target task's result value
"""
def wrapper(self, *args, **kwargs):
self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled!
return wrapper
def fetch_results(self):
"""
After all processes have been spawned by multiple modules, this command
is called on each one to retreive the results of the call.
This blocks until the execution of the item in the queue is complete
"""
self.task_q.join() # Wait for it to to finish
return self.result_q.get() # Return the result
@enqueue_process
def run_long_command(self, command):
print "I am running number % as process "%number, self.name
# In here, I will launch a subprocess to run a long-running system command
# p = Popen(command), etc
# p.wait(), etc
return
def close(self):
self.task_q.put(None)
self.task_q.join()
if __name__ == '__main__':
config = ["some value", "something else"]
index = 7
workers = []
for i in range(5):
worker = Worker(config, index)
worker.run_long_command("ls /")
workers.append(worker)
for worker in workers:
worker.fetch_results()
# Do more work... (this would actually be done in a distributor in another class)
for worker in workers:
worker.close()
編集:ProcessWorker
クラスとマルチプロセッシングキューの作成をWorker
クラスの外に移動しようとした後、ワーカーインスタンスを手動でピクルしようとしました。それでもうまくいかず、エラーが発生します
RuntimeError: キュー オブジェクトは、継承によってプロセス間でのみ共有する必要があります
. しかし、これらのキューの参照をワーカー インスタンスに渡すだけですか?? 基本的なものが欠けています。メインセクションから変更されたコードは次のとおりです。
if __name__ == '__main__':
config = ["some value", "something else"]
index = 7
workers = []
for i in range(1):
task_q = multiprocessing.JoinableQueue()
result_q = multiprocessing.Queue()
process_worker = ProcessWorker(task_q, result_q)
worker = Worker(config, index, process_worker, task_q, result_q)
something_to_look_at = pickle.dumps(worker) # FAIL: Doesn't like queues??
process_worker.start()
worker.run_long_command("ls /")