問題タブ [langohr]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
multithreading - Clojure + RabbitMQ / マルチスレッド メッセージの消費
コンテキスト: Clojure + RabbitMQ (Langohr 経由)、この質問のフォローアップ。
RabbitMQ mq からメッセージを消費すると、奇妙な結果が得られます (直接交換からメッセージを取得し、メッセージの処理後にファンアウト交換に発行します)。消費中にメッセージが別のスレッドになる理由がわかりません(いくつかのメッセージごとにスレッドの切り替えが発生します)。
コンシューマーは別のスレッドで開始します (IO 例外が発生した場合にメイン スレッドがクラッシュするのを防ぐため) が、それでは切り替えが説明されません。
メッセージ ハンドラーは、現在のスレッドと受信したペイロードを出力するだけです。これは私が得るものです:
ノート
エージェントと遊んでいるときにこれに気づきました。各メッセージを独自の CPU バインド スレッド プールで処理し、それを無制限 (IO) スレッド プールで公開したいと考えていました。しかし、現在のスレッドを出力した後、エージェント (または先物) を使用しなくても、メッセージが異なるスレッドによって処理されることに気付きました。
clojure - clojureでlangohr RabbitMQ相互作用をスタブするにはどうすればよいですか?
私が書いているアプリケーションの主な目的ではないので、RabbitMQ インタラクションをスタブ化しようとしています。
そのため、テストで langohr 関数を次のように再バインドしてみました。
でテストを実行するとlein test
、
私は、さまざまなテストフレームワークを含む他のいくつかの方法を試して、langohr lib関数を再定義または再バインドしましたが、進歩はありませんでした。
私は他のシナリオをテストし、上記のコード構造で cheshire (json parsing clojure lib) 関数のスタブ化に成功しました。私のランゴールスタブが機能しない理由を理解し、エレガントな方法でこれを行う方法についてのヒントを求めて、謙虚に支援を求めます。
clojure - Langohr で RabbitMQ の再起動を許容する方法は?
Rabbit キューから読み取る Clojure コードがあります。再起動の場合など、RabbitMQ サーバーが一時的にダウンする場合を許容したいと考えています ( sudo service rabbitmq-server restart
)。
ランゴールで再接続するための規定があるようです。例を適応させましたclojurewerkz.langohr.examples.recovery.example1
( Gist here )。公開された例とのわずかな違いには、接続パラメーターとlb/publish
呼び出しの削除が含まれます (外部ソースでデータを入力しているため)。
キューからデータを正常に消費し、さらにメッセージを待つことができます。ただし、(RabbitMQ をホストしている VM で上記のコマンドを使用して) RMQ を再起動するsudo
と、次の例外がスローされます。
Langohr が提供する意図された再起動メカニズムは、起動時に壊れている可能性があります。これらの「ハード」再起動の場合に優先される代替パターンはありますか? または、接続監視を実装して自分で再試行する必要があると思います。どんな提案でも大歓迎です。
clojure - RabbitMQ サーバーの再起動時に、RabbitMQ Clojure Langohr がエラーをスローしない
データベーステーブルを監視して、数秒ごとに MQ メッセージを作成する次のコードがあります。そして、RabbitMQ サーバーを再起動しても、アプリは例外をスローせずに実行され、メッセージが作成されたままであることがわかりました。では、なぜ例外がスローされないのでしょうか。もう1つの質問は、アプリを強制終了したときに接続を閉じる方法ですか? サービスなので、RabbitMQ 接続を閉じるコードを書く場所がありません。
clojure - 一度に 1 つずつではなく、バッチでメッセージを配信するように RabbitMQ を取得する方法はありますか?
私は、Clojure と Langohr で RabbitMQ を使用しています。一度に 1 つずつではなく、バッチでキューからメッセージを処理したいと考えています。もちろん、メッセージがキューから取り出された後にメッセージを自分でバッチ処理することもできますが、RMQ が一度に 500 件のメッセージを一度に配信できるようにする API 呼び出しまたは設定が欠けているかどうかに興味があります。消費者。これは可能ですか?
clojure - Langohrを使用してRabbitMQでキューに入れられたメッセージを再配信する方法は?
バックグラウンド
Langohr (バージョン 3.4.0) で RabbitMQ からのメッセージを消費し、それらを MongoDB に永続化しようとします。auto-ack
MongoDB でメッセージを永続化できなかった場合、後で再試行できるようにするため、使用していません。これに対応するためにack-unless-exception
関数を使用しています。今夜、MongoDB は一時的に中断され、40 個のメッセージを永続化できず、RabbitMQ キューに保持された短い期間ダウンしました。しかし、MongoDB が再び起動したとき、Langohr ハンドラは新しいメッセージを受信しました。アプリケーションを再起動する前に、古いものは配信されませんでした。
質問
アプリケーションを再起動せずに、RabbitMQ に以前の nack:ed メッセージを Langohr で再配信させるにはどうすればよいでしょうか?
rabbitmq - 未配信メッセージを定期的に再送信するように RabbitMQ に指示する
バックグラウンド
RabbitMQ とやり取りするためにlangohrを使用しています。サービスによってまだ適切に処理されていないメッセージを RabbitMQ が再送信できるようにするために、2 つの異なるアプローチを試みました。動作する 1 つの方法は、basic.nack
withrequeue
セットを に送信true
することですが、サービスがbasic.ack
. たとえば、サービスが現在ダウンしている (そしてしばらくダウンしている) データストアにメッセージを永続化しようとすると、これは少し問題になります。未配信のメッセージを 20 秒ごとに取得する方がよいでしょう (つまりbasic.ack
、basic.nack
データストアがダウンしている場合はメッセージをキューに保持するだけです)。私たちはExecutorService
、要点が次のように実装されている を使用してこれを実装しようとしました:
残念ながら、これは機能していないようです (メッセージは再配信されず、キューに残るだけです)。サービスを再起動すると、キューに入れられたメッセージが正しく消費されます。ただし、 spring-rabbitmq (Java) を使用して実装されている他のサービスがあり、それらはそのままでこれを処理しているようです。ソースコードを調べて、彼らがどのようにそれを行うのかを調べてみましたが、まだうまくいきません.
質問
RabbitMQ に定期的に (できれば Langohr を使用して) キュー内のメッセージを (再) 配信するように指示するにはどうすればよいですか?
clojure - Langohr メッセージ ハンドラが clojure マップの amqp ヘッダーを変換しない
今のところ、ペイロードとヘッダーを読み取って印刷するだけのlangohrメッセージハンドラーがあります。問題は、交換でメッセージを公開するときです。ドキュメントに記載されているようにヘッダーを照会することも (ヘッダー "h1" を取得することも)、ネイティブの clojure マップを使用することもできません。ここで何が間違っていますか?
ヘッダー付きメッセージを公開する場合
ユーザー=> (パブリッシュ ch "some.topic.exchange" "key1" "{\"id\":1}" {:headers {"h1" "value"}}) nil
出力
ジョブステータスの更新: {:status {:id 1}, :metadata #object[java.util.HashMap 0x3cb4bf18 {h1=値}]}