26

問題

こんにちは!私はロギング ライブラリを作成しています。別のスレッドで実行されるロガーを作成したいと考えていますが、すべてのアプリケーション スレッドはメッセージを送信するだけです。この問題に対する最もパフォーマンスの高いソリューションを見つけたいと考えています。ここには単純な unbound キューが必要です。

アプローチ

利用可能なソリューションがどのように機能するかを確認するためにいくつかのテストを作成しましたが、ここで非常に奇妙な結果が得られました。以下に基づいて 4 つの実装 (ソース コードを以下に示します) をテストしました。

  1. パイプ同時実行
  2. Control.Concurrent.Chan
  3. Control.Concurrent.Chan.Unagi
  4. 「Haskell での並列および並行プログラミング」という本で説明されているように、MVar ベースこの手法では、容量 1 の制限付きキューが得られることに注意してください。

テスト

テストに使用したソースコードは次のとおりです。

{-# LANGUAGE NoMonomorphismRestriction #-}

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Concurrent as Pipes
import Control.Applicative
import Control.Monad (replicateM_)
import System.Environment (getArgs)

import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Criterion.Main

data Event = Msg String | Status | Quit deriving (Show)

----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------

pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg

pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
                                           Pipes.performGC

pipesHandler max = loop 0
  where
    loop mnum = do
        if mnum == max
            then lift $ pure ()
            else do event <- await
                    case event of
                        Msg _  -> loop (mnum + 1)
                        Status -> (lift $ putStrLn (show mnum)) *> loop mnum
                        Quit   -> return ()

----------------------------------------------------------------------
-- Chan
----------------------------------------------------------------------

chanAddProducer num ch = forkIO $ chanManyLogs num ch
chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
chanHandler ch max = handlerIO (readChan ch) max

----------------------------------------------------------------------
-- Unagi-Chan
----------------------------------------------------------------------

uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
uchanHandler ch max = handlerIO (U.readChan ch) max

----------------------------------------------------------------------
-- MVars
----------------------------------------------------------------------

mvarAddProducer num m = forkIO $ mvarManyLogs num m
mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
mvarHandler m max = handlerIO (takeMVar m) max

----------------------------------------------------------------------
-- Utils
----------------------------------------------------------------------

handlerIO f max = loop 0 where
    loop mnum = do
        if mnum == max 
            then pure ()
            else do event <- f
                    case event of
                         Msg _  -> loop (mnum + 1)
                         Status -> putStrLn (show mnum) *> loop mnum
                         Quit   -> return ()

----------------------------------------------------------------------
-- Main
----------------------------------------------------------------------

main = defaultMain [
      bench "pipes" $ nfIO $ do
        (output, input) <- Pipes.spawn Pipes.Unbounded
        replicateM_ prodNum (pipesAddProducer msgNum output)
        runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
    , bench "Chan" $ nfIO $ do
        ch <- newChan
        replicateM_ prodNum (chanAddProducer msgNum ch)
        chanHandler ch totalMsg
    , bench "Unagi-Chan" $ nfIO $ do
        (inCh, outCh) <- U.newChan
        replicateM_ prodNum (uchanAddProducer msgNum inCh)
        uchanHandler outCh totalMsg
    , bench "MVar" $ nfIO $ do
        m <- newEmptyMVar
        replicateM_ prodNum (mvarAddProducer msgNum m)
        mvarHandler m totalMsg
    ]
  where
    prodNum  = 20
    msgNum   = 1000
    totalMsg = msgNum * prodNum

でコンパイルしてghc -O2 Main.hs実行するだけです。テストでは、20 個のメッセージ プロデューサが作成され、それぞれが 1000000 個のメッセージを生成します。

結果

benchmarking pipes
time                 46.68 ms   (46.19 ms .. 47.31 ms)
                     0.999 R²   (0.999 R² .. 1.000 R²)
mean                 47.59 ms   (47.20 ms .. 47.95 ms)
std dev              708.3 μs   (558.4 μs .. 906.1 μs)

benchmarking Chan
time                 4.252 ms   (4.171 ms .. 4.351 ms)
                     0.995 R²   (0.991 R² .. 0.998 R²)
mean                 4.233 ms   (4.154 ms .. 4.314 ms)
std dev              244.8 μs   (186.3 μs .. 333.5 μs)
variance introduced by outliers: 35% (moderately inflated)

benchmarking Unagi-Chan
time                 1.209 ms   (1.198 ms .. 1.224 ms)
                     0.996 R²   (0.993 R² .. 0.999 R²)
mean                 1.267 ms   (1.244 ms .. 1.308 ms)
std dev              102.4 μs   (61.70 μs .. 169.3 μs)
variance introduced by outliers: 62% (severely inflated)

benchmarking MVar
time                 1.746 ms   (1.714 ms .. 1.774 ms)
                     0.997 R²   (0.995 R² .. 0.998 R²)
mean                 1.716 ms   (1.694 ms .. 1.739 ms)
std dev              73.99 μs   (65.32 μs .. 85.48 μs)
variance introduced by outliers: 29% (moderately inflated)

質問

pipes-concurrent バージョンのパフォーマンスが非常に遅い理由と、chan ベースのバージョンよりもはるかに遅い理由をお聞きしたいと思います。MVar がすべてのバージョンの中で最速であることに、私は非常に驚いています。なぜこの結果が得られたのか、どのような場合でもより良い結果が得られるのか、誰か詳しく教えていただけますか?

4

2 に答える 2

19

Chanそこで、とTQueue(pipes-concurrencyここでは内部で使用されています)の分析の概要を少し説明しますunagi-chan。それがあなたの質問に答えるかどうかはわかりません。何が起こっているのかをよく理解するために、ベンチマーク中にさまざまなキューをフォークし、バリエーションを試してみることをお勧めします。

ちゃん

Chan次のようになります。

data Chan a
 = Chan (MVar (Stream a)) -- pointer to "head", where we read from
        (MVar (Stream a)) -- pointer to "tail", where values written to

type Stream a = MVar (ChItem a)
data ChItem a = ChItem a (Stream a)

これは s の連結リストですMVar。この型の 2 つMVarの はChan、それぞれリストの現在の先頭と末尾へのポインタとして機能します。書き込みは次のようになります。

writeChan :: Chan a -> a -> IO () 
writeChan (Chan _ writeVar) val = do 
    new_hole <- newEmptyMVar   mask_ $ do
    old_hole <- takeMVar writeVar           -- [1]
    putMVar old_hole (ChItem val new_hole)  -- [2]
    putMVar writeVar new_hole               -- [3]

1 でライターは書き込み側でロックを取得し、2 で私たちのアイテムaがリーダーで利用可能になり、3 で書き込み側が他のライターに対してロック解除されます。

読み取りと書き込みが競合しないため、これは実際には、シングル コンシューマー/シングル プロデューサーのシナリオ (こちらのグラフを参照) で非常にうまく機能します。しかし、複数の同時ライターを使用すると、問題が発生する可能性があります。

  • 別のライターが 2 にある間に 1 にヒットしたライターはブロックされ、スケジュールが解除されます (コンテキスト スイッチを測定できた最速は ~150ns (かなり速い) です。おそらくもっと遅い状況があります)。したがって、多くのライターが競合する場合、基本的には、スケジューラーを介して待機キューに大きな往復を行いMVar、最終的に書き込みが完了する可能性があります。

  • ライターが 2 のときに (タイムアウトにより) スケジュールが解除されると、ロックが保持され、再度スケジュールを変更できるようになるまで、書き込みは完了できません。これは、オーバーサブスクライブしている場合、つまりスレッド/コアの比率が高い場合に、より大きな問題になります。

最後に、MVar-per-item を使用すると、割り当てに関していくらかのオーバーヘッドが必要になります。さらに重要なことは、多くの変更可能なオブジェクトを蓄積すると、多くの GC プレッシャーが発生する可能性があることです。

TQueue

TQueueSTMその正しさについての推論が非常に簡単になるため、素晴らしいです。これは機能的なデキュー スタイルのキューでありwrite、ライター スタックを読み取り、要素をコンシングし、書き戻すだけで構成されます。

data TQueue a = TQueue (TVar [a])
                       (TVar [a])

writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _ write) a = do  
  listend <- readTVar write   -- a transaction with a consistent 
  writeTVar write (a:listend) -- view of memory

writeTQueue新しいスタックを書き戻した後、別のインターリーブ書き込みが同じことを行うと、書き込みの 1 つが再試行​​されます。より多くwriteTQueueの がインターリーブされると、競合の影響が悪化します。ただし、競合する を無効にできる操作がChan1 つしかなく、トランザクションが非常に小さい (読み取りとのみ) ため、パフォーマンスの低下は よりもはるかに遅くなります。writeTVarwriteTQueue(:)

読み取りは、書き込み側からスタックを「デキュー」し、それを反転し、反転したスタックを独自の変数に格納して簡単に「ポップ」できるようにすることで機能します (全体として、これにより償却された O(1) プッシュとポップが得られます)。

readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
  xs <- readTVar read
  case xs of
    (x:xs') -> do writeTVar read xs'
                  return x
    [] -> do ys <- readTVar write
             case ys of
               [] -> retry
               _  -> case reverse ys of
                       [] -> error "readTQueue"
                       (z:zs) -> do writeTVar write []
                                    writeTVar read zs
                                    return z

リーダーには、ライターに対して対称的な中程度の競合の問題があります。通常、リーダーとライターは競合しませんが、リーダー スタックが使い果たされると、リーダーは他のリーダーやライターと競合します。aTQueueに十分な値をプリロードしてから 4 つのリーダーと 4 つのライターを起動した場合、次の書き込みの前に逆の処理が完了するのに苦労したため、ライブロックを誘発できる可能性があると思われます。とは異なり、多くのリーダーが待機しMVarている への書き込みは、TVarそれらすべてを同時に起動することにも注意してください (シナリオによっては、多かれ少なかれ効率的である可能性があります)。

TQueueあなたのテストでは の弱点があまり見られないのではないかと思います。主に、書き込み競合の適度な影響と、多くの可変オブジェクトの割り当てと GC によるオーバーヘッドが見られます。

うなぎちゃん

unagi-chan最初に競合を適切に処理するように設計されました。概念的には非常に単純ですが、実装にはいくつかの複雑さがあります

data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))

data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))

data Cell a = Empty | Written a | Blocking (MVar a)

キューの読み取り側と書き込み側は、Stream渡される値 (ライターからリーダーへ) とブロッキングの表示 (リーダーからライターへ) を調整する を共有し、読み取り側と書き込み側はそれぞれ独立したアトミック カウンターを持ちます。書き込みは次のように機能します。

  1. ライターはincrCounter、書き込みカウンターでアトミックを呼び出して、その (単一の) リーダーと調整するための一意のインデックスを受け取ります

  2. ライターはそのセルを見つけて、次の CAS を実行します。Written a

  3. 成功した場合は終了し、そうでない場合は、リーダーがそれを打ち負かしてブロックしている (またはブロックに進んでいる) ことを確認し、aを実行(\Blocking v)-> putMVar v a)して終了します。

読み取りは、同様の明らかな方法で機能します。

最初のイノベーションは、(CAS/再試行ループや Chan のようなロックのように) 競合が発生しても機能が低下しないアトミック操作にすることです。単純なベンチマークと実験に基づくと、ライブラリによって公開されている fetch-and-add primop がatomic-primops最適に機能します。

次に 2 では、リーダーとライターの両方が 1 回の比較と交換 (リーダーの高速パスは単純な非アトミック読み取り) を実行するだけで、調整を完了できます。

unagi-chanですから、うまくやろうとするために、

  • fetch-and-add を使用して競合点を処理する

  • ロックフリーの手法を使用して、オーバーサブスクライブされたときに、不適切なタイミングでスケジュールを解除されたスレッドが他のスレッドの進行をブロックしないようにします (ブロックされたライターは、カウンターによって "割り当てられた" リーダーのみをブロックする可能性があります。非同期例外に関する注意事項をお読みください)。ドキュメントで、ここではより良いセマンティクスがunagi-chanあることに注意してください)Chan

  • 配列を使用して要素を保存します。これにより、局所性が向上します (ただし、以下を参照)。要素ごとのオーバーヘッドが低くなり、GC への負担が少なくなります。

最後の注意事項。配列の使用: 配列への同時書き込みは、一般にスケーリングには適していません。キャッシュラインがライター スレッド間で前後に無効化されるため、多くのキャッシュ コヒーレンス トラフィックが発生するためです。一般的な用語は「偽の共有」です。しかし、書き込みなどをストライプ化する代替設計には、キャッシュに関する利点と欠点もあります。私はこれについて少し実験してきましたが、現時点では決定的なものはありません。

正当な理由で偽共有に懸念を抱く場所の 1 つはカウンターで、64 バイトに合わせてパディングします。これは実際にベンチマークに表示され、唯一の欠点はメモリ使用量の増加です。

于 2015-01-14T18:00:04.877 に答える
5

パフォーマンスが低下する理由を推測する必要があるとすればpipes-concurrency、それはすべての読み取りと書き込みがSTMトランザクションにラップされているためです。他のライブラリは、より効率的な低レベルの同時実行プリミティブを使用しています。

于 2015-01-14T05:09:40.423 に答える