7

マルチプロセッシングモジュールを使用して、非常に大きなタスクを分割しています。ほとんどの場合は機能しますが、すべてのデータがいつ処理されたかを効果的に判断するのが非常に難しいため、デザインに明らかな何かが欠けている必要があります。

実行する2つの別々のタスクがあります。もう一方を養う一方。これは生産者/消費者問題だと思います。私はすべてのプロセス間で共有キューを使用します。プロデューサーはキューをいっぱいにし、コンシューマーはキューから読み取り、処理を行います。問題は、データの量が限られていることです。そのため、ある時点で、システムが正常にシャットダウンできるように、すべてのデータが処理されたことを全員が知る必要があります。

map_async()関数を使用するのは理にかなっているように見えますが、プロデューサーがキューをいっぱいにしているため、すべての項目を前もって知っているわけではないので、whileループに入ってapply_async()を使用する必要がありますそして、ある種のタイムアウトですべてが完了したことを検出しようとします...醜いです。

明らかな何かが欠けているような気がします。どうすればこれをより適切に設計できますか?

プロデューサー

class ProducerProcess(multiprocessing.Process):
    def __init__(self, item, consumer_queue):
        self.item = item
        self.consumer_queue = consumer_queue
        multiprocessing.Process.__init__(self)

    def run(self):
        for record in get_records_for_item(self.item): # this takes time
            self.consumer_queue.put(record)

def start_producer_processes(producer_queue, consumer_queue, max_running):
    running = []

    while not producer_queue.empty():
        running = [r for r in running if r.is_alive()]
        if len(running) < max_running:
            producer_item = producer_queue.get()
            p = ProducerProcess(producer_item, consumer_queue)
            p.start()
            running.append(p)
        time.sleep(1)

消費者

def process_consumer_chunk(queue, chunksize=10000):
    for i in xrange(0, chunksize):
        try:
            # don't wait too long for an item
            # if new records don't arrive in 10 seconds, process what you have
            # and let the next process pick up more items.

            record = queue.get(True, 10)
        except Queue.Empty:                
            break

        do_stuff_with_record(record)

主要

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    consumer_queue = manager.Queue(1024*1024)
    producer_queue = manager.Queue()

    producer_items = xrange(0,10)

    for item in producer_items:
        producer_queue.put(item)

    p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8))
    p.start()

    consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)

ここが安っぽくなるところです。消費するリストが同時にいっぱいになっているため、マップを使用できません。したがって、whileループに入り、タイムアウトを検出する必要があります。プロデューサーがまだいっぱいにしようとしている間にconsumer_queueが空になる可能性があるため、空のキューを検出して終了することはできません。

    timed_out = False
    timeout= 1800
    while 1:
        try:
            result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue, ), dict(chunksize=chunksize,))
            if timed_out:
                timed_out = False

        except Queue.Empty:
            if timed_out:
                break

            timed_out = True
            time.sleep(timeout)
        time.sleep(1)

    consumer_queue.join()
    consumer_pool.close()
    consumer_pool.join()

おそらく、メインスレッドのレコードをget()して、キューを渡す代わりにコンシューマーに渡すことができると思いましたが、同じ問題が発生すると思います。それでもwhileループを実行してapply_async()を使用する必要があります。アドバイスを事前にありがとうございます。

4

2 に答える 2

2

を使用しmanager.Eventて、作業の終了を通知できます。このイベントはすべてのプロセス間で共有でき、メインプロセスからシグナルを送信すると、他のワーカーが正常にシャットダウンできます。

while not event.is_set():
 ...rest of code...

したがって、コンシューマーはイベントが設定されるのを待ち、設定されたらクリーンアップを処理します。

このフラグをいつ設定するかを決定するにjoinは、プロデューサースレッドでを実行し、それらがすべて完了したら、コンシューマースレッドに参加できます。

于 2012-12-20T20:49:49.560 に答える
-1

離散イベントシミュレーションを実行するには、マルチプロセス/スレッドではなくSimPyを強くお勧めします。

于 2012-12-20T21:17:31.250 に答える