2

私はそのようなことをしたいと思います(1つのキューと複数のコンシューマー):

import gevent
from gevent import queue

q=queue.Queue()
q.put(1)
q.put(2)
q.put(3)
q.put(StopIteration)

def consumer(qq):
    for i in qq:
        print i

jobs=[gevent.spawn(consumer,i) for i in [q,q]]

gevent.joinall(jobs)

しかし、それは不可能です...キューはjob1によって消費されます...したがって、job2は永久にブロックされます。それは私に例外を与えますgevent.hub.LoopExit: This operation would block forever

各消費者が最初から完全なキューを消費できるようになることを望みます。(1,2,3,1,2,3または1,1,2,2,3,3を表示する必要があります...気にしないでください)

スポーンする前にキューのクローンを作成するのが 1 つのアイデアですが、コピー (浅い/深い) モジュールを使用することはできません ;-(

それを行う別の方法はありますか?

[編集] それについてどう思いますか?

import gevent
from gevent import queue

class MasterQueueClonable(queue.Queue):
    def __init__(self,*a,**k):
        queue.Queue.__init__(self,*a,**k)

        self.__cloned = []
        self.__old=[]

    #override
    def get(self,*a,**k):
        e=queue.Queue.get(self,*a,**k)
        for i in self.__cloned:  i.put(e) # serve to current clones
        self.__old.append(e)              # save old element
        return e

    def clone(self):
        q=queue.Queue()
        for i in self.__old: q.put(i)   # feed a queue with elements which are out
        self.__cloned.append(q)         # stock the queue, to be able to put newer elements too
        return q

q=MasterQueueClonable()
q.put(1)
q.put(2)
q.put(3)
q.put(StopIteration)

def consumer(qq):
    for i in qq:
        print id(qq),i

jobs=[gevent.spawn(consumer,i) for i in [q.clone(), q ,q.clone(),q.clone()]]
gevent.joinall(jobs)

RyanYe のアイデアに基づいています。ディスパッチャのない「マスター キュー」があります。私のマスター キューは GET メソッドをオーバーライドし、オンデマンド クローンにディスパッチできます。さらに、マスターキューの開始後に「クローン」を作成できます (__old トリックを使用)。

4

3 に答える 3

2

作品を消費者に発送するためのグリーンレットを作成することをお勧めします。コード例:

import gevent
from gevent import queue

master_queue=queue.Queue()
master_queue.put(1)
master_queue.put(2)
master_queue.put(3)
master_queue.put(StopIteration)

total_consumers = 10
consumer_queues = [queue.Queue() for i in xrange(total_consumers)]

def dispatcher(master_queue, consumer_queues):
    for i in master_queue:
        [j.put(i) for j in consumer_queues]
    [j.put(StopIteration) for j in consumer_queues]

def consumer(qq):
    for i in qq:
        print i

jobs=[gevent.spawn(dispatcher, q, consumer_queues)] + [gevent.spawn(consumer, i) for i in consumer_queues]
gevent.joinall(jobs)

更新: コンシューマー キューの StopIteration の欠落を修正します。arilouさん、ご指摘ありがとうございます。

于 2012-07-25T07:42:57.707 に答える
1

Queueクラスにメソッドを追加しました: copy()

>>> import gevent.queue
>>> q = gevent.queue.Queue()
>>> q.put(5)
>>> q.copy().get()
5
>>> q
<Queue at 0x1062760d0 queue=deque([5])>

それが役立つかどうか教えてください。

于 2012-07-26T10:54:05.903 に答える
0

Ryan Ye による回答では、dispatcher() 関数の最後に 1 行がありません: [j.put(StopIteration) for j in consumer_queues] それがないと、「gevent.hub.LoopExit: この操作は永久にブロックされます」という結果になります。 'for i in master_queue' ループは StopIteration 例外を consumer_queues にコピーしないためです。

(申し訳ありませんが、まだコメントを残すことができないため、別の回答として書きます。)

于 2012-07-25T21:39:10.747 に答える