バックグラウンド
質問に答えて、私はbounded-tchanを作成してアップロードしました( jnbのバージョンをアップロードするのは適切ではなかったでしょう)。名前が十分でない場合、bounded-tchan(BTChan)は、最大容量を持つSTMチャネルです(チャネルが容量に達した場合はブロックを書き込みます)。
最近、通常のTChanのようにdup機能を追加するようリクエストがありました。そして、このように問題が始まります。
BTChanの外観
BTChanの簡略化された(実際には機能しない)ビューを以下に示します。
data BTChan a = BTChan
{ max :: Int
, count :: TVar Int
, channel :: TVar [(Int, a)]
, nrDups :: TVar Int
}
チャネルに書き込むたびnrDups
に、タプルに重複の数()を含めます。これは、この要素を取得したリーダーの数を示す「個別要素カウンター」です。
すべてのリーダーは、読み取る要素のカウンターをデクリメントしてから、その読み取りポインターをリスト内の次の要素に移動します。リーダーがカウンターをゼロにcount
デクリメントすると、チャネルで使用可能な容量を適切に反映するためにの値がデクリメントされます。
必要なセマンティクスを明確にするために:チャネル容量は、チャネルでキューに入れられる要素の最大数を示します。各dupのリーダーが要素を受信するまで、任意の要素がキューに入れられます。GCed dupのキューに入れられたままの要素はありません(これが主な問題です)。
たとえば、容量が2のチャネル(c1、c2、c3)の重複が3つあるとします。ここで、2つのアイテムがチャネルに書き込まれ、次にすべてのアイテムがから読み取られc1
ますc2
。コピーを消費していないため、チャネルはまだいっぱいです(残りの容量は0) 。c3
へのすべての参照c3
が削除された場合(c3
GCも削除された場合)、いつでも容量を解放する必要があります(この場合は2に復元します)。
ここに問題があります: 私が次のコードを持っているとしましょう
c <- newBTChan 1
_ <- dupBTChan c -- This represents what would probably be a pathological bug or terminated reader
writeBTChan c "hello"
_ <- readBTChan c
BTChanを次のように見せます:
BTChan 1 (TVar 0) (TVar []) (TVar 1) --> -- newBTChan
BTChan 1 (TVar 0) (TVar []) (TVar 2) --> -- dupBTChan
BTChan 1 (TVar 1) (TVar [(2, "hello")]) (TVar 2) --> -- readBTChan c
BTChan 1 (TVar 1) (TVar [(1, "hello")]) (TVar 2) -- OH NO!
最後に、の読み取りカウント"hello"
がまだ残っていることに注意して1
ください。つまり、メッセージは(実際の実装ではGCされますが)なくなったとは見なされず、count
デクリメントされることはありません。チャネルは容量(最大1要素)にあるため、ライターは常にブロックします。
dupBTChan
が呼び出されるたびにファイナライザーを作成してほしい。複製された(または元の)チャネルが収集されると、そのチャネルで読み取られる残りのすべての要素が要素ごとのカウントをデクリメントし、nrDups
変数もデクリメントされます。その結果、将来の書き込みは正しくなりますcount
(count
GCされたチャネルによって読み取られない変数用のスペースを予約しません)。
解決策1-手動のリソース管理(避けたいこと)
このため、JNBのbounded-tchanには実際には手動のリソース管理があります。を参照してくださいcancelBTChan
。私は、ユーザーが間違えるのが難しいことを考えています(多くの場合、手動管理が正しい方法ではないというわけではありません)。
解決策2-TVarでブロックして例外を使用します(GHCはこれを希望どおりに実行できません)
このソリューションを編集すると、単なるスピンオフであるソリューション3は機能しません。バグ5055 ( WONTFIX )が原因で、GHCコンパイラはブロックされた両方のスレッドに例外を送信します(理論的には決定可能ですが、GHC GCでは実用的ではありません)。
aを取得するすべての方法BTChan
がIOである場合forkIO
、指定されたに固有の追加の(ダミーの)TVarフィールドで読み取り/再試行するスレッドを作成できますBTChan
。nrDups
新しいスレッドは、TVarへの他のすべての参照が削除されたときに例外をキャッチするため、および個々の要素カウンターをいつデクリメントするかを認識します。これは機能するはずですが、すべてのユーザーにIOを使用してBTChan
sを取得するように強制します。
data BTChan = BTChan { ... as before ..., dummyTV :: TVar () }
dupBTChan :: BTChan a -> IO (BTChan a)
dupBTChan c = do
... as before ...
d <- newTVarIO ()
let chan = BTChan ... d
forkIO $ watchChan chan
return chan
watchBTChan :: BTChan a -> IO ()
watchBTChan b = do
catch (atomically (readTVar (dummyTV b) >> retry)) $ \e -> do
case fromException e of
BlockedIndefinitelyOnSTM -> atomically $ do -- the BTChan must have gotten collected
ls <- readTVar (channel b)
writeTVar (channel b) (map (\(a,b) -> (a-1,b)) ls)
readTVar (nrDup b) >>= writeTVar (nrDup b) . (-1)
_ -> watchBTChan b
編集:はい、これは貧弱なファイナライザーであり、使用を避ける特別な理由はありませんaddFinalizer
。それは同じ解決策であり、それでもIOafaictの使用を強制します。
解決策3:解決策2よりもクリーンなAPIですが、GHCはまだそれをサポートしていません
ユーザーは、を呼び出すことでマネージャースレッドを開始します。これによりinitBTChanCollector
、これらのダミーTVarのセット(ソリューション2から)が監視され、必要なクリーンアップが実行されます。unsafePerformIO
基本的に、グローバル( ed)を介して何をすべきかを知っている別のスレッドにIOを押し込みますTVar
。基本的にソリューション2と同じように機能しますが、BTChanの作成は引き続きSTMにすることができます。実行に失敗するinitBTChanCollector
と、プロセスの実行中にタスクのリストが増え続けます(スペースリーク)。
BTChan
解決策4: sの破棄を許可しない
これは、問題を無視することに似ています。ユーザーがダッピングをドロップしない場合BTChan
、問題は解消されます。
解決策 5ezyangの答え(完全に有効でありがたい)がわかりましたが、実際には「dup」関数だけで現在のAPIを維持したいと考えています。
**解決策6**より良いオプションがあることを教えてください。
編集:私はソリューション3(完全にテストされていないアルファリリース)を実装し、グローバル自体をaにすることで潜在的なスペースリークを処理しましたBTChan
-そのchanはおそらく1の容量を持つはずなので、実行を忘れるとinit
すぐに表示されますが、それは小さな変更です。これはGHCi(7.0.3)で機能しますが、これは偶発的なもののようです。GHCは、ブロックされた両方のスレッド(BTChanを読み取る有効なスレッドと監視スレッド)に例外をスローするため、別のスレッドが参照を破棄したときにBTChanの読み取りがブロックされた場合、あなたは死にます。