1

あるスレッドでメッセージをリッスンし、メッセージを FIFO キューに追加して、別のスレッドで処理する基本的なユーティリティを作成しました。各メッセージの処理には一定の時間がかかります (点滅しているライトが点滅を停止するのを待っています) が、メッセージはランダムに到着する可能性があります (patternsコードでは、受信メッセージを照合するための正規表現の辞書があり、一致が見つかった場合はそれに追加されます)点滅するカラーパターンとともにキュー)。

blink_queue = Queue()
def receive(data) :
    message = data['text']

    for pattern in patterns:
        if re.match(pattern, message):
            blink_queue.put(patterns[pattern])
            break
    return True

def blinker(q) :
    while True:
        args = q.get().split()
        subprocess.Popen(
            [blink_app] + args,
            startupinfo=startupinfo,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE)
        time.sleep(blink_wait)
        q.task_done()

def subscribe():
    print("Listening for messages on '%s' channel..." % channel)
    pubnub.subscribe({
        'channel'  : channel,
        'callback' : receive
    })

blink_worker = Thread(target=blinker, args=(blink_queue,))
blink_worker.daemon=True
blink_worker.start()

sub_thread = Thread(target=subscribe)
sub_thread.daemon=True
sub_thread.start()

sub_thread.join()

Python で FIFO キューを実装して、最も古い (最初の) キューが大きくなった場合に自動的に削除する方法を教えてください。別の監視スレッドを作成するか、subscribeスレッドのサイズをチェックしますか? 私は Python を初めて使用するので、完全に論理的なデータ型がある場合は、お気軽に初心者と呼んで正しい方向に送ってください。

4

2 に答える 2

7

論理型があることがわかりましたcollections.deque。ドキュメントから:

maxlen が指定されていないか、None の場合、deque は任意の長さに成長する可能性があります。それ以外の場合、deque は指定された最大長に制限されます。境界付きの長さの両端キューがいっぱいになると、新しいアイテムが追加されると、対応する数のアイテムが反対側から破棄されます。

(そして、これがこのデータ型を実装するコミットです)

于 2012-12-14T22:20:15.437 に答える
0

このために、 Queueをサブクラス化し、メソッドをオーバーロードして、が大きくなりすぎputた場合に必要な方法でアイテムを削除します。Queue

例えば

class NukeOldDataQueue(Queue.Queue):
    def put(self,*args,**kwargs):
        if self.full():
            try:
                oldest_data = self.get()
                print('[WARNING]: throwing away old data:'+repr(oldest_data))
            # a True value from `full()` does not guarantee
            # that anything remains in the queue when `get()` is called
            except Queue.Empty:
                pass
        Queue.Queue.put(self,*args,**kwargs)

また、誤って新しいデータを破棄することがどれほど悪いか、または呼び出しのブロックが許容できるかどうかに応じて、パラメーターを渡すblock=Falseか、パラメーターを操作することもできます。timeoutput()

于 2012-12-14T20:50:27.880 に答える