10

Clojureアプリケーションを使用してWebAPIからデータにアクセスしています。たくさんのリクエストをすることになりますが、リクエストの多くはより多くのリクエストが行われることにつながるので、リクエストのURLをキューに入れて、その後のダウンロードの間隔を60秒にしておきたいと思います。

このブログ投稿に続いて、私はこれをまとめました:

(def queue-delay (* 1000 60)) ; one minute

(defn offer!
  [q x]
  (.offerLast q x)
  q)

(defn take!
  [q]
  (.takeFirst q))

(def my-queue (java.util.concurrent.LinkedBlockingDeque.))

(defn- process-queue-item
  [item]
  (println ">> " item)   ; this would be replaced by downloading `item`
  (Thread/sleep queue-delay))

(future (process-queue-item (take! my-queue)))コードのどこかにを含めると、REPLでできる(offer! my-queue "something")ので、すぐに「>>何か」が印刷されます。ここまでは順調ですね!しかし、プログラムがアクティブである間ずっとキューを持続させる必要があります。今述べた(future ...)呼び出しは、キューから1つのアイテムが利用可能になるとそれを引き出すために機能しますが、キューを継続的に監視し、process-queue-item何かが利用可能になるたびに呼び出すものが必要です。

また、並行性に対する通常のClojureの愛情とは異なり、一度に1つの要求のみが行われ、プログラムが後続の各要求を行うために60秒待機するようにします。

このStackOverflowの質問は適切だと思いますが、自分が望むことを実行するためにそれをどのように適応させるかはわかりません。キューを継続的にポーリングして、一度に1つのリクエストのみが実行されていることを確認するにはどうすればよいですか?

4

3 に答える 3

3

これは、私が趣味で行ったプロジェクトのコード スニペットです。完璧ではありませんが、「最初のアイテムを 55 秒待つ」という問題をどのように回避したかがわかります。基本的には、Future を使用して処理をすぐに行うか、Promise が「利用可能になる」まで、Promise を循環します。

(defn ^:private process
  [queues]
  (loop [[q & qs :as q+qs] queues p (atom true)]
    (when-not (Thread/interrupted)
      (if (or
            (< (count (:promises @work-manager)) (:max-workers @work-manager))
            @p) ; blocks until a worker is available
        (if-let [job (dequeue q)]
          (let [f (future-call #(process-job job))]
            (recur queues (request-promise-from-work-manager)))
          (do
            (Thread/sleep 5000)
            (recur (if (nil? qs) queues qs) p)))
        (recur q+qs (request-promise-from-work-manager))))))

多分あなたは似たようなことをすることができますか?コードは良くなく、おそらく を使用するために書き直す必要があるかもしれませんがlazy-seq、それは私がまだ行っていない演習です!

于 2012-08-29T16:20:23.970 に答える
1

私は自分の小さなライブラリをローリングすることになりました。これをsimple-queueと呼びました。GitHubで完全なドキュメントを読むことができますが、ここにその全体のソースがあります。この回答を更新し続けるつもりはないので、このライブラリを使用したい場合は、GitHubからソースを入手してください。

(ns com.github.bdesham.simple-queue)

(defn new-queue
  "Creates a new queue. Each trigger from the timer will cause the function f
  to be invoked with the next item from the queue. The queue begins processing
  immediately, which in practice means that the first item to be added to the
  queue is processed immediately."
  [f & opts]
  (let [options (into {:delaytime 1}
                      (select-keys (apply hash-map opts) [:delaytime])),
        delaytime (:delaytime options),
        queue {:queue (java.util.concurrent.LinkedBlockingDeque.)},
        task (proxy [java.util.TimerTask] []
               (run []
                 (let [item (.takeFirst (:queue queue)),
                       value (:value item),
                       prom (:promise item)]
                   (if prom
                     (deliver prom (f value))
                     (f value))))),
        timer (java.util.Timer.)]
    (.schedule timer task 0 (int (* 1000 delaytime)))
    (assoc queue :timer timer)))

(defn cancel
  "Permanently stops execution of the queue. If a task is already executing
  then it proceeds unharmed."
  [queue]
  (.cancel (:timer queue)))

(defn process
  "Adds an item to the queue, blocking until it has been processed. Returns
  (f item)."
  [queue item]
  (let [prom (promise)]
    (.offerLast (:queue queue)
                {:value item,
                 :promise prom})
    @prom))

(defn add
  "Adds an item to the queue and returns immediately. The value of (f item) is
  discarded, so presumably f has side effects if you're using this."
  [queue item]
  (.offerLast (:queue queue)
              {:value item,
               :promise nil}))

このキューを使用して値を返す例:

(def url-queue (q/new-queue slurp :delaytime 30))
(def github (q/process url-queue "https://github.com"))
(def google (q/process url-queue "http://www.google.com"))

toの呼び出しはブロックされるため、2つのステートメントq/processの間に30秒の遅延が発生します。def

純粋に副作用のためにこのキューを使用する例:

(defn cache-url
  [{url :url, filename :filename}]
  (spit (java.io.File. filename)
        (slurp url)))

(def url-queue (q/new-queue cache-url :delaytime 30))
(q/add url-queue {:url "https://github.com",
                  :filename "github.html"})    ; returns immediately
(q/add url-queue {:url "https://google.com",
                  :filename "google.html"})    ; returns immediately

q/add今すぐ戻るための呼び出し。

于 2012-09-11T17:02:22.677 に答える
1

これはおそらく正気ではありませんが、このような関数をいつでも使用して、スローダウンした遅延シーケンスを作成できます。

(defn slow-seq [delay-ms coll]
  "Creates a lazy sequence with delays between each element"
  (lazy-seq 
    (if-let [s (seq coll)]
        (do 
          (Thread/sleep delay-ms)
          (cons (first s)
                (slow-seq delay-ms (rest s)))))))

これにより、基本的に各関数呼び出し間の遅延が保証されます。

次のようなもので使用して、ミリ秒単位の遅延を提供できます。

(doseq [i (slow-seq 500 (range 10))]
  (println (rand-int 10))

または、次のような関数呼び出しをシーケンス内に配置することもできます。

(take 10 (slow-seq 500 (repeatedly #(rand-int 10))))

明らかに、上記の両方で、(rand-int 10)ダウンロードの実行/トリガーに使用しているコードに置き換えることができます。

于 2012-09-04T07:52:03.330 に答える