5

ベクトルに保持されている (未評価の) 式が多数あります。[ expr1 expr2 expr3 ... ]

私がやりたいことは、各式を別々のスレッドに渡し、いずれかが値を返すまで待つことです。その時点で、他のスレッドからの結果には興味がなく、CPU リソースを節約するためにそれらをキャンセルしたいと考えています。

(これは、プログラムの異なる実行によって異なる式が最初に評価される可能性があるという点で、非決定論を引き起こす可能性があることを認識しています。私はこれを手元に持っています。)

上記を達成するための標準的/慣用的な方法はありますか?

4

5 に答える 5

5

これが私の見解です。

基本的に、各先物内でグローバルな promise を解決する必要があります。次に、future リストと解決された値を含むベクトルを返し、リスト内のすべての先物をキャンセルします。

(defn run-and-cancel [& expr]
    (let [p (promise)
          run-futures (fn [& expr] [(doall (map #(future (deliver p (eval %1))) expr)) @p])
          [fs res] (apply run-futures expr)]
        (map future-cancel fs)
        res))
于 2013-08-20T07:04:05.780 に答える
2

まだ公式リリースには達していませんcore.asyncが、あなたの問題やその他の非同期の問題を非常にきれいに解決する興味深い方法のようです。

のライニンゲンの呪文core.asyncは(現在)次のとおりです。

[org.clojure/core.async "0.1.0-SNAPSHOT"]

そして、ここにいくつかの時間のかかる関数を取り、そのうちの 1 つが戻るまでブロックする関数を作成するコードがあります。

(require '[clojure.core.async :refer [>!! chan alts!! thread]]))     

(defn return-first [& ops]
  (let [v (map vector ops (repeatedly chan))]
    (doseq [[op c] v]
      (thread (>!! c (op))))
    (let [[value channel] (alts!! (map second v))]
         value)))

;; Make sure the function returns what we expect with a simple Thread/sleep
(assert (= (return-first (fn [] (Thread/sleep 3000) 3000)
                         (fn [] (Thread/sleep 2000) 2000)
                         (fn [] (Thread/sleep 5000) 5000))
            2000))

上記のサンプルでは:

  • chan非同期チャネルを作成します
  • >!!チャネルに値を設定します
  • thread本体を別のスレッドで実行する
  • alts!!チャンネルのベクトルを取り、それらのいずれかに値が現れると戻ります

これ以上のことがあり、私はまだ頭を悩ませていますが、ここにウォークスルーがあります: https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj

また、David Nolen のブログには、気が遠くなるほど素晴らしい投稿がいくつかあります ( http://swannodette.github.io/ ) 。

編集

Michał Marczyk が非常によく似た質問に答えているのを見たところですが、こちらの方が優れており、キャンセル/短絡が可能です。 Clojure を使用して長時間実行されるプロセスをスレッド化し、それらのリターンを比較する

于 2013-08-19T15:20:19.977 に答える
1

必要なのは Java のCompletionServiceです。これを clojure で包むラッパーを私は知りませんが、相互運用を行うのは難しくありません。以下の例は、 ExecutorCompletionServiceの JavaDoc ページの例に大まかに基づいています。

(defn f [col] 
    (let [cs (ExecutorCompletionService. (Executors/newCachedThreadPool))
          futures (map #(.submit cs %) col)
          result (.get (.take cs))]
        (map #(.cancel % true) futures)
        result))
于 2013-08-19T14:58:00.643 に答える
0

future-call を使用してすべての先物のリストを取得し、それらを Atom に格納できます。次に、実行中の各未来を「頭の中で他のものを撃つ」機能で構成し、最初の未来が残りのすべてを終了するようにします。例を次に示します。

(defn first-out [& fns]
  (let [fs (atom [])
        terminate (fn [] (println "cancling..") (doall (map future-cancel @fs)))]
    (reset! fs (doall (map (fn [x] (future-call #((x) (terminate)))) fns)))))

(defn wait-for [n s]
  (fn [] (print "start...") (flush) (Thread/sleep n) (print s) (flush)))

(first-out (wait-for 1000 "long") (wait-for 500 "short"))

編集

前のコードは最初の結果を返さないことに気付きました。そのため、主に副作用に役立ちます。promise を使用して最初の結果を返す別のバージョンを次に示します。

(defn first-out [& fns]
  (let [fs (atom [])
        ret (promise)
        terminate (fn [x] (println "cancling.." ) 
                          (doall (map future-cancel @fs))
                          (deliver ret x))]
    (reset! fs (doall (map (fn [x] (future-call #(terminate (x)))) fns)))
    @ret))

(defn wait-for [n s]
  "this time, return the value"
  (fn [] (print "start...") (flush) (Thread/sleep n) (print s) (flush) s))

(first-out (wait-for 1000 "long") (wait-for 500 "short"))
于 2013-08-19T23:59:05.780 に答える
-1

あなたの目標を達成するための慣用的な方法があるかどうかはわかりませんが、Clojure Futureはぴったりのようです。

式の本体を取り、別のスレッドで本体を呼び出し、結果をキャッシュして deref/@ への後続のすべての呼び出しで返すフューチャー オブジェクトを生成します。計算がまだ終了していない場合、deref/@ の呼び出しは、タイムアウト付きの deref のバリアントが使用されない限り、ブロックされます。

于 2013-08-19T13:41:06.533 に答える