IO
で生成された 2 つのストリームを結合したい場合は、Gabriel のコメントが解決策です。
そうしないと、どちらのストリームが最初に値を生成するかを待つことができません。コンジットはシングルスレッドで決定論的です。一度に 1 つのパイプのみを処理します。ただし、2 つのストリームをインターリーブして、いつ切り替えるかを決定できるようにする関数を作成できます。
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
import Control.Monad (liftM)
import Data.Conduit.Internal (
Pipe (..), Source, Sink,
injectLeftovers, ConduitM (..),
mapOutput, mapOutputMaybe
)
-- | Alternate two given sources, running one until it yields `Nothing`,
-- then switching to the other one.
merge :: Monad m
=> Source m (Maybe a)
-> Source m (Maybe b)
-> Source m (Either a b)
merge (ConduitM l) (ConduitM r) = ConduitM $ goL l r
where
goL :: Monad m => Pipe () () (Maybe a) () m ()
-> Pipe () () (Maybe b) () m ()
-> Pipe () () (Either a b) () m ()
goL (Leftover l ()) r = goL l r
goL (NeedInput _ c) r = goL (c ()) r
goL (PipeM mx) r = PipeM $ liftM (`goL` r) mx
goL (Done _) r = mapOutputMaybe (liftM Right) r
goL (HaveOutput c f (Just o)) r = HaveOutput (goL c r) f (Left o)
goL (HaveOutput c f Nothing) r = goR c r
-- This is just a mirror copy of goL. We should combine them together to
-- avoid code repetition.
goR :: Monad m => Pipe () () (Maybe a) () m ()
-> Pipe () () (Maybe b) () m ()
-> Pipe () () (Either a b) () m ()
goR l (Leftover r ()) = goR l r
goR l (NeedInput _ c) = goR l (c ())
goR l (PipeM mx) = PipeM $ liftM (goR l) mx
goR l (Done _) = mapOutputMaybe (liftM Left) l
goR l (HaveOutput c f (Just o)) = HaveOutput (goR l c) f (Right o)
goR l (HaveOutput c f Nothing) = goL l c
が返されるまで 1 つのソースを処理しNothing
、次に別のソースに切り替えます。1 つのソースが終了すると、もう 1 つのソースが最後まで処理されます。
例として、2 つのリストを組み合わせてインターリーブできます。
import Control.Monad.Trans
import Data.Conduit (($$), awaitForever)
import Data.Conduit.List (sourceList)
main = (merge (sourceList $ concatMap (\x -> [Just x, Just x, Nothing]) [ 1..10])
(sourceList $ concatMap (\x -> [Just x, Nothing]) [101..110]) )
$$ awaitForever (\x -> lift $ print x)
複数のソースが必要な場合は、merge
次のようなものに適応できます
mergeList :: Monad m => [Source m (Maybe a)] -> Source m a
すべてのソースが終了するまで、指定されたソースのリストを循環します。