27

multiprocessing.Queuea をリストにダンプしたい。そのタスクのために、次の関数を作成しました。

import Queue

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []

    # START DEBUG CODE
    initial_size = queue.qsize()
    print("Queue has %s items initially." % initial_size)
    # END DEBUG CODE

    while True:
        try:
            thing = queue.get(block=False)
            result.append(thing)
        except Queue.Empty:

            # START DEBUG CODE
            current_size = queue.qsize()
            total_size = current_size + len(result)
            print("Dumping complete:")
            if current_size == initial_size:
                print("No items were added to the queue.")
            else:
                print("%s items were added to the queue." % \
                      (total_size - initial_size))
            print("Extracted %s items from the queue, queue has %s items \
            left" % (len(result), current_size))
            # END DEBUG CODE

            return result

しかし、何らかの理由で機能しません。

次のシェル セッションを確認します。

>>> import multiprocessing
>>> q = multiprocessing.Queue()
>>> for i in range(100):
...     q.put([range(200) for j in range(100)])
... 
>>> q.qsize()
100
>>> l=dump_queue(q)
Queue has 100 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 99 items left
>>> l=dump_queue(q)
Queue has 99 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 3 items from the queue, queue has 96 items left
>>> l=dump_queue(q)
Queue has 96 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 95 items left
>>> 

ここで何が起こっているのですか?すべてのアイテムがダンプされていないのはなぜですか?

4

2 に答える 2

27

これを試して:

import Queue
import time

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []

    for i in iter(queue.get, 'STOP'):
        result.append(i)
    time.sleep(.1)
    return result

import multiprocessing
q = multiprocessing.Queue()
for i in range(100):
    q.put([range(200) for j in range(100)])
q.put('STOP')
l=dump_queue(q)
print len(l)

マルチプロセッシングキューには、バッファから作業を引き出してパイプにフラッシュするフィーダスレッドを備えた内部バッファがあります。すべてのオブジェクトがフラッシュされていない場合、Emptyが時期尚早に発生する場合があります。センチネルを使用してキューの終わりを示すことは安全です(そして信頼できます)。また、iter(get、sentinel)イディオムを使用する方が、Emptyに依存するよりも優れています。

フラッシュのタイミングが原因で空になる可能性があるのは好きではありません(time.sleep(.1)を追加して、フィーダースレッドへのコンテキストスイッチを許可しました。必要ない場合があります。それがなくても機能します。これは習慣です。 GILを解放します)。

于 2009-10-08T23:36:18.317 に答える