2

Python プログラムを使用して、フロートのリストの平均値を計算しています。プログラムロジックに従う:

  1. プログラムはいくつかの引数で開始されます。
  2. リスト「hostgroups」が作成されます。
  3. 関数ワーカー (hosgroup、var1、var2、var3、...) を開始するリスト「hostgroups」に対する for-in ループ
  4. ワーカー関数内では、2 つの変数がワーカーのいくつかの入力変数で構築されます。
    • 4a. ワーカー内で、ワーカーのいくつかの入力変数を使用してサブワーカー関数が呼び出されます
    • 4b. サブワーカーはいくつかの新しい変数を返します
    • 4c。ワーカーに戻る
    • 4d。いくつかのことが行われます
    • 4d。最後に、ワーカーでいくつかの変数を指定して final-function が呼び出されます。

これまでのところ、大丈夫です!

私の次のステップは、マルチプロセッシングをセットアップすることです...誰が助けてくれますか?

更新:これが私の実際のアプローチです:

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print '%s: Exiting' % proc_name
                self.task_queue.task_done()
                break
            print '%s: %s' % (proc_name, next_task)
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, hostgroup, lock):
        self.hostgroup = hostgroup
        self.lock = lock
    def __call__(self):
        print 'Doing something fancy for %s!' % self.hostgroup
        try:
            lock.acquire()
            worker(self.hostgroup,hostsfile,mod_inputfile,outputdir,testmode,backup_dir,start_time,end_time,rrdname,unit,yesterday,now_epoch,rrd_interval,rrd_heartbeat,name)
        finally:
            lock.release()
    def __str__(self):
        return 'str %s' % self.hostgroup

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print 'Creating %d consumers' % num_consumers
    consumers = [ Consumer(tasks, results)
                  for i in xrange(num_consumers) ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    for hostgroup in hostgroups:
        tasks.put(Task(hostgroup,lock))

    # Add a poison pill for each consumer
    for i in xrange(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

---> ここまでは大丈夫です。しかし、ロックはできません。すべての結果は同じです.... なぜ lock.acquire() が機能しないのですか?

4

1 に答える 1

3

multiprocessing.PoolQueue クラスよりもはるかに使いやすいことがわかりました。基本的なセットアップは

from multiprocessing import Pool
p = Pool(processes=<number of processes>)
p.map(function, [a, b, c])

function(a)function(b)function(c)を独立したプロセスで呼び出す

于 2012-08-07T13:45:05.520 に答える