次の要件で同様の問題が発生しました。
- 使用するスレッドの数を制御します。
- スレッドプールの管理については不可知論者です。
- タスクの順序を維持する必要はありません。
- タスクの処理時間は異なる可能性があるため、タスクの順序を維持する必要はありませんが、早く終了したタスクは早く返される必要があります。
- 入力シーケンスを怠惰に評価して送信します。
- 入力シーケンスの要素は、範囲外で読み取られるべきではありませんが、メモリ不足の問題を回避するために、バッファリングされ、返された結果に沿って読み取られる必要があります。
コアpmap
関数は、最後の2つの仮定のみを満たします。
ExecutorService
これは、標準のJavaスレッドプールとCompletionService
入力ストリームのパーティション化を使用して、これらの仮定を満たしている実装です。
(require '[clojure.tools.logging :as log])
(import [java.util.concurrent ExecutorService ExecutorCompletionService
CompletionService Future])
(defn take-seq
[^CompletionService pool]
(lazy-seq
(let [^Future result (.take pool)]
(cons (.get result)
(take-seq pool)))))
(defn qmap
[^ExecutorService pool chunk-size f coll]
(let [worker (ExecutorCompletionService. pool)]
(mapcat
(fn [chunk]
(let [actual-size (atom 0)]
(log/debug "Submitting payload for processing")
(doseq [item chunk]
(.submit worker #(f item))
(swap! actual-size inc))
(log/debug "Outputting completed results for" @actual-size "trades")
(take @actual-size (take-seq worker))))
(partition-all chunk-size coll))))
ご覧のとおりqmap
、スレッドプール自体はインスタンス化されませんが、。のみがインスタンス化されますExecutorCompletionService
。これにより、たとえば、固定サイズで渡すことができますThreadPoolExecutorService
。また、qmap
遅延シーケンスを返すため、スレッドプールリソース自体を管理することはできず、管理してはなりません。最後に、chunk-size
は、入力シーケンスの要素の数を制限して、一度にタスクとして実現および送信できるようにします。
以下のコードは、適切な使用法を示しています。
(import [java.util.concurrent Executors])
(let [thread-pool (Executors/newFixedThreadPool 3)]
(try
(doseq [result (qmap thread-pool
;; submit no more than 500 tasks at once
500
long-running-resource-intensive-fn
unboundedly-large-lazy-input-coll)]
(println result))
(finally
;; (.shutdown) only prohibits submitting new tasks,
;; (.shutdownNow) will even cancel already submitted tasks.
(.shutdownNow thread-pool))))
使用されているJava同時実行クラスのいくつかのドキュメントは次のとおりです。