問題タブ [pika]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
2 に答える
1774 参照

python - 私のRabbitMQパスワードが失敗したことをPythonはどのように検出できますか?

Pika ライブラリを使用して、RabbitMQ を使用する Python アプリケーションを作成しようとしています。最新バージョンの 0.9.5 を使用しています。私の問題は、エラーを通知する Pika コールバックを登録する方法がわからないため、Python コードが RabbitMQ のユーザー名とパスワードが間違っていることを検出できないことです。私のコードは次のように接続しています:

このスクリプトを実行するioloopと、3 秒を少し超える時間が経過した後、プログラムが終了します。コールバックを登録する方法や、デッド接続の状態を単純に調べて、エラーが発生したかどうかを判断したり、それが不正なパスワード エラーであることを特定したりする方法がわかりません。Pika のドキュメントで重要なことを見逃していませんか?

Pika 0.9.5 のエラー処理がどのように機能するかを理解してくれる人はいますか? それとも、0.9.5は、この新しい開発ラインが行き止まりになっていることを示しているように見えるこの電子メール メッセージに気付いたので、代わりに以前のバージョンの Pika を使用することで避けるべき、ある種の恐ろしい実験なのでしょうか?

助けてくれてありがとう!

0 投票する
1 に答える
1292 参照

python - AMQP でのメッセージ ルーティング

AMQP でルーティング マジックを実行したいと思います。私のセットアップは、コンシューマー/プロデューサー側に Pika を使用した Python と AMQP サーバー用の RabbitMQ です。

私が達成したいこと:

  1. 単一の取引所にメッセージを送信する
  2. (ここに魔法を入れる)
  3. 次のようにメッセージを消費します。

    • サブスクライバーの 1 つのセットは、ルーティング キーに基づいて取得できる必要があります。
    • 1 組のサブスクライバーがすべてのメッセージを取得する必要があります。

      注意が必要なのは、2 番目のセットのいずれかのサーバーがメッセージを受信した場合、2 番目のセットの他のサーバーはメッセージを受信しないことです。最初のセットのすべてのサーバーは、引き続きこのメッセージを消費できるはずです。

これは単一のbasic_publish呼び出しで可能ですか、それともメッセージをルーティング交換 (最初のコンシューマー セットの場合)"グローバル" エクスチェンジ (コンシューマーの 2 番目のセットの場合) に送信する必要がありますか?

説明:

私が達成したいのは、メッセージを発行するための単一の呼び出しであり、2 つの異なるコンシューマー セットによって受信されます。

ケース 1 : ルーティング キーに基づいてメッセージを受信するだけです (つまり、ルーティング キーを含むメッセージは、foo現在そのトピックに関心のあるすべてのコンシューマーによって受信されます)。

ケース 2 : これは基本的にワーカー キューの RabbitMQ チュートリアルに似ています。ラウンド ロビン方式でディスパッチされたメッセージを受信する多くのワーカーがあります。1 人のワーカーのみがメッセージを受信します

それでも、特定のルーティング キーに関心のあるコンシューマーが受信するメッセージは、単一の API 呼び出しによって生成された、ワーカーが受信するメッセージとまったく同じである必要があります。

(私の質問が理にかなっていることを願っています。私は AMQP の用語にあまり詳しくありません)

0 投票する
1 に答える
1723 参照

python - pika の connection.ioloop.start() をデーモンで実行できますか?

rabbitmq からのメッセージを処理するためのワーカー デーモンをセットアップしようとしています。私はピカとそのSelectConnectionを使用しています。デーモンとして実行しない場合、コードは正常に動作します。使うことができます

成功しました。ただし、追加すると

コードは例外を発生させませんが、キューからのメッセージのフェッチを停止し、CPU 使用率を最大にします。worker.py はこの例とまったく同じです。

ありがとう。

0 投票する
3 に答える
63638 参照

rabbitmq - 自分の接続以外のチャネルから未確認の AMQP メッセージを回復するにはどうすればよいですか?

rabbitmq サーバーを実行し続ける時間が長ければ長いほど、未確認メッセージの問題が発生するようです。私はそれらを再び待ち行列に入れたいと思っています。実際、これを行う amqp コマンドがあるようですが、接続が使用しているチャネルにのみ適用されます。少なくとも試してみるために小さな pika スクリプトを作成しましたが、何かが足りないか、この方法では実行できません (rabbitmqctl ではどうですか?)

0 投票する
3 に答える
5920 参照

python - Python、Pika、および AMQP を使用して非同期 RPC アプリケーションを設計するための最適なパターンは何ですか?

私のアプリケーションのプロデューサー モジュールは、小さなクラスターで実行する作業を送信したいユーザーによって実行されます。RabbitMQ メッセージ ブローカーを介して JSON 形式でサブスクリプションを送信します。

私はいくつかの戦略を試しましたが、これまでのところ最良の方法は次のとおりですが、まだ完全には機能していません。

各クラスター マシンはコンシューマー モジュールを実行します。このモジュールは AMQP キューにサブスクライブし、prefetch_countを発行して、一度に実行できるタスクの数をブローカーに伝えます。

Pika AMQP ライブラリの SelectConnection を使用して動作させることができました。コンシューマーとプロデューサーの両方が 2 つのチャネルを開始し、1 つは各キューに接続されます。プロデューサはチャネル [A] でリクエストを送信し、チャネル [B] で応答を待ち、コンシューマはチャネル [A] でリクエストを待ち、チャネル [B] で応答を送信します。ただし、コンシューマが応答を計算するコールバックを実行するとブロックされるように見えるため、各コンシューマで一度に実行されるタスクは 1 つだけです。

最後に必要なもの:

  1. コンシューマー [A] は自分のタスク (毎回約 5k) をクラスターにサブスクライブします。
  2. ブローカは各コンシューマに対して N 個のメッセージ/リクエストをディスパッチします。ここで、N は処理できる同時タスクの数です。
  3. 1 つのタスクが完了すると、コンシューマーはブローカー/プロデューサーに結果を返信します。
  4. プロデューサーは応答を受け取り、計算ステータスを更新し、最後にいくつかのレポートを出力します

制限:

  • 別のユーザーが作業を送信すると、そのユーザーのすべてのタスクが前のユーザーの後にキューに入れられます (これはキュー システムから自動的に当てはまると思いますが、スレッド環境での影響については考えていません)。
  • タスクには提出する順序がありますが、回答する順序は重要ではありません

アップデート

私はもう少し勉強しましたが、私の実際の問題は、ピカの SelectConnection.channel.basic_consume() 関数へのコールバックとして単純な関数を使用しているようです。私の最後の (未実装の) アイデアは、通常の関数の代わりにスレッド関数を渡すことです。これにより、コールバックがブロックされず、消費者がリッスンし続けることができます。

0 投票する
3 に答える
7369 参照

python - RabbitMQ/pika でプライオリティ キューを実装する方法

RabbitMQ でプライオリティ キューを実装しようとしています。メーリング リストでは、複数のキューを使用することを推奨しています。各キューは異なる優先度レベルを表しています。

私の質問は、pika (またはおそらく他の python ライブラリ) を使用して、優先順位の高い順序で複数のキューをどのようにポーリングするのですか?

0 投票する
2 に答える
7451 参照

python - Pika ioloop async(RabbitMQ)のタイムアウトを設定する

Pika ioloopで働く消費者(労働者)を優雅に止めることができる必要があります。作業者は60秒後に停止する必要があります。現在処理中のメッセージは終了する必要があります。

コールバック関数の中に入れようとしましたconnection.close()が、それは現在のスレッドを停止するだけで、完全なioloopを停止しませんでした。そして、それはひどいエラー出力を与えました。

私のコードの16行目以降を参照してください:(Pika ioloop http://pika.github.com/connecting.html#cps-exampleに関する基本的な例:

0 投票する
2 に答える
2565 参照

python - RabbitMQ:キューのパージ

私はいくつかのキューを持っています、などのために:

現時点では、このキュー内のすべてのコンテンツをフラッシュする必要があります。ただし、現時点では、おそらく別のプロセスがこのキューに公開される可能性があります。channel.queue_purge(queue ='online')を使用すると、queue_purgeが機能している間に、公開されたメッセージはどうなりますか?

0 投票する
1 に答える
4017 参照

python - いつrabbitmqはtcpバックプレッシャを使用しますか?

Pikaのドキュメントによると、「RabbitMQブローカーは、メッセージの配信速度が速すぎる場合、TCPバックプレッシャを使用してクライアントの速度を低下させます。」バックプレッシャコールバックを登録しましたが、まだ呼び出されていません。私のキューには4000万を超えるメッセージがあり、増え続けています。背圧乗数を-1に設定することで、メッセージの公開ごとにコールバックが呼び出されるようになりますが、これはデバッグにのみ役立ちます。

0 投票する
1 に答える
1733 参照

python - Rabbitmqピカ自動再接続

pika.SelectConnection を使用して RabbitMq サーバーと通信するスクリプトがいくつかあります。

スクリプトの動作中にサーバーがダウンした場合に備えて、それらのスクリプトを自動的に rmq サーバーに再接続しようとする方法はありますか?