このプロジェクトでは、Storm と Trident を試しており、そのために Clojure と Marceline を使用しています。Marceline のページにある wordcount の例を拡張して、文のスパウトがローカルのスパウトではなく DRPC 呼び出しから来るようにしようとしています。DRPC ストリームがクライアントに返すには結果が必要であるという事実に起因すると思われる問題がありますが、DRPC 呼び出しが効果的に null を返し、永続化されたデータを更新するだけにしたいと考えています。
(defn build-topology
([]
(let [trident-topology (TridentTopology.)]
(let [
;; ### Two alternatives here ###
;collect-stream (t/new-stream trident-topology "words" (mk-fixed-batch-spout 3))
collect-stream (t/drpc-stream trident-topology "words")
]
(-> collect-stream
(t/group-by ["args"])
(t/persistent-aggregate (MemoryMapState$Factory.)
["args"]
count-words
["count"]))
(.build trident-topology)))))
コードには 2 つの代替方法があります。固定バッチ スパウトを使用する方法は問題なくロードされますが、代わりに DRPC ストリームを使用してコードをロードしようとすると、次のエラーが発生します。
InvalidTopologyException(msg:Component: [b-2] subscribes from non-existent component [$mastercoord-bg0])
このエラーは、クライアントに何かを返すために DRPC ストリームが出力をサブスクライブしようとしている必要があるという事実から来ていると思いますが、サブスクライブするpersistent-aggregate
そのような出力は提供しません。
では、DRPC ストリームによって永続化されたデータが更新されるようにトポロジを設定するにはどうすればよいでしょうか?
マイナーアップデート:これは不可能なようです:( https://issues.apache.org/jira/browse/STORM-38