0

multiprocessingPythonで使用する3つの別々のプロセスを定義する簡単なサンプルスクリプトを作成しました。私の目的は、データを収集して処理する2つの小さなスレッドを生成する1つの親スレッドを持つことです。

現在、私の実装は次のようになっています。

from Queue import Queue,Empty
from multiprocessing import Process
import time
import hashlib


class FillQueue(Process):
    def __init__(self,q):
        Process.__init__(self)
        self.q = q

    def run(self):
        i = 0
        while i is not 5:
            print 'putting'
            self.q.put('foo')
            i+=1
        self.q.put('|STOP|')

class ConsumeQueue(Process):
    def __init__(self,q):
        Process.__init__(self)
        self.q = q

    def run(self):
        print 'Consume'
        while True:
            try:
                value =  self.q.get(False)
                print value
                if value == '|STOP|':
                    print 'done'
                    break;
            except Empty:
                print 'Nothing to process atm'

class Ripper(Process):

    q = Queue()

    def __init__(self):
        self.fq = FillQueue(self.q)
        self.cq = ConsumeQueue(self.q)
        self.fq.daemon = True
        self.cq.daemon = True

    def run(self):
        try:
            self.fq.start()
            self.cq.start()
        except KeyboardInterrupt:
            print 'exit'

if __name__ == '__main__':
    r = Ripper()
    r.start()

現在実行されているため、CLIのスクリプトからの出力は次のようになります。

putting
putting
putting
putting
putting
Consume
foo
foo
foo
foo
foo
|STOP|
done

明らかに、フィラーがアイテムの追加を完了するまで、コンシューマーはキュー内のアイテムの処理を開始しないため、2つのスレッドを開始する方法がブロックされています。

これを書き直して、両方のスレッドをブロックせずにすぐに開始するようにするにEmptyは、処理する作業がないときにコンシューマーが単にexceptブロックに渡され、停止メッセージを受信すると完全に終了するようにするにはどうすればよいですか?

編集:タイプミス、startrunメソッドが混同されていた

4

3 に答える 3

4

multiprocessing.Process を使用して複数のプロセスを開始しているようです。

ただし、スレッドセーフのみであり、複数のプロセスで使用するように設計されていない Queue.Queue を使用しています。

shevek の回答も有効ですが、最初に Queue.Queue を multiprocessing.Queue に置き換える必要があります。

于 2012-10-02T19:57:24.340 に答える
3

これを試して:

from Queue import Empty
from multiprocessing import Process, Queue
import time
import hashlib


class FillQueue(object):
    def __init__(self, q): 
        self.q = q 

    def run(self):
        i = 0 
        while i < 5:
            print 'putting'
            self.q.put('foo %d' % i ) 
            i+=1
            time.sleep(.5)
        self.q.put('|STOP|')

class ConsumeQueue(object):
    def __init__(self, q): 
        self.q = q 

    def run(self):
        while True:
            try:
                value =  self.q.get(False)
                print value
                if value == '|STOP|':
                    print 'done'
                    break;
            except Empty:
                print 'Nothing to process atm'
                time.sleep(.2)


if __name__ == '__main__':
    q = Queue()
    f = FillQueue(q)
    c = ConsumeQueue(q)

    p1 = Process(target=f.run)
    p1.start()

    p2 = Process(target=c.run)
    p2.start()

    p1.join()
    p2.join()
于 2012-10-02T20:01:51.203 に答える
3

あなたのプログラムはうまくいくと思います。CPU は一度に 1 つのことだけを短時間処理します。ただし、すべてのものをキューに入れるのに必要な時間は非常に短いです。したがって、フィラーが 1 つのタイム スライスでこれを実行できない理由はありません。

フィラーに遅延を追加すると、実際に期待どおりに機能することがわかるはずです。

于 2012-10-02T19:40:47.920 に答える