1

Python のマルチスレッド キューに問題があります。プロデューサーが入力キューから要素を取得し、いくつかの要素を生成して出力キューに配置し、コンシューマーが出力キューから要素を取得してそれらを出力するこのスクリプトがあります。

import threading
import Queue

class Producer(threading.Thread):
    def __init__(self, iq, oq):
        threading.Thread.__init__(self)
        self.iq = iq
        self.oq = oq

    def produce(self, e):
        self.oq.put(e*2)
        self.oq.task_done()
        print "Producer %s produced %d and put it to output Queue"%(self.getName(), e*2)

    def run(self):
        while 1:
            e = self.iq.get()
            self.iq.task_done()
            print "Get %d from input Queue"%(e)
            self.produce(e)


class Consumer(threading.Thread):
    def __init__(self, oq):
        threading.Thread.__init__(self)
        self.oq = oq

    def run(self):
        while 1:
            e = self.oq.get()
            self.oq.task_done()
            print "Consumer get %d from output queue and consumed"%e

iq = Queue.Queue()
oq = Queue.Queue()

for i in xrange(2):
    iq.put((i+1)*10)

for i in xrange(2):
    t1 = Producer(iq, oq)
    t1.setDaemon(True)
    t1.start()

    t2 = Consumer(oq)
    t2.setDaemon(True)
    t2.start()

iq.join()
oq.join()

しかし、実行するたびに動作が異なります(例外が発生するか、消費者は何もしません)。問題は task_done() コマンドにあると思いますが、バグの場所を誰か説明してもらえますか?

Consumer クラスを変更しました。

class Consumer(threading.Thread):
    def __init__(self, oq):
        threading.Thread.__init__(self)
        self.oq = oq

    def run(self):
        while 1:
            e = self.oq.get()
            self.oq.task_done()
            print "Consumer get %d from output queue and consumed"%e
            page = urllib2.urlopen("http://www.ifconfig.me/ip")
            print page

各 task_done() コマンドの後のコンシューマは Web サイトに接続する必要があります (時間がかかります) が、そうではなく、代わりに task_done() の後のコードの実行時間が短い場合は実行されますが、長い場合は実行されません! なんで?誰でもこの問題を説明できますか? task_done() コマンドの前にすべてを配置すると、他のスレッドからキューをブロックしますが、これは十分に愚かです。または、Python でのマルチスレッド化について欠けているものはありますか?

4

1 に答える 1

3

Queue ドキュメントから:

Queue.task_done() 以前にキューに入れられたタスクが完了したことを示します。キュー コンシューマ スレッドによって使用されます。タスクのフェッチに使用される get() ごとに、その後の task_done() の呼び出しにより、タスクの処理が完了したことがキューに通知されます。

join() が現在ブロックされている場合、すべてのアイテムが処理されたときに再開されます (つまり、キューに put() されたすべてのアイテムに対して task_done() 呼び出しが受信されたことを意味します)。

たとえば、コードでは、Producerクラスで次のことを行います。

def produce(self, e):
    self.oq.put(e*2)
    self.oq.task_done()
    print "Producer %s produced %d and put it to output Queue"%(self.getName(), e*2)

self.oq.task_done()を使用していないため、ここでは行うべきではありませんoq.get()

ただし、これが唯一の問題かどうかはわかりません。

編集:

あなたが使用している他の問題についてiq.join()oq.join()、最後に、これにより、他のスレッドが取得したページを印刷する前にメインスレッドが終了し、スレッドを として作成しているためDaemons、Python アプリケーションは終了するのを待たずに終了します実行中。( にQueue.join()依存することを覚えておいてくださいQueue.task_done())

「task_done() コマンドの前にすべてを配置すると、他のスレッドからのキューをブロックします」と言っています。意味がわかりません。これはスレッドをブロックするだけですが、互いにブロックされないスレッドをConsumerいつでも作成できます。Consumer

于 2012-06-24T16:47:19.913 に答える