マルチプロセッシングモジュールを使用して、非常に大きなタスクを分割しています。ほとんどの場合は機能しますが、すべてのデータがいつ処理されたかを効果的に判断するのが非常に難しいため、デザインに明らかな何かが欠けている必要があります。
実行する2つの別々のタスクがあります。もう一方を養う一方。これは生産者/消費者問題だと思います。私はすべてのプロセス間で共有キューを使用します。プロデューサーはキューをいっぱいにし、コンシューマーはキューから読み取り、処理を行います。問題は、データの量が限られていることです。そのため、ある時点で、システムが正常にシャットダウンできるように、すべてのデータが処理されたことを全員が知る必要があります。
map_async()関数を使用するのは理にかなっているように見えますが、プロデューサーがキューをいっぱいにしているため、すべての項目を前もって知っているわけではないので、whileループに入ってapply_async()を使用する必要がありますそして、ある種のタイムアウトですべてが完了したことを検出しようとします...醜いです。
明らかな何かが欠けているような気がします。どうすればこれをより適切に設計できますか?
プロデューサー
class ProducerProcess(multiprocessing.Process):
def __init__(self, item, consumer_queue):
self.item = item
self.consumer_queue = consumer_queue
multiprocessing.Process.__init__(self)
def run(self):
for record in get_records_for_item(self.item): # this takes time
self.consumer_queue.put(record)
def start_producer_processes(producer_queue, consumer_queue, max_running):
running = []
while not producer_queue.empty():
running = [r for r in running if r.is_alive()]
if len(running) < max_running:
producer_item = producer_queue.get()
p = ProducerProcess(producer_item, consumer_queue)
p.start()
running.append(p)
time.sleep(1)
消費者
def process_consumer_chunk(queue, chunksize=10000):
for i in xrange(0, chunksize):
try:
# don't wait too long for an item
# if new records don't arrive in 10 seconds, process what you have
# and let the next process pick up more items.
record = queue.get(True, 10)
except Queue.Empty:
break
do_stuff_with_record(record)
主要
if __name__ == "__main__":
manager = multiprocessing.Manager()
consumer_queue = manager.Queue(1024*1024)
producer_queue = manager.Queue()
producer_items = xrange(0,10)
for item in producer_items:
producer_queue.put(item)
p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8))
p.start()
consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)
ここが安っぽくなるところです。消費するリストが同時にいっぱいになっているため、マップを使用できません。したがって、whileループに入り、タイムアウトを検出する必要があります。プロデューサーがまだいっぱいにしようとしている間にconsumer_queueが空になる可能性があるため、空のキューを検出して終了することはできません。
timed_out = False
timeout= 1800
while 1:
try:
result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue, ), dict(chunksize=chunksize,))
if timed_out:
timed_out = False
except Queue.Empty:
if timed_out:
break
timed_out = True
time.sleep(timeout)
time.sleep(1)
consumer_queue.join()
consumer_pool.close()
consumer_pool.join()
おそらく、メインスレッドのレコードをget()して、キューを渡す代わりにコンシューマーに渡すことができると思いましたが、同じ問題が発生すると思います。それでもwhileループを実行してapply_async()を使用する必要があります。アドバイスを事前にありがとうございます。