4

私は Haskell スレッドをいじっていますが、遅延評価された値をチャネル全体で通信するという問題に直面しています。たとえば、N 個のワーカー スレッドと 1 個の出力スレッドがある場合、ワーカーは評価されていない作業を通信し、出力スレッドがそれらの作業を実行することになります。

さまざまなドキュメントでこの問題について読み、さまざまな解決策を見てきましたが、機能する解決策は 1 つだけで、残りは機能しません。以下は、ワーカー スレッドが長時間かかる計算を開始するコードです。最初のスレッドが最も長くかかり、後のスレッドが早く終了するように、スレッドを降順で開始します。

import Control.Concurrent (forkIO)
import Control.Concurrent.Chan   -- .Strict
import Control.Concurrent.MVar
import Control.Exception (finally, evaluate)
import Control.Monad (forM_)
import Control.Parallel.Strategies (using, rdeepseq)

main = (>>=) newChan $ (>>=) (newMVar []) . run

run :: Chan (Maybe String) -> MVar [MVar ()] -> IO ()
run logCh statVars = do
  logV <- spawn1 readWriteLoop
  say "START"
  forM_ [18,17..10] $ spawn . busyWork
  await
  writeChan logCh Nothing -- poison the logger
  takeMVar logV
  putStrLn "DONE"
  where
    say mesg = force mesg >>= writeChan logCh . Just

    force s = mapM evaluate s  -- works
--    force s = return $ s `using` rdeepseq  -- no difference
--    force s = return s -- no-op; try this with strict channel

    busyWork = say . show . sum . filter odd . enumFromTo 2 . embiggen
    embiggen i = i*i*i*i*i

    readWriteLoop = readChan logCh >>= writeReadLoop
    writeReadLoop Nothing = return ()
    writeReadLoop (Just mesg) = putStrLn mesg >> readWriteLoop

    spawn1 action = do
      v <- newEmptyMVar
      forkIO $ action `finally` putMVar v ()
      return v

    spawn action = do
      v <- spawn1 action
      modifyMVar statVars $ \vs -> return (v:vs, ())

    await = do
      vs <- modifyMVar statVars $ \vs -> return ([], vs)
      mapM_ takeMVar vs

ほとんどの手法を使用して、結果は生成された順序で報告されます。つまり、実行時間の長い計算が最初になります。これは、出力スレッドがすべての作業を行っていることを意味すると解釈します。

-- results in order spawned (longest-running first = broken)
START
892616806655
503999185040
274877906943
144162977343
72313663743
34464808608
15479341055
6484436675
2499999999
DONE

これに対する答えは厳密なチャネルになると思いましたが、うまくいきませんでした。文字列の WHNF は、最も外側のコンストラクター (文字列の最初の文字に対して nil または cons) を強制するだけなので、不十分であることを理解しています。はrdeepseq完全に評価されるはずですが、違いはありません。私が見つけた唯一の方法Control.Exception.evaluate :: a -> IO aは、文字列内のすべての文字をマップすることです。(forceいくつかの異なる代替方法については、コード内の関数のコメントを参照してください。) の結果は次のControl.Exception.evaluateとおりです。

-- results in order finished (shortest-running first = correct)
START
2499999999
6484436675
15479341055
34464808608
72313663743
144162977343
274877906943
503999185040
892616806655
DONE

rdeepseqでは、チャネルを厳密に設定したり、この結果を生成したりしないのはなぜですか? 他のテクニックはありますか?最初の結果が壊れている理由を誤解していますか?

4

1 に答える 1

5

ここで起こっている2つの問題があります。

最初の試行(明示的を使用rnf)が機能しない理由は、を使用してreturn、評価時に自分自身を完全に評価するサンクを作成したが、サンク自体は評価されていないためです。評価のタイプは次のとおりであることに注意してください。順序を課すことができるという意味でa -> IO a値を返すという事実:IOevaluate

return (error "foo")   >> return 1 == return 1
evaluate (error "foo") >> return 1 == error "foo"

結果は、このコードは次のとおりです。

force s = evaluate $ s `using` rdeepseq

動作します(のように、と同じ動作をしますmapM_ evaluate s)。


厳密なチャネルを使用する場合は少し注意が必要ですが、これは厳密な同時実行性のバグが原因であると思います。コストのかかる計算は実際にはワーカースレッドで実行されていますが、あまり効果的ではありません(文字列内の非同期例外を非表示にし、例外がどのスレッドに表示されるかを確認することで、これを明示的に確認できます)。

バグは何ですか?strictのコードを見てみましょうwriteChan

writeChan :: NFData a => Chan a -> a -> IO ()
writeChan (Chan _read write) val = do
  new_hole <- newEmptyMVar
  modifyMVar_ write $ \old_hole -> do
    putMVar old_hole $! ChItem val new_hole
    return new_hole

サンクを評価する前に、それmodifyMVar_が呼び出されていることがわかります。writeその場合の操作のシーケンスは次のとおりです。

  1. writeChan入力されます
  2. 私たちtakeMVar write(チャンネルに書き込みたい他の人をブロックする)
  3. 高価なサンクを評価します
  4. 高価なサンクをチャンネルに入れました
  5. putMVar write他のすべてのスレッドのブロックを解除します

evaluateロックが取得される前に評価を実行するため、バリアントではこの動作は見られません。

私はこれについてドンにメールを送り、彼がこの振る舞いが一種の最適ではないことに同意するかどうかを確認します。

ドンは、この動作が最適ではないことに同意します。パッチに取り組んでいます。

于 2011-03-06T17:38:28.087 に答える