I / Oヘビーブロッキング呼び出しに依存するプロデューサー関数と、I/Oヘビーブロッキング呼び出しに依存するコンシューマー関数があります。それらを高速化するために、Geventマイクロスレッディングライブラリを接着剤として使用しました。
私のパラダイムは次のようになります。
import gevent
from gevent.queue import *
import time
import random
q = JoinableQueue()
workers = []
producers = []
def do_work(wid, value):
gevent.sleep(random.randint(0,2))
print 'Task', value, 'done', wid
def worker(wid):
while True:
item = q.get()
try:
print "Got item %s" % item
do_work(wid, item)
finally:
print "No more items"
q.task_done()
def producer():
while True:
item = random.randint(1, 11)
if item == 10:
print "Signal Received"
return
else:
print "Added item %s" % item
q.put(item)
for i in range(4):
workers.append(gevent.spawn(worker, random.randint(1, 100000)))
#This doesnt work.
for j in range(2):
producers.append(gevent.spawn(producer))
#Uncommenting this makes this script work.
#producer()
q.join()
私には4人の消費者がいて、2人の生産者が欲しいです。プロデューサーは、シグナル、つまり10のときに終了します。コンシューマーはこのキューをフィードし続け、プロデューサーとコンシューマーが終了するとタスク全体が終了します。
ただし、これは機能しません。for
複数のプロデューサーを生成し、単一のプロデューサーのみを使用するループをコメントアウトすると、スクリプトは正常に実行されます。
私は自分が間違ったことを理解できないようです。
何か案は?
ありがとう