17

私はClojureでマルチスレッドをまったく使用していないので、どこから始めればよいかわかりません。

私はdoseqその体が並行して走ることができる人を持っています。私が望んでいるのは、範囲がなくなるまでボディを並行して評価する3つのスレッドが常に実行されている(1つのコアを空けたままにする)ことです。共有状態はなく、複雑なものもありません。Pythonのマルチプロセッシングに相当するもので十分です。

だから次のようなもの:

(dopar 3 [i (range 100)]
  ; repeated 100 times in 3 parallel threads...
  ...)

どこから始めればいいですか?これに対するコマンドはありますか?標準パッケージ?良い参考資料ですか?

これまでのところpmap、私はそれを使用できます(一度に3つに制限するにはどうすればよいですか?一度に32を使用するように見えます-いいえ、ソースは2 +プロセッサの数を示しています)が、これは基本的なプリミティブのようですそれはすでにどこかに存在しているはずです。

明確化:私は本当にスレッドの数を制御したいと思います。私は長時間実行され、かなりの量のメモリを使用するプロセスを持っているので、多数を作成してうまくいくことを期待するのは良いアプローチではありません(かなりのチャンクが利用可能なmemを使用する例)。

更新:これを行うマクロの作成を開始しました。セマフォ(またはミューテックス、または待機できるアトム)が必要です。セマフォはClojureに存在しますか?または、ThreadPoolExecutorを使用する必要がありますか?Javaから多くを引き込まなければならないのは奇妙に思えます-Clojureでの並列プログラミングは簡単なはずだと思っていました...多分私はこれを完全に間違った方法で考えているのでしょうか?うーん。エージェント?

4

7 に答える 7

6

OK、私が欲しいのは、for eachループを作成し、データをを使用してエージェントに送信することだと思います。を使用してトリガーされたエージェントはスレッドプールから実行されるため、数は何らかの方法で制限されます(正確に3つのスレッドをきめ細かく制御することはできませんが、今のところは実行する必要があります)。agentsendsend

[Dave Rayはコメントで説明しています:プールサイズを制御するには、自分で作成する必要があります]

(defmacro dopar [seq-expr & body]
  (assert (= 2 (count seq-expr)) "single pair of forms in sequence expression")
  (let [[k v] seq-expr]
    `(apply await
       (for [k# ~v]
         (let [a# (agent k#)]
           (send a# (fn [~k] ~@body))
         a#)))))

これは次のように使用できます:

(deftest test-dump
  (dopar [n (range 7 11)]
    (time (do-dump-single "/tmp/single" "a" n 10000000))))

わーい!動作します!揺れる!(OK、Clojureも少し揺れます)。 関連するブログ投稿

于 2012-06-10T17:18:41.533 に答える
5

pmapほとんどの状況で実際に正常に動作します-マシンに適切な数のスレッドを持つスレッドプールを使用します。デフォルトが問題を引き起こしているという実際のベンチマークの証拠がない限り、スレッドの数を制御するための独自のメカニズムを作成しようとはしません。

そうは言っても、本当に最大3つのスレッドに制限したい場合、簡単なアプローチは、範囲の3つのサブセットでpmapを使用することです。

(defn split-equally [num coll] 
  "Split a collection into a vector of (as close as possible) equally sized parts"
  (loop [num num 
         parts []
         coll coll
         c (count coll)]
    (if (<= num 0)
      parts
      (let [t (quot (+ c num -1) num)]
        (recur (dec num) (conj parts (take t coll)) (drop t coll) (- c t)))))) 

(defmacro dopar [thread-count [sym coll] & body]
 `(doall (pmap 
    (fn [vals#]
      (doseq [~sym vals#]
        ~@body))  
    (split-equally ~thread-count ~coll))))

(怠惰な)doallの評価を強制するために必要な、の使用に注意してください。pmap

于 2012-06-11T03:25:38.560 に答える
5

まさにこれを行うためのライブラリが実際にあります。彼らからgithub

pmapクレイプールライブラリは、、、、などのClojure関数のスレッドプールベースの並列バージョンを提供しfutureますfor

これは、同じものに対して順序付けされたバージョンと順序付けられていないバージョンの両方を提供します。

于 2017-08-30T15:34:31.993 に答える
4

pmapだけを使ってみませんか?それでもスレッドプールを制御することはできませんが、エージェントを使用するカスタムマクロを作成するよりもはるかに少ない作業です(なぜfuturesではないのですか?)。

于 2012-06-10T19:05:35.753 に答える
4

次の要件で同様の問題が発生しました。

  1. 使用するスレッドの数を制御します。
  2. スレッドプールの管理については不可知論者です。
  3. タスクの順序を維持する必要はありません。
  4. タスクの処理時間は異なる可能性があるため、タスクの順序を維持する必要はありませんが、早く終了したタスクは早く返される必要があります。
  5. 入力シーケンスを怠惰に評価して送信します。
  6. 入力シーケンスの要素は、範囲外で読み取られるべきではありませんが、メモリ不足の問題を回避するために、バッファリングされ、返された結果に沿って読み取られる必要があります。

コアpmap関数は、最後の2つの仮定のみを満たします。

ExecutorServiceこれは、標準のJavaスレッドプールとCompletionService入力ストリームのパーティション化を使用して、これらの仮定を満たしている実装です。

(require '[clojure.tools.logging :as log])

(import [java.util.concurrent ExecutorService ExecutorCompletionService 
                              CompletionService Future])

(defn take-seq
  [^CompletionService pool]
  (lazy-seq
   (let [^Future result (.take pool)]
     (cons (.get result)
           (take-seq pool)))))

(defn qmap
  [^ExecutorService pool chunk-size f coll]
  (let [worker (ExecutorCompletionService. pool)]
    (mapcat
     (fn [chunk]
       (let [actual-size (atom 0)]
         (log/debug "Submitting payload for processing")
         (doseq [item chunk]
           (.submit worker #(f item))
           (swap! actual-size inc))
         (log/debug "Outputting completed results for" @actual-size "trades")
         (take @actual-size (take-seq worker))))
     (partition-all chunk-size coll))))

ご覧のとおりqmap、スレッドプール自体はインスタンス化されませんが、。のみがインスタンス化されますExecutorCompletionService。これにより、たとえば、固定サイズで渡すことができますThreadPoolExecutorService。また、qmap遅延シーケンスを返すため、スレッドプールリソース自体を管理することはできず、管理してはなりません。最後に、chunk-sizeは、入力シーケンスの要素の数を制限して、一度にタスクとして実現および送信できるようにします。

以下のコードは、適切な使用法を示しています。

(import [java.util.concurrent Executors])

(let [thread-pool (Executors/newFixedThreadPool 3)]
  (try
    (doseq [result (qmap thread-pool
                         ;; submit no more than 500 tasks at once
                         500 
                         long-running-resource-intensive-fn
                         unboundedly-large-lazy-input-coll)]
      (println result))
    (finally
      ;; (.shutdown) only prohibits submitting new tasks,
      ;; (.shutdownNow) will even cancel already submitted tasks.
      (.shutdownNow thread-pool))))

使用されているJava同時実行クラスのいくつかのドキュメントは次のとおりです。

于 2013-10-30T16:19:14.233 に答える
2

私はまだClojureの初心者なので、慣用的なものかどうかはわかりませんが、次の解決策は私にとってはうまくいき、かなり簡潔に見えます。

(let [number-of-threads 3
      await-timeout 1000]
  (doseq [p-items (partition number-of-threads items)]
    (let [agents (map agent p-items)]
      (doseq [a agents] (send-off a process))
      (apply await-for await-timeout agents)
      (map deref agents))))
于 2012-06-19T03:43:16.593 に答える
0

パイプラインとチャネルを使用します。操作がIOバウンドである場合、pmapのプールはCPUの量にバインドされるため、これは望ましいオプションです。

もう1つの良いオプションは、下にあるcachedThredPoolExecutorを使用する見送りと一緒にエージェントを使用することです。

于 2018-05-28T11:13:02.433 に答える