0

コンテキスト: Clojure + RabbitMQ (Langohr 経由)、この質問のフォローアップ。

RabbitMQ mq からメッセージを消費すると、奇妙な結果が得られます (直接交換からメッセージを取得し、メッセージの処理後にファンアウト交換に発行します)。消費中にメッセージが別のスレッドになる理由がわかりません(いくつかのメッセージごとにスレッドの切り替えが発生します)。

コンシューマーは別のスレッドで開始します (IO 例外が発生した場合にメイン スレッドがクラッシュするのを防ぐため) が、それでは切り替えが説明されません。

; Message handler
(defn message-handler
  [pub-name ch metadata ^bytes payload]
  (let [msg (json/parse-string (String. payload "UTF-8"))
        content (string/join " " (map msg '("title" "link" "body")))
        tags (pluck-tags content)]
    (println (format "HANDLER %s: Message: %s | found tags: %s"
                     (Thread/currentThread)
                     (msg "title")
                     (tags-to-csv tags)))
  (nil)))
  ; (lb/publish ch pub-name "" (json/generate-string (assoc msg "tags" (tags-to-csv tags))))))


(defn -main
  [& args]
  (let [conn          (rmq/connect {:uri (System/getenv "MSGQ")})
        ch            (lch/open conn)
        q-name        "q.events.tagger"
        e-sub-name    "e.events.preproc"
        e-pub-name    "e.events"
        routing-key   "tasks.taggify"]
    (lq/declare ch q-name :exclusive false :auto-delete false)
    (le/declare ch e-pub-name "fanout" :durable false)
    (lq/bind ch q-name e-sub-name :routing-key routing-key)
    (.start (Thread. (fn []
       (lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true))))))

メッセージ ハンドラーは、現在のスレッドと受信したペイロードを出力するだけです。これは私が得るものです:

HANDLER in Thread[pool-1-thread-2,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-2,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ... 
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ...

ノート

エージェントと遊んでいるときにこれに気づきました。各メッセージを独自の CPU バインド スレッド プールで処理し、それを無制限 (IO) スレッド プールで公開したいと考えていました。しかし、現在のスレッドを出力した後、エージェント (または先物) を使用しなくても、メッセージが異なるスレッドによって処理されることに気付きました。

4

2 に答える 2

1

1)そこにファンアウト交換があります。つまり、メッセージのルーティング中にルーティングキーはまったく使用されません。ファンアウト交換は、バインドされたすべてのキューにメッセージをルーティングします。ルーティング キーを使用する場合は、直接交換またはトピック交換を使用してください。

2) 常に同じキュー名を使用します。つまり、コードが行っていることは、複数のコンシューマーを同じキューに追加することです。これは、rabbitmq が消費者の周囲でメッセージをラウンドロビンするだけであることを意味します。

于 2012-10-13T21:42:11.727 に答える
0

ランゴールの作者はこちら。

コードに欠けているものがあるはずです。エージェントでこの出力を取得する場合、それは簡単です。Clojure エージェント (また、先物とプロミス) はスレッド プールを使用します。Langohr の langohr.consumers/subscribe または RabbitMQ Java クライアントの基礎となる QueueingConsumer はそうではありません。

于 2012-10-13T21:41:24.880 に答える