GHC Haskell
stm、ネットワーク コンジット、およびコンジットを利用する私のアプリケーションでは、各ソケットにストランドがあり、これは を使用して自動的にフォークされますrunTCPServer
。ストランドは、ブロードキャスト TChannel を使用して他のストランドと通信できます。
これは、コンジット「チェーン」をセットアップする方法を示しています。
したがって、ここにあるのは 2 つのソース (それぞれがヘルパー コンジットにバインドされている) で、Packet
オブジェクトを生成します。このオブジェクトencoder
は、受け入れて に変換ByteString
し、ソケットを送信します。私は、2 つの入力を効率的に (パフォーマンスが懸念事項です) 融合するのに非常に苦労しました。
誰かが私を正しい方向に向けることができれば幸いです。
試みずにこの質問を投稿するのは失礼なので、以前に試したことをここに記載します。
TMChan (クローズ可能なチャネル) からソースを (ブロッキング) 生成する関数を作成/チェリーピックしました。
-- | Takes a generic type of STM chan and, given read and close functionality,
-- returns a conduit 'Source' which consumes the elements of the channel.
chanSource
:: (MonadIO m, MonadSTM m)
=> a -- ^ The channel
-> (a -> STM (Maybe b)) -- ^ The read function
-> (a -> STM ()) -- ^ The close/finalizer function
-> Source m b
chanSource ch readCh closeCh = ConduitM pull
where close = liftSTM $ closeCh ch
pull = PipeM $ liftSTM $ readCh ch >>= translate
translate = return . maybe (Done ()) (HaveOutput pull close)
同様に、Chan をシンクに変換する関数。
-- | Takes a stream and, given write and close functionality, returns a sink
-- which wil consume elements and broadcast them into the channel
chanSink
:: (MonadIO m, MonadSTM m)
=> a -- ^ The channel
-> (a -> b -> STM()) -- ^ The write function
-> (a -> STM()) -- ^ The close/finalizer function
-> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
where close = const . liftSTM $ closeCh ch
sink = NeedInput push close
write = liftSTM . writeCh ch
push x = PipeM $ write x >> return sink
次に、mergeSources は簡単です。2 つのスレッドをフォークして (これは本当にやりたくないのですが、一体何なのか)、それらの新しいアイテムを 1 つのリストに入れることができ、そのリストからソースを生成します。
-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
-- a source which consumes the elements of the channel.
mergeSources
:: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
=> [Source (ResourceT m) a] -- ^ The list of sources
-> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
where push c s = s $$ chanSink c writeTMChan closeTMChan
fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
retn c = return $ chanSource c readTMChan closeTMChan
これらの関数をタイプチェックすることには成功しましたが、これらの関数をタイプチェックに使用することには成功しませんでした。
-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
-- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
mergsrc $$ protocol $= encoder =$ appSink appdata
where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
mergsrc = mergeSources [appSource appdata $= decoder, chansrc]
-- | Structure which holds mutable information for clients
data SessionState = SessionState
{ _ssBroadcast :: TMChan Packet -- ^ Outbound packet broadcast channel
}
makeLenses ''SessionState
-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)
-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO
とにかく、この方法には欠陥があると思います。多くの中間リストと変換があります。これでは、パフォーマンスが向上しません。ガイダンスを求めています。
PS。私が理解できることから、これはの複製ではありません。コンジットを複数の入力と融合します。私の状況では、両方のソースが同じ型を生成し、どちらのソースからPacket
オブジェクトが生成されたかは気にしません。
PPS。サンプル コードでのレンズの使用 (したがって、知識が必要) についてお詫び申し上げます。