1

Asyncore では AMQPConnectionError が発生するのに、BlockingConnection では発生しないのはなぜですか?

「Asyncore は Windows では動作しません」というだけの場合は、その使用を禁止するものをまだ見つけていませんが、それはそれで構いません。(この問題はプラットフォームに依存しません。) 移行を容易にするために、Python 2.7 と Python 3.4 の両方で利用できる非同期ライブラリを使用したいと思います。Asyncore はここで動作するはずです。

Python 2.7.6 および pika 0.9.13 で RabbitMQ 3.2.4 を使用しています。ユーザーと管理者の実行レベルに違いはありませんでした。上記の更新された警告メッセージを除いて、コード内のロガーの有無はエラーに関係ありません。Linux (Ubuntu 14.04) と Windows 7 でも同じエラーが発生するため、プラットフォームの問題ではありません。

BlockingConnection を使用すると pika のパフォーマンスがかなり低下するため、代わりに Asyncore アダプターを試してみたいと思いました。テストベッドのセットアップは非常に簡単に思えます(資格情報を与えてみましたが、それは問題ではなく、与えられていない場合はコールバックがスタブ化されます...どちらの方法でも失敗します。):

チュートリアルごとに BlockingConnection を使用すると、機能しますが、スループットが低くなります。

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

AsyncoreConnection を使用すると、私が試したこれのすべてのバリアントがすぐに失敗します。

connection = pika.AsyncoreConnection(pika.ConnectionParameters(host='localhost'))

エラー:

WARNING:pika.connection:Could not connect, 0 attempts left
Traceback (most recent call last):
  File "C:\workspace\send.py", line 8, in <module>
    connection = pika.AsyncoreConnection(pika.ConnectionParameters(host='localhost'))
  File "C:\Python27\lib\site-packages\pika\adapters\asyncore_connection.py", line 135, in __init__
    stop_ioloop_on_close)
  File "C:\Python27\lib\site-packages\pika\adapters\base_connection.py", line 62, in __init__
    on_close_callback)
  File "C:\Python27\lib\site-packages\pika\connection.py", line 590, in __init__
    self.connect()
  File "C:\Python27\lib\site-packages\pika\connection.py", line 707, in connect
    self.callbacks.process(0, self.ON_CONNECTION_ERROR, self, self)
  File "C:\Python27\lib\site-packages\pika\callback.py", line 61, in wrapper
    return function(*tuple(args), **kwargs)
  File "C:\Python27\lib\site-packages\pika\callback.py", line 92, in wrapper
    return function(*args, **kwargs)
  File "C:\Python27\lib\site-packages\pika\callback.py", line 232, in process
    callback(*args, **keywords)
  File "C:\Python27\lib\site-packages\pika\connection.py", line 1192, in _on_connection_error
    raise exceptions.AMQPConnectionError(self.params.connection_attempts)
pika.exceptions.AMQPConnectionError: 1
4

4 に答える 4

8

以下の手順を試してください。centos マシンでも同じ問題に直面していました。

  1. sudo yum install rabbitmq-server
  2. sudo サービスの rabbitmq-server の再起動
于 2019-05-22T10:12:41.453 に答える
3

それは実際にはピカのバグのように見えます。最終的に例外を発生させている connection.connect() コードは次のとおりです。

def connect(self):
    """Invoke if trying to reconnect to a RabbitMQ server. Constructing the
    Connection object should connect on its own.

    """
    self._set_connection_state(self.CONNECTION_INIT)
    if self._adapter_connect():
        return self._on_connected()
    self.remaining_connection_attempts -= 1
    LOGGER.warning('Could not connect, %i attempts left',
                   self.remaining_connection_attempts)
    if self.remaining_connection_attempts:
        LOGGER.info('Retrying in %i seconds', self.params.retry_delay)
        self.add_timeout(self.params.retry_delay, self.connect)
    else:
        self.callbacks.process(0, self.ON_CONNECTION_ERROR, self, self)
        self.remaining_connection_attempts = self.params.connection_attempts
        self._set_connection_state(self.CONNECTION_CLOSED)

self._adapter_connect()は明らかに True を返していません。これは、接続が失敗していることを示しています。AsyncoreConnection._adapter_connectコードは次のとおりです。

def _adapter_connect(self):
    """Connect to our RabbitMQ broker using AsyncoreDispatcher, then setting
    Pika's suggested buffer size for socket reading and writing. We pass
    the handle to self so that the AsyncoreDispatcher object can call back
    into our various state methods.

    """
    if super(AsyncoreConnection, self)._adapter_connect():
        self.socket = PikaDispatcher(self.socket, None, self._handle_events)
        self.ioloop = self.socket
        self._on_connected()

何も返さない!そのため、if ステートメントconnectが True になることは決してありません。他のすべてのアダプターが使用するパターンを反映するようにメソッドを変更すると、次のようになります。

def _adapter_connect(self):
    """Connect to our RabbitMQ broker using AsyncoreDispatcher, then setting
    Pika's suggested buffer size for socket reading and writing. We pass
    the handle to self so that the AsyncoreDispatcher object can call back
    into our various state methods.

    """
    if super(AsyncoreConnection, self)._adapter_connect():
        self.socket = PikaDispatcher(self.socket, None, self._handle_events)
        self.ioloop = self.socket
        return True
    return False

それは正常に動作します。私は間違いなくそのバグを報告します!

編集:

バグは最新バージョンで修正されているようです ( githubから):

    def _adapter_connect(self):
        """Connect to our RabbitMQ broker using AsyncoreDispatcher, then setting Pika's suggested buffer size for socket reading and writing. We pass the handle to self so that the AsyncoreDispatcher object can call back into our various state methods.

        """
        error = super(AsyncoreConnection, self)._adapter_connect()
        if not error:
            self.socket = PikaDispatcher(self.socket, None,
                                         self._handle_events)
            self.ioloop = self.socket
            self._on_connected()
        return error
于 2014-05-08T19:13:37.487 に答える
1

この投稿を読む :ロガー "pika.adapters.blocking_connection" のハンドラーが見つかりませんでした

以下を追加して修正:

import logging
logging.basicConfig()

編集

問題が報告されましたhttps://github.com/pika/pika/issues/468

于 2014-05-08T09:15:51.827 に答える