5

I / Oヘビーブロッキング呼び出しに依存するプロデューサー関数と、I/Oヘビーブロッキング呼び出しに依存するコンシューマー関数があります。それらを高速化するために、Geventマイクロスレッディングライブラリを接着剤として使用しました。

私のパラダイムは次のようになります。

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []

def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid

def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)



for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

#This doesnt work.
for j in range(2):
    producers.append(gevent.spawn(producer))

#Uncommenting this makes this script work.
#producer()

q.join()

私には4人の消費者がいて、2人の生産者が欲しいです。プロデューサーは、シグナル、つまり10のときに終了します。コンシューマーはこのキューをフィードし続け、プロデューサーとコンシューマーが終了するとタスク全体が終了します。

ただし、これは機能しません。for複数のプロデューサーを生成し、単一のプロデューサーのみを使用するループをコメントアウトすると、スクリプトは正常に実行されます。

私は自分が間違ったことを理解できないようです。

何か案は?

ありがとう

4

4 に答える 4

6

概念的には、アプリケーションが終了する必要があるときではないため、キューに未完了の作業がないときに実際に終了する必要はありません。

プロデューサーが終了したとき、そして未完成の作業がなくなったときに終了したいとします

# Wait for all producers to finish producing
gevent.joinall(producers)
# *Now* we want to make sure there's no unfinished work
q.join()
# We don't care about workers. We weren't paying them anything, anyways
gevent.killall(workers)
# And, we're done.
于 2012-02-09T14:25:12.437 に答える
3

q.join()何かがキューに入れられてすぐに終了する前に、それが行われると思います。キューに参加する前に、すべてのプロデューサーに参加してみてください。

于 2012-02-09T12:30:16.573 に答える
0

あなたがしたいのは、プロデューサーとワーカーが通信している間、メインプログラムをブロックすることです。キューでのブロックは、キューが空になるまで待機してから、すぐに終了する可能性があります。これをプログラムの最後に置くのではなくq.join()

gevent.joinall(producers)
于 2012-02-16T12:03:14.867 に答える
0

私はあなたと同じ問題に遭遇しました。コードの主な問題は、プロデューサーがgeventスレッドで生成されたため、ワーカーがすぐにタスクを取得できなかったことです。

producer()プロセスの実行が、タスクをすぐにプッシュできるプロデューサーに出会ったときに、geventスレッドで生成されないメインプロセスで実行することをお勧めします。

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []

def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid

def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)


producer()

for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

上記のコードは理にかなっています..:)

于 2015-12-08T13:55:54.807 に答える