9

Clojure で core.async コードを書き、それを実行すると、利用可能なすべてのメモリが消費され、エラーで失敗しました。mapcatcore.async パイプラインで使用すると、プレッシャーが解消されるようです。(これは、この質問の範囲を超えた理由で残念です。)

以下は、 ing トランスデューサ:xの出入りをカウントすることによって問題を示すコードです。mapcat

(ns mapcat.core
  (:require [clojure.core.async :as async]))

(defn test-backpressure [n length]
  (let [message (repeat length :x)
        input (async/chan)
        transform (async/chan 1 (mapcat seq))
        output (async/chan)
        sent (atom 0)]
    (async/pipe input transform)
    (async/pipe transform output)
    (async/go
      (dotimes [_ n]
        (async/>! input message)
        (swap! sent inc))
      (async/close! input))
    (async/go-loop [x 0]
      (when (= 0 (mod x (/ (* n length) 10)))
        (println "in:" (* @sent length) "out:" x))
      (when-let [_ (async/<! output)]
        (recur (inc x))))))

=> (test-backpressure 1000 10)
in: 10 out: 0
in: 2680 out: 1000
in: 7410 out: 2000
in: 10000 out: 3000 ; Where are the other 7000 characters?
in: 10000 out: 4000
in: 10000 out: 5000
in: 10000 out: 6000
in: 10000 out: 7000
in: 10000 out: 8000
in: 10000 out: 9000
in: 10000 out: 10000

生産者は消費者よりはるかに先を行っています。

これを発見したのは私が初めてではないようです。しかし、ここで与えられた説明はそれをカバーしていないようです。(ただし、適切な回避策は提供されます。) 概念的には、プロデューサーが先行していると思いますが、チャネルにバッファリングされる可能性のあるいくつかのメッセージの長さだけです。

私の質問は、他のすべてのメッセージはどこにありますか? 出力の 4 行目までに、7000:x秒が不明です。

4

1 に答える 1

2

UPDATE 2020-01-14: メモリ リークが修正されました。

「メモリ リークはどこにあるのか」という質問には、2 つの解釈が可能です。

まず、データはどこに保持されますか?答えは、展開変換のすぐ下流のチャネル バッファーにあるようです。

チャネルはデフォルトでFixedBuffer( clojure.core.async.impl.buffers/FixedBuffer ) を使用します。これは、チャネルがいっぱいかどうかを判断できますが、いっぱいになっても問題ありません。

次に、バッファがいっぱいになる原因となるコードはどれですか? これは (間違っていたら訂正してください) ( clojure.core.async.impl.channels/ManyToManyChannel )take!メソッドにあるようです。ここでは、呼び出しが行われる前にバッファーに対する最初の呼び出し行われます。ManyToManyChanneladd!full?

take!削除するアイテムごとに、少なくとも1つのアイテムをバッファーに追加できると想定しているようです。このような長期にわたる拡張トランスデューサの場合、mapcatこれは必ずしも安全な仮定ではありません。

この行(when (and (.hasNext iter) (not (impl/full? buf)))core.async のローカル コピーに変更することで、質問のコードを期待どおりに動作させることができます。(NB 私の core.async の理解は、これがユースケースの堅牢なソリューションであることを保証するには不十分です )

UPDATE 2016-09-17: これには問題があります: http://dev.clojure.org/jira/browse/ASYNC-178

UPDATE 2020-01-14: これは現在修正されています: https://clojure.atlassian.net/browse/ASYNC-210 (ただし、以前のチケットは「拒否」としてクローズされました)

于 2016-06-27T15:56:02.937 に答える