私は小さなClojureコンシューマー/パブリッシャーがメッセージを受信し、それらを処理して他のコンシューマーに送信しています。これらはすべてRabbitMQを介して行われます。
別のスレッド(つまりメインスレッドとは別)でメッセージを処理するメッセージハンドラーを定義しました。以下のコードに見られるように、スレッドはメッセージを同期的に送受信します。これらはすべて、 lcm/subscribe関数によって開始されるイベントループで発生します。
したがって、問題は、これらの同期メッセージハンドラーのNサイズのスレッドプールを作成するための「Clojureの方法」は何でしょうか。Clojure以外の方法は、Java相互運用機能を介して手動で多数のスレッドを生成することだと思います。
また、処理にCPUをあまり使用しないことを考えると、メッセージ処理の速度はまったく向上しますか?処理よりも公開に多くの時間が費やされていることを考えると、これらのメッセージハンドラーを非同期にする方がよいでしょうか?
そして最後に、これらの競合するアプローチのパフォーマンスをどのように測定しますか(私はRuby / Javascriptの世界から来ており、そこにはマルチスレッドはありません)?
注:水平方向にスケーリングし、メッセージバスをリッスンするJVMプロセスをさらに生成することで、これをすべて回避できることはわかっていますが、アプリはHerokuにデプロイされるため、各dynoでできるだけ多くのリソースを使用したいと思います。 /処理する。
(defn message-handler
[ch metadata ^bytes payload]
(let [msg (json/parse-string (String. payload "UTF-8"))
processed-message (process msg)]
(lb/publish ch "e.events" "" processed-message)))
(defn -main
[& args]
(let [conn (rmq/connect {:uri (System/getenv "MSGQ")})
ch (lch/open conn)
q-name "q.events.tagger"
e-sub-name "e.events.preproc"
e-pub-name "e.events"
routing-key "tasks.taggify"]
(lq/declare ch q-name :exclusive false :auto-delete false)
(le/declare ch e-pub-name "fanout" :durable false)
(lq/bind ch q-name e-sub-name :routing-key routing-key)
(.start (Thread. (fn []
(lcm/subscribe ch q-name message-handler :auto-ack true))))))
より基本的な注意点として...このコードをリファクタリングして、次のような追加の引数を使用したメッセージハンドラーコールバックの登録をサポートするにはどうすればよいですか?
(.start (Thread. (fn []
(lcm/subscribe ch q-name (message-handler pub-name) :auto-ack true))))))
次に、参照を使用して公開します。
(lb/publish ch pub-name "" processed-message)))
リテラルの代わりに:
(lb/publish ch "e.events" "" processed-message)))