12

epoll「 」スタイルのイベント管理を使用して、効率的なシングルスレッドソケット通信を実装したいと思います。

非常に命令型のプログラムを「ゼロから」作成する場合、基本的には次のようにします(入力したばかりの疑似っぽいコードだけで、おそらくコンパイルされません)。

import Control.Concurrent

import Data.ByteString (ByteString)
import qualified Data.ByteString as ByteString

import qualified GHC.Event as Event

import Network
import Network.Socket
import Network.Socket.ByteString

main = withSocketFromSomewhere $ \ socket -> do
  let fd = fromIntegral . fdSocket $ socket

  -- Some app logic
  state <- newMVar "Bla"

  -- Event manager
  manager <- Event.new

  -- Do an initial write
  initialWrite socket state manager

  -- Manager does its thing
  Event.loop manager

write manager socket bs =
  -- Should be pretty straight-forward
  Event.registerFd manager theWrite fd Event.evtWrite
  where
    fd = fromIntegral . fdSocket $ socket
    theWrite key _ = do
      Event.unregisterFd manager key
      sendAll socket bs

read manager socket cont =
  -- Ditto
  Event.registerFd manager theRead fd Event.evtRead
  where
    fd = fromIntegral . fdSocket $ socket
    theRead key _ = do
      Event.unregisterFd manager key
      bs <- recv socket 4096
      cont bs

initialWrite socket state manager = do
  msg <- readMVar state
  write manager socket msg
  read manager socket $ \ bs -> do
    ByteString.putStrLn bs
    putMVar state msg

マネージャーにタイムアウトイベントを追加する関数などもあると想像してください。

ただし、いくつかの理由から、このコードは特に優れているわけではありません。

  1. イベントマネージャーを手動で持ち歩きます。
  2. アプリケーションロジックにを使用する必要MVarがあります。これは、不透明なイベントマネージャーに、ある状態を渡す必要があることを伝えることができないためです。これは、スレッドを1つしか使用しないため、モナド変換子スタック。
  3. 読み取り用に明示的に区切られた継続を作成する必要があります(書き込み用にこれを実行する必要がある場合もあります。この状況で何が賢明かわかりません)。

さて、これはモナド変換子などを大量に使用することを叫ぶだけです。私はこれを実行できるようにしたいと思います。

main =
  withSocketFromSomewhere $ \ socket ->
  runEvents . flip runStateT "Bla" $ initialWrite socket

initialWrite socket = do
  msg <- lift get
  write socket msg
  resp <- read socket
  liftIO $ ByteString.putStrLn resp
  lift $ put msg

このコードは、上記のコードと同じ動作をする必要があります。たとえば、読み取りが回線で受信されるまで計算を一時停止resp <- read socketし、同じスレッド/マネージャーで複数のソケットを管理できるようにします。

質問:

  1. ユーザーにもう少しパワーを与えるGHCイベントAPI/libevent /相当へのより高レベルのインターフェースはありますか?最近のGHC(私は7.4.1を使用しています)で発生した同期IOスケジューリングの変更を考慮することは価値がありますか?
  2. たとえば、ソケットからの読み取りを常に処理する1つの関数を持ち、この関数が書き込み「スレッド」と同じ状態モナドを共有するなど、協調的な同時実行性を実装したい場合はどうなりますか?これは、(1)のどのソリューションでも実行できますか?
4

1 に答える 1

22

イベントとスレッドを統合するための言語ベースのアプローチを読むことを強くお勧めします。選択したIOサブシステムの上に必要な同時実行システムを構築する方法について説明し、彼らの論文では実際にそれを上に実装していepollます。

残念ながら、この論文のデータ型とコード例は非常に貧弱で、コードをリバースエンジニアリングするのに(少なくとも私にとっては)時間がかかり、論文にもいくつかの間違いがあります。ただし、実際には、それらのアプローチは、「フリーモナド」として知られる非常に強力で一般的なアプローチのサブセットです。

たとえば、それらのTraceデータ型は、変装した単なる無料のモナドです。その理由を知るために、FreeモナドのHaskellの定義を調べてみましょう。

data Free f r = Pure r | Free (f (Free f r))

フリーモナドは「ファンクターのリスト」のようなものでPure、リストのNilコンストラクターに類似しており、「リスト」に追加のファンクターを追加するためFree、リストのコンストラクターに類似してい ます。Cons技術的には、私が衒学者だった場合、無料のモナドを上記のリストのようなデータ型として実装する必要があると言うことは何もありませんが、実装するものはすべて上記のデータ型と同型である必要があります。

無料のモナドの良いところは、ファンクターが与えられると、f自動的Free fにモナドになることです。

instance (Functor f) => Monad (Free f) where
    return = Pure
    Pure r >>= f = f r
    Free x >>= f = Free (fmap (>>= f) x)

つまり、Traceデータ型を2つの部分に分解できます。つまり、ベースファンクターfと、次のように生成されたフリーモナドfです。

-- The base functor
data TraceF x =
    SYS_NBIO (IO x)
  | SYS_FORK x x
  | SYS_YIELD x
  | SYS_RET
  | SYS_EPOLL_WAIT FD EPOLL_EVENT x

-- You can even skip this definition if you use the GHC
-- "DerivingFunctor" extension
instance Functor TraceF where
    fmap f (SYS_NBIO x) = SYS_NBIO (liftM f x)
    fmap f (SYS_FORK x) = SYS_FORK (f x) (f x)
    fmap f (SYS_YIELD x) = SYS_YIELD (f x)
    fmap f SYS_RET = SYS_RET
    fmap f (SYS_EPOLL_WAIT FD EPOLL_EVENT x) = SYS_EPOLL_WAIT FD EPOLL_EVEN (f x)

そのファンクターが与えられると、Traceモナドは「無料」で入手できます。

type Trace a = Free TraceF a
-- or: type Trace = Free TraceF

...それが「無料」モナドと呼ばれる理由ではありませんが。

そうすれば、すべての機能を定義するのが簡単になります。

liftF = Free . fmap Pure
-- if "Free f" is like a list of "f", then
-- this is sort of like: "liftF x = [x]"
-- it's just a convenience function

-- their definitions are written in continuation-passing style,
-- presumably for efficiency, but they are equivalent to these
sys_nbio io = liftF (SYS_NBIO io)
sys_fork t = SYS_FORK t (return ()) -- intentionally didn't use liftF
sys_yield = liftF (SYS_YIELD ())
sys_ret = liftF SYS_RET
sys_epoll_wait fd event = liftF (SYS_EPOLL_WAIT fd event ())

したがって、これらのコマンドをモナドのように使用できます。

myTrace fd event = do
    sys_nbio (putStrLn "Hello, world")
    fork $ do
        sys_nbio (putStrLn "Hey")
    sys_expoll_wait fd event

さて、ここに重要な概念があります。私が書いたばかりのモナドは、データ型を作成するだけです。それでおしまい。それはまったく解釈しません。これは、式の抽象構文木を作成する方法と同じです。どのように評価するかは完全にあなた次第です。論文では、表現の通訳の具体例を示していますが、自分で書くのは簡単です。

重要な概念は、このインタープリターは任意のモナドで実行できるということです。したがって、並行性を介していくつかの状態をスレッド化する場合は、それを行うことができます。たとえば、StateT IOモナドを使用して、IOアクションが呼び出された回数を追跡するおもちゃのインタープリターを次に示します。

interpret t = case t of
    SYS_NBIO io -> do
        modify (+1)
        t' <- lift io
        interpret t'
    ...

forkIOのアクション間でモナドをスレッド化することもできます!これは私の非常に古いコードです。これは、私があまり経験がなく、無料のモナドが何であるかわからなかったときに書き戻されたため、バグがあり、不完全ですが、実際にこれを示しています。

module Thread (Thread(..), done, lift, branch, fork, run) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.Cont
import Data.Sequence
import qualified Data.Foldable as F

data Thread f m =
    Done
  | Lift (m (Thread f m))
  | LiftIO (IO (Thread f m))
  | Branch (f (Thread f m))
  | Exit

done = cont $ \c -> Done
lift' x = cont $ \c -> Lift $ liftM c x
liftIO' x = cont $ \c -> LiftIO $ liftM c x
branch x = cont $ \c -> Branch $ fmap c x
exit = cont $ \c -> Exit

fork x = join $ branch [return (), x >> done]

run x = do
    q <- liftIO $ newTChanIO
    enqueue q $ runCont x $ \_ -> Done
    loop q
  where
    loop q = do
        t <- liftIO $ atomically $ readTChan q
        case t of
            Exit -> return ()
            Done -> loop q
            Branch ft -> mapM_ (enqueue q) ft >> loop q
            Lift mt -> (mt >>= enqueue q) >> loop q
            LiftIO it -> (liftIO $ forkIO $ it >>= enqueue q) >> loop q
    enqueue q = liftIO . atomically . writeTChan q

無料のモナドの背後にあるポイントは、それらがモナドインスタンスを提供し、他には何も提供しないということです。言い換えれば、それらは後退し、あなたがそれらをどのように解釈したいかを完全に自由にします。それがそれらが非常に有用である理由です。

于 2012-04-28T17:07:00.077 に答える