6

メッセージの受信には basic_consume() を使用し、消費のキャンセルには basic_cancel を使用していますが、問題があります。

これが pika.channel のコードです

 def basic_consume(self, consumer_callback, queue='', no_ack=False,
                      exclusive=False, consumer_tag=None):
        """Sends the AMQP command Basic.Consume to the broker and binds messages
        for the consumer_tag to the consumer callback. If you do not pass in
        a consumer_tag, one will be automatically generated for you. Returns
        the consumer tag.

        For more information on basic_consume, see:
        http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume

        :param method consumer_callback: The method to callback when consuming
        :param queue: The queue to consume from
        :type queue: str or unicode
        :param bool no_ack: Tell the broker to not expect a response
        :param bool exclusive: Don't allow other consumers on the queue
        :param consumer_tag: Specify your own consumer tag
        :type consumer_tag: str or unicode
        :rtype: str

        """
        self._validate_channel_and_callback(consumer_callback)

        # If a consumer tag was not passed, create one
        consumer_tag = consumer_tag or 'ctag%i.%s' % (self.channel_number,
                                                      uuid.uuid4().get_hex())

        if consumer_tag in self._consumers or consumer_tag in self._cancelled:
            raise exceptions.DuplicateConsumerTag(consumer_tag)

        self._consumers[consumer_tag] = consumer_callback
        self._pending[consumer_tag] = list()
        self._rpc(spec.Basic.Consume(queue=queue,
                                     consumer_tag=consumer_tag,
                                     no_ack=no_ack,
                                     exclusive=exclusive),
                           self._on_eventok,
                           [(spec.Basic.ConsumeOk,
                             {'consumer_tag': consumer_tag})])

        return consumer_tag

def basic_cancel(self, callback=None, consumer_tag='', nowait=False):
        """This method cancels a consumer. This does not affect already
        delivered messages, but it does mean the server will not send any more
        messages for that consumer. The client may receive an arbitrary number
        of messages in between sending the cancel method and receiving the
        cancel-ok reply. It may also be sent from the server to the client in
        the event of the consumer being unexpectedly cancelled (i.e. cancelled
        for any reason other than the server receiving the corresponding
        basic.cancel from the client). This allows clients to be notified of
        the loss of consumers due to events such as queue deletion.

        :param method callback: Method to call for a Basic.CancelOk response
        :param str consumer_tag: Identifier for the consumer
        :param bool nowait: Do not expect a Basic.CancelOk response
        :raises: ValueError

        """
        self._validate_channel_and_callback(callback)
        if consumer_tag not in self.consumer_tags:
            return
        if callback:
            if nowait is True:
                raise ValueError('Can not pass a callback if nowait is True')
            self.callbacks.add(self.channel_number,
                               spec.Basic.CancelOk,
                               callback)
        self._cancelled.append(consumer_tag)
        self._rpc(spec.Basic.Cancel(consumer_tag=consumer_tag,
                                    nowait=nowait),
                  self._on_cancelok,
                  [(spec.Basic.CancelOk,
                    {'consumer_tag': consumer_tag})] if nowait is False else [])

消費をキャンセルするたびにわかるように、consumer_tag が _canceled リストに追加されます。このタグを basic_consume で再度使用すると、 duplicateConsumer 例外が発生します。そうですね、毎回新しい consumer_tag を使用できますが、実際にはそうではありません。遅かれ早かれ、生成されたタグは以前のもののいくつかと完全に一致するためです。

pika で優雅に消費を一時停止および再開するにはどうすればよいですか?

4

2 に答える 2

1

独自の を定義する理由はありますconsumer_tagsか? 空の文字列を渡して、RabbitMQ にコンシューマー タグを生成させることができます。からの返信はbasic.consumebasic.consume-ok生成された を返すconsumer_tagため、後でそれを使用して消費を停止できます。

参照: http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume-ok

于 2013-12-19T12:17:33.563 に答える