62

GHC Haskellstm、ネットワーク コンジット、およびコンジットを利用する私のアプリケーションでは、各ソケットにストランドがあり、これは を使用して自動的にフォークされますrunTCPServer。ストランドは、ブロードキャスト TChannel を使用して他のストランドと通信できます。

これは、コンジット「チェーン」をセットアップする方法を示しています。

ここに画像の説明を入力

したがって、ここにあるのは 2 つのソース (それぞれがヘルパー コンジットにバインドされている) で、Packetオブジェクトを生成します。このオブジェクトencoderは、受け入れて に変換ByteStringし、ソケットを送信します。私は、2 つの入力を効率的に (パフォーマンスが懸念事項です) 融合するのに非常に苦労しました。

誰かが私を正しい方向に向けることができれば幸いです。


試みずにこの質問を投稿するのは失礼なので、以前に試したことをここに記載します。

TMChan (クローズ可能なチャネル) からソースを (ブロッキング) 生成する関数を作成/チェリーピックしました。

-- | Takes a generic type of STM chan and, given read and close functionality,
--   returns a conduit 'Source' which consumes the elements of the channel.
chanSource 
    :: (MonadIO m, MonadSTM m)
    => a                    -- ^ The channel
    -> (a -> STM (Maybe b)) -- ^ The read function
    -> (a -> STM ())        -- ^ The close/finalizer function
    -> Source m b
chanSource ch readCh closeCh = ConduitM pull
    where close     = liftSTM $ closeCh ch
          pull      = PipeM $ liftSTM $ readCh ch >>= translate
          translate = return . maybe (Done ()) (HaveOutput pull close)

同様に、Chan をシンクに変換する関数。

-- | Takes a stream and, given write and close functionality, returns a sink
--   which wil consume elements and broadcast them into the channel 
chanSink
    :: (MonadIO m, MonadSTM m)
    => a                 -- ^ The channel
    -> (a -> b -> STM()) -- ^ The write function
    -> (a -> STM())      -- ^ The close/finalizer function
    -> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
    where close  = const . liftSTM $ closeCh ch
          sink   = NeedInput push close
          write  = liftSTM . writeCh ch
          push x = PipeM $ write x >> return sink

次に、mergeSources は簡単です。2 つのスレッドをフォークして (これは本当にやりたくないのですが、一体何なのか)、それらの新しいアイテムを 1 つのリストに入れることができ、そのリストからソースを生成します。

-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
--   a source which consumes the elements of the channel.
mergeSources
    :: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
    => [Source (ResourceT m) a]             -- ^ The list of sources
    -> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    where push c s = s $$ chanSink c writeTMChan closeTMChan
          fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
          retn c   = return $ chanSource c readTMChan closeTMChan

これらの関数をタイプチェックすることには成功しましたが、これらの関数をタイプチェックに使用することには成功しませんでした。

-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
    use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
    -- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
    mergsrc $$ protocol $= encoder =$ appSink appdata
    where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
          mergsrc = mergeSources [appSource appdata $= decoder, chansrc]

-- | Structure which holds mutable information for clients
data SessionState = SessionState
    { _ssBroadcast     :: TMChan Packet -- ^ Outbound packet broadcast channel
    }

makeLenses ''SessionState

-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)

-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO

とにかく、この方法には欠陥があると思います。多くの中間リストと変換があります。これでは、パフォーマンスが向上しません。ガイダンスを求めています。


PS。私が理解できることから、これはの複製ではありません。コンジットを複数の入力と融合します。私の状況では、両方のソースが同じ型を生成し、どちらのソースからPacketオブジェクトが生成されたかは気にしません。

PPS。サンプル コードでのレンズの使用 (したがって、知識が必要) についてお詫び申し上げます。

4

1 に答える 1

1

それが助けになるかどうかはわかりませんが、私はIainの提案を実装しようとしmergeSources'、チャネルのいずれかが停止するとすぐに停止するバリアントを作成しました:

mergeSources' :: (MonadIO m, MonadBaseControl IO m)
              => [Source (ResourceT m) a] -- ^ The sources to merge.
              -> Int -- ^ The bound of the intermediate channel.
              -> ResourceT m (Source (ResourceT m) a)
mergeSources' sx bound = do
    c <- liftSTM $ newTBMChan bound
    mapM_ (\s -> resourceForkIO $
                    s $$ chanSink c writeTBMChan closeTBMChan) sx
    return $ sourceTBMChan c

(この簡単な追加はこちらから入手できます)。

のあなたのバージョンへのいくつかのコメントmergeSources(一粒の塩でそれらを取ってください、それは私が何かをよく理解していない可能性があります):

  • ...TMChan代わりに使うのは...TBMChan危険そうです。ライターがリーダーよりも速い場合、ヒープが吹き飛ばされます。あなたの図を見ると、TCP ピアがデータを十分に速く読み取れない場合、これは簡単に発生するようです。したがって...TBMChan、おそらく大きいが制限された境界で使用することは間違いありません。
  • MonadSTM m制約は必要ありません。すべての STM は次のようにラップさIOれます

    liftSTM = liftIO . atomically
    

    mergeSources'で使用する場合、これは少し役立つかもしれませんserverApp

  • ただの見た目の問題、私は見つけました

    liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    

    モナドで を使用liftA2しているため、非常に読みにくい。(->) r私は言うだろう

    do
        c <- liftSTM newTMChan
        fsrc sx c
        retn c
    

    長くなりますが、はるかに読みやすくなります。

で遊ぶことができる自己完結型のプロジェクトを作成できますserverAppか?

于 2013-07-05T19:08:55.897 に答える