5

私は小さなClojureコンシューマー/パブリッシャーがメッセージを受信し、それらを処理して他のコンシューマーに送信しています。これらはすべてRabbitMQを介して行われます。

別のスレッド(つまりメインスレッドとは別)でメッセージを処理するメッセージハンドラーを定義しました。以下のコードに見られるように、スレッドはメッセージを同期的に送受信します。これらはすべて、 lcm/subscribe関数によって開始されるイベントループで発生します。

したがって、問題は、これらの同期メッセージハンドラーのNサイズのスレッドプールを作成するための「Clojureの方法」は何でしょうか。Clojure以外の方法は、Java相互運用機能を介して手動で多数のスレッドを生成することだと思います。

また、処理にCPUをあまり使用しないことを考えると、メッセージ処理の速度はまったく向上しますか?処理よりも公開に多くの時間が費やされていることを考えると、これらのメッセージハンドラーを非同期にする方がよいでしょうか?

そして最後に、これらの競合するアプローチのパフォーマンスをどのように測定しますか(私はRuby / Javascriptの世界から来ており、そこにはマルチスレッドはありません)?

:水平方向にスケーリングし、メッセージバスをリッスンするJVMプロセスをさらに生成することで、これをすべて回避できることはわかっていますが、アプリはHerokuにデプロイされるため、各dynoでできるだけ多くのリソースを使用したいと思います。 /処理する。

(defn message-handler
  [ch metadata ^bytes payload]
  (let [msg (json/parse-string (String. payload "UTF-8"))
        processed-message (process msg)] 
    (lb/publish ch "e.events" "" processed-message)))

(defn -main
  [& args]
  (let [conn          (rmq/connect {:uri (System/getenv "MSGQ")})
        ch            (lch/open conn)
        q-name        "q.events.tagger"
        e-sub-name    "e.events.preproc"
        e-pub-name    "e.events"
        routing-key   "tasks.taggify"]
    (lq/declare ch q-name :exclusive false :auto-delete false)
    (le/declare ch e-pub-name "fanout" :durable false)
    (lq/bind ch q-name e-sub-name :routing-key routing-key)
    (.start (Thread. (fn []
                       (lcm/subscribe ch q-name message-handler :auto-ack true))))))

より基本的な注意点として...このコードをリファクタリングして、次のような追加の引数を使用したメッセージハンドラーコールバックの登録をサポートするにはどうすればよいですか?

    (.start (Thread. (fn []
                       (lcm/subscribe ch q-name (message-handler pub-name) :auto-ack true))))))

次に、参照を使用して公開します。

    (lb/publish ch pub-name "" processed-message)))

リテラルの代わりに:

    (lb/publish ch "e.events" "" processed-message)))
4

2 に答える 2

2

質問の2番目の部分では、以下に示すように部分適用を使用できます。

(defn message-handler
  [pub-name ch metadata ^bytes payload]
  (let [msg (json/parse-string (String. payload "UTF-8"))
        processed-message (process msg)] 
    (lb/publish ch pub-name "" processed-message)))



(.start 
  (Thread. 
     (fn []
       (lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true))))))
于 2012-10-04T04:27:45.480 に答える
1

これは非常に大きなトピックであり、この質問をいくつかの別個の質問に分割することを検討するかもしれませんが、簡潔な答えは次のとおりですagents

于 2012-10-04T00:17:40.903 に答える