12

queue.Queue複数の を同時に「選択」するにはどうすればよいですか?

Golang には、そのチャネルに必要な機能があります。

select {
case i1 = <-c1:
    print("received ", i1, " from c1\n")
case c2 <- i2:
    print("sent ", i2, " to c2\n")
case i3, ok := (<-c3):  // same as: i3, ok := <-c3
    if ok {
        print("received ", i3, " from c3\n")
    } else {
        print("c3 is closed\n")
    }
default:
    print("no communication\n")
}

ブロックを解除する最初のチャネルは、対応するブロックを実行します。Pythonでこれをどのように達成しますか?

Update0

tux21b 's answerに記載されているリンクによると、目的のキュー タイプには次のプロパティがあります。

  • マルチ プロデューサー/マルチ コンシューマー キュー (MPMC)
  • プロデューサーごとの FIFO/LIFO を提供
  • キューが空/満杯の場合、コンシューマー/プロデューサーはブロックされます

さらに、チャネルがブロックされる可能性があり、プロデューサーはコンシューマーがアイテムを取得するまでブロックします。Python の Queue でこれができるかどうかはわかりません。

4

4 に答える 4

3

を使用する場合queue.PriorityQueue、優先順位としてチャネルオブジェクトを使用して同様の動作を得ることができます。

import threading, logging
import random, string, time
from queue import PriorityQueue, Empty
from contextlib import contextmanager

logging.basicConfig(level=logging.NOTSET,
                    format="%(threadName)s - %(message)s")

class ChannelManager(object):
    next_priority = 0

    def __init__(self):
        self.queue = PriorityQueue()
        self.channels = []

    def put(self, channel, item, *args, **kwargs):
        self.queue.put((channel, item), *args, **kwargs)

    def get(self, *args, **kwargs):
        return self.queue.get(*args, **kwargs)

    @contextmanager
    def select(self, ordering=None, default=False):
        if default:
            try:
                channel, item = self.get(block=False)
            except Empty:
                channel = 'default'
                item = None
        else:
            channel, item = self.get()
        yield channel, item


    def new_channel(self, name):
        channel = Channel(name, self.next_priority, self)
        self.channels.append(channel)
        self.next_priority += 1
        return channel


class Channel(object):
    def __init__(self, name, priority, manager):
        self.name = name
        self.priority = priority
        self.manager = manager

    def __str__(self):
        return self.name

    def __lt__(self, other):
        return self.priority < other.priority

    def put(self, item):
        self.manager.put(self, item)


if __name__ == '__main__':
    num_channels = 3
    num_producers = 4
    num_items_per_producer = 2
    num_consumers = 3
    num_items_per_consumer = 3

    manager = ChannelManager()
    channels = [manager.new_channel('Channel#{0}'.format(i))
                for i in range(num_channels)]

    def producer_target():
        for i in range(num_items_per_producer):
            time.sleep(random.random())
            channel = random.choice(channels)
            message = random.choice(string.ascii_letters)
            logging.info('Putting {0} in {1}'.format(message, channel))
            channel.put(message)

    producers = [threading.Thread(target=producer_target,
                                  name='Producer#{0}'.format(i))
                 for i in range(num_producers)]
    for producer in producers:
        producer.start()
    for producer in producers:
        producer.join()
    logging.info('Producers finished')

    def consumer_target():
        for i in range(num_items_per_consumer):
            time.sleep(random.random())
            with manager.select(default=True) as (channel, item):
                if channel:
                    logging.info('Received {0} from {1}'.format(item, channel))
                else:
                    logging.info('No data received')

    consumers = [threading.Thread(target=consumer_target,
                                  name='Consumer#{0}'.format(i))
                 for i in range(num_consumers)]
    for consumer in consumers:
        consumer.start()
    for consumer in consumers:
        consumer.join()
    logging.info('Consumers finished')

出力例:

Producer#0 - Putting x in Channel#2
Producer#2 - Putting l in Channel#0
Producer#2 - Putting A in Channel#2
Producer#3 - Putting c in Channel#0
Producer#3 - Putting z in Channel#1
Producer#1 - Putting I in Channel#1
Producer#1 - Putting L in Channel#1
Producer#0 - Putting g in Channel#1
MainThread - Producers finished
Consumer#1 - Received c from Channel#0
Consumer#2 - Received l from Channel#0
Consumer#0 - Received I from Channel#1
Consumer#0 - Received L from Channel#1
Consumer#2 - Received g from Channel#1
Consumer#1 - Received z from Channel#1
Consumer#0 - Received A from Channel#2
Consumer#1 - Received x from Channel#2
Consumer#2 - Received None from default
MainThread - Consumers finished

この例では、はメソッドを実装してGoのステートメントに似たものにするためのChannelManager単なるラッパーです。queue.PriorityQueueselectcontextmanagerselect

注意すべきいくつかのこと:

  • 注文

    • Goの例では、ステートメント内にチャネルが書き込まれる順序によって、select複数のチャネルで使用可能なデータがある場合に実行されるチャネルのコードが決まります。

    • Pythonの例では、順序は各チャネルに割り当てられた優先度によって決定されます。ただし、優先度は各チャネルに動的に割り当てることができるため(例を参照)、selectメソッドへの引数に基づいて新しい優先度の割り当てを処理する、より複雑なメソッドを使用して順序を変更できます。また、コンテキストマネージャが終了すると、古い順序を再確立できます。

  • ブロッキング

    • Goの例では、ケースが存在するselect場合、ステートメントはブロックされていdefaultます。

    • selectPythonの例では、ブロック/非ブロックが必要な場合に明確にするために、ブール引数をメソッドに渡す必要があります。非ブロッキングの場合、コンテキストマネージャーによって返されるチャネルは単なる文字列であるため、ステートメント'default'内のコードでこれを簡単に検出できます。with

  • スレッド化:モジュール内のオブジェクトはqueue、例ですでに見たように、マルチプロデューサー、マルチコンシューマーシナリオの準備ができています。

于 2011-12-10T13:10:20.017 に答える
2

pychanプロジェクトは、多重化を含め、Python で Go チャネルを複製します。Go と同じアルゴリズムを実装しているため、必要なすべてのプロパティを満たします。

  • 複数のプロデューサーとコンシューマーが Chan を介して通信できます。プロデューサーとコンシューマーの両方の準備が整うと、それらのペアがブロックされます
  • プロデューサーとコンシューマーは、到着した順序でサービスを受けます (FIFO)
  • 空の (満杯の) キューは、コンシューマー (プロデューサー) をブロックします。

例は次のようになります。

c1 = Chan(); c2 = Chan(); c3 = Chan()

try:
    chan, value = chanselect([c1, c3], [(c2, i2)])
    if chan == c1:
        print("Received %r from c1" % value)
    elif chan == c2:
        print("Sent %r to c2" % i2)
    else:  # c3
        print("Received %r from c3" % value)
except ChanClosed as ex:
    if ex.which == c3:
        print("c3 is closed")
    else:
        raise

(完全な開示:私はこのライブラリを書きました)

于 2013-06-08T20:50:49.820 に答える
2

queue.Queue availableのように、プロデューサー/コンシューマー キューにはさまざまな実装があります。これらは通常、Dmitry Vyukov によるこの優れた記事に記載されているように、多くのプロパティが異なります。ご覧のとおり、10,000 を超えるさまざまな組み合わせが可能です。このようなキューに使用されるアルゴリズムも、要件によって大きく異なります。追加のプロパティを保証するために既存のキュー アルゴリズムを拡張することはできません。これには通常、異なる内部データ構造と異なるアルゴリズムが必要になるためです。

Go のチャネルは比較的多数の保証されたプロパティを提供するため、これらのチャネルは多くのプログラムに適している可能性があります。最も困難な要件の 1 つは、一度に複数のチャネルでの読み取り/ブロック (select ステートメント) のサポートと、select ステートメント内の複数の分岐が続行できる場合にチャネルを公平に選択することです。これにより、メッセージが取り残されることはありません。 . Python のqueue.Queueはこの機能を提供しないため、同じ動作をアーカイブすることはまったく不可能です。

そのため、引き続きqueue.Queueを使用したい場合は、その問題の回避策を見つける必要があります。ただし、回避策には独自の欠点のリストがあり、保守が困難です。必要な機能を提供する別のプロデューサー/コンシューマー キューを探すことをお勧めします。とにかく、ここに2つの可能な回避策があります:

ポーリング

while True:
  try:
    i1 = c1.get_nowait()
    print "received %s from c1" % i1
  except queue.Empty:
    pass
  try:
    i2 = c2.get_nowait()
    print "received %s from c2" % i2
  except queue.Empty:
    pass
  time.sleep(0.1)

これは、チャネルのポーリング中に多くの CPU サイクルを使用する可能性があり、多くのメッセージがある場合は遅くなる可能性があります。time.sleep() を指数関数的なバックオフ時間 (ここに示されている一定の 0.1 秒ではなく) で使用すると、このバージョンが大幅に改善される可能性があります。

単一の通知キュー

queue_id = notify.get()
if queue_id == 1:
  i1 = c1.get()
  print "received %s from c1" % i1
elif queue_id == 2:
  i2 = c2.get()
  print "received %s from c2" % i2

このセットアップでは、c1 または c2 に送信した後に通知キューに何かを送信する必要があります。そのような通知キューが 1 つだけで十分である限り、これは機能する可能性があります (つまり、チャネルの異なるサブセットでそれぞれがブロックする複数の「選択」がない場合)。

または、Go の使用を検討することもできます。とにかく、Go の goroutines と並行性のサポートは、Python の制限されたスレッド機能よりもはるかに強力です。

于 2011-12-16T20:46:14.140 に答える
1
from queue import Queue

# these imports needed for example code
from threading import Thread
from time import sleep
from random import randint

class MultiQueue(Queue):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queues = []

    def addQueue(self, queue):
        queue.put = self._put_notify(queue, queue.put)
        queue.put_nowait = self._put_notify(queue, queue.put_nowait)
        self.queues.append(queue)

    def _put_notify(self, queue, old_put):
        def wrapper(*args, **kwargs):
            result = old_put(*args, **kwargs)
            self.put(queue)
            return result
        return wrapper

if __name__ == '__main__':
    # an example of MultiQueue usage

    q1 = Queue()
    q1.name = 'q1'
    q2 = Queue()
    q2.name = 'q2'
    q3 = Queue()
    q3.name = 'q3'

    mq = MultiQueue()
    mq.addQueue(q1)
    mq.addQueue(q2)
    mq.addQueue(q3)

    queues = [q1, q2, q3]
    for i in range(9):
        def message(i=i):
            print("thread-%d starting..." % i)
            sleep(randint(1, 9))
            q = queues[i%3]
            q.put('thread-%d ending...' % i)
        Thread(target=message).start()

    print('awaiting results...')
    for _ in range(9):
        result = mq.get()
        print(result.name)
        print(result.get())

.get()複数のキューの方法を使用しようとするのではなく、ここでのアイデアは、MultiQueueデータの準備ができたときにキューに通知selectさせることです。これは、さまざまな と メソッドをラップしMultiQueueて、それらのキューに何かが追加されると、そのキューが に追加され、対応するがデータの準備ができているを取得することによって実現されます。Queueput()put_nowait()put()MultiQueueMultiQueue.get()Queue

これMultiQueueは FIFO キューに基づいていますが、必要に応じて LIFO または優先キューをベースとして使用することもできます。

于 2011-12-16T22:05:46.147 に答える