1

次のことを行うアクターまたはエージェント(非同期的に更新され、調整されていない参照)のようなものを実装するためのclojureの最良の方法は何ですか?

  • 送信されたメッセージ/データを取得します
  • そのデータに対して何らかの関数を実行して、新しい状態を取得します。何かのようなもの(fn [state new-msgs] ...)
  • その更新中にメッセージ/データを受信し続けます
  • その更新が完了すると、その間に送信されたすべてのメッセージに対して同じ更新機能を実行します

エージェントはここでは適切ではないようです。機能とエージェントへのデータを同時に処理する必要があり、最後の更新中に入ってきたすべてのsendデータを操作する機能の余地がありません。この目標には、機能とデータの分離が暗示的に必要です。

アクター モデルは、関数とデータが切り離されているという点で、一般的に適しているようです。ただし、私が認識しているすべてのアクター フレームワークは、送信された各メッセージが個別に処理されることを想定しているようです。追加の機械を追加せずに、これをどのように頭に入れるかは明らかではありません。Pulsar のアクターは、アクターに「特別なトリック」を実行させるために使用できる関数を受け入れていることは知って:lifecycle-handleいますが、これに関するドキュメントはあまりないため、その機能が役立つかどうかは不明です。

エージェント、チャネル、および関数を使用てこの問題を解決する方法はありますが、少し面倒です。より良い解決策があることを願っています。他の人が役立つと思う場合に備えて、解決策として投稿しますが、他の人が何を思いついたのか見てみたい.core.asyncwatch

4

3 に答える 3

1

エージェントは、ここで必要なものの逆です。エージェントは、更新機能を送信する値です。キューとスレッドを使用すると、これが最も簡単になります。便宜上future、スレッドの作成に使用しています。

user> (def q (java.util.concurrent.LinkedBlockingDeque.)) 
#'user/q
user> (defn accumulate
        [summary input]
        (let [{vowels true consonents false}
              (group-by #(contains? (set "aeiouAEIOU") %) input)]
          (-> summary
            (update-in [:vowels] + (count vowels))
            (update-in [:consonents] + (count consonents)))))
#'user/accumulate
user> (def worker
           (future (loop [summary {:vowels 0 :consonents 0} in-string (.take q)]
                         (if (not in-string)
                             summary
                           (recur (accumulate summary in-string)
                                  (.take q))))))
#'user/worker
user> (.add q "hello")
true
user> (.add q "goodbye")
true
user> (.add q false)
true
user> @worker
{:vowels 5, :consonents 7}
于 2014-10-15T21:02:48.853 に答える
1

エージェント、core.async チャネル、watch 関数を使用して思いついたソリューションを次に示します。繰り返しますが、少し面倒ですが、今のところ必要なことは実行しています。大まかに言えば、次のとおりです。

(require '[clojure.core.async :as async :refer [>!! <!! >! <! chan go]])

; We'll call this thing a queued-agent
(defprotocol IQueuedAgent
  (enqueue [this message])
  (ping [this]))

(defrecord QueuedAgent [agent queue]
  IQueuedAgent
  (enqueue [_ message]
    (go (>! queue message)))
  (ping [_]
    (send agent identity)))


; Need a function for draining a core async channel of all messages
(defn drain! [c]
  (let [cc (chan 1)]
    (go (>! cc ::queue-empty))
    (letfn
      ; This fn does all the hard work, but closes over cc to avoid reconstruction
      [(drainer! [c]
         (let [[v _] (<!! (go (async/alts! [c cc] :priority true)))]
           (if (= v ::queue-empty)
             (lazy-seq [])
             (lazy-seq (cons v (drainer! c))))))]
      (drainer! c))))

; Constructor function
(defn queued-agent [& {:keys [buffer update-fn init-fn error-handler-builder] :or {:buffer 100}}]
  (let [q                (chan buffer)
        a                (agent (if init-fn (init-fn) {}))
        error-handler-fn (error-handler-builder q a)]
    ; Set up the queue, and watcher which runs the update function when there is new data
    (add-watch
      a
      :update-conv
      (fn [k r o n]
        (let [queued (drain! q)]
          (when-not (empty? queued)
            (send a update-fn queued error-handler-fn)))))
    (QueuedAgent. a q)))

; Now we can use these like this

(def a (queued-agent
         :init-fn   (fn [] {:some "initial value"})
         :update-fn (fn [a queued-data error-handler-fn]
                      (println "Receiving data" queued-data)
                      ; Simulate some work/load on data
                      (Thread/sleep 2000)
                      (println "Done with work; ready to queue more up!"))
         ; This is a little warty at the moment, but closing over the queue and agent lets you requeue work on
         ; failure so you can try again.
         :error-handler-builder
                    (fn [q a] (println "do something with errors"))))

(defn -main []
  (doseq [i (range 10)]
    (enqueue a (str "data" i))
    (Thread/sleep 500) ; simulate things happening
    ; This part stinks... have to manually let the queued agent know that we've queued some things up for it
    (ping a)))

お気づきのように、新しいデータが追加されるたびにここで queued-agent に ping を実行しなければならないのはかなり面倒です。物事が通常の使用法からねじれているように感じます.

于 2014-10-15T00:51:28.290 に答える
0

ティム・ボールドリッジの俳優のキャスト (エピソード 16) に触発されて、俳優により近いものを思いつきました。これにより、問題がより明確に解決されると思います。

(defmacro take-all! [c]
  `(loop [acc# []]
     (let [[v# ~c] (alts! [~c] :default nil)]
       (if (not= ~c :default)
         (recur (conj acc# v#))
         acc#))))


(defn eager-actor [f]
  (let [msgbox (chan 1024)]
    (go (loop [f f]
          (let [first-msg (<! msgbox) ; do this so we park efficiently, and only
                                      ; run when there are actually messages
                msgs      (take-all! msgbox)
                msgs      (concat [first-msg] msgs)]
            (recur (f msgs)))))
    msgbox))


(let [a (eager-actor (fn f [ms]
                       (Thread/sleep 1000) ; simulate work
                       (println "doing something with" ms)
                       f))]
  (doseq [i (range 20)]
    (Thread/sleep 300)
    (put! a i)))
;; =>
;; doing something with (0)
;; doing something with (1 2 3)
;; doing something with (4 5 6)
;; doing something with (7 8 9 10)
;; doing something with (11 12 13)
于 2014-11-18T08:35:08.903 に答える