9

複数の入力ストリームを消費できるコンジットを作成しようとしています。入力ストリームのいずれかを特定の順序で (たとえば、交互に) 待機できるようにして、zip を役に立たなくする必要があります。ここでは、並列処理や非決定論的な処理は行われません。いずれかのストリームで待機します。次のようなコードを記述できるようにしたいと考えています (whereawaitAawaitBawait はそれぞれ 1 番目または 2 番目の入力ストリームで):

do
  _ <- awaitA
  x <- awaitA
  y <- awaitB
  yield (x,y)
  _ <- awaitB
  _ <- awaitB
  y' <- awaitB
  yield (x,y')

私が持っている最善の解決策は、内側のモナドを別のコンジットにすることです。

foo :: Sink i1 (ConduitM i2 o m) ()

次に許可するもの

awaitA = await
awaitB = lift await

そして、これはほとんど機能します。残念ながら、これにより、外側の導管が完全に接続される前に、内側の導管に融合することが非常に困難になるようです。私が最初に試したのは:

fuseInner :: Monad m =>
                Conduit i2' m i2 -> 
                Sink i1 (ConduitM i2 o m) () -> 
                Sink i1 (ConduitM i2' o m) ()
fuseInner x = transPipe (x =$=)

しかし、少なくとも複数回実行され、毎回効果的に再起動されるxため、ステートフルな場合、これは機能しません。(x =$=)x

コンジットの内部に侵入する以外に、fuseInner を記述する方法はありますか (これはかなり面倒なようです)。複数の入力ストリームを処理するためのより良い方法はありますか? コンジットが設計された目的をはるかに超えているのでしょうか?

ありがとう!

4

2 に答える 2

3

これは、コンジットの内部に飛び込むことで実行できます。非常に面倒に見えたので、これは避けたかったのです。ここでの回答に基づいて、それを回避する方法はないように思えます (しかし、よりクリーンなソリューションをいただければ幸いです)。

主な難点は、それ(x =$=)が純粋な関数であることですがtransPipe、正しい答えを出すには、一種のステートフルで関数のようなものが必要です。

data StatefulMorph m n = StatefulMorph
    { stepStatefulMorph :: forall a. m a -> n (StatefulMorph m n, a)
    , finalizeStatefulMorph :: n () }

ステッピングStatefulMorph m nは値 in を受け取り、その値と次の値を変換するために使用する必要があるnext の両方をmin に返します。最後のものはファイナライズする必要があります (「ステートフル」の場合は、コンジットをファイナライズします。nStatefulMorphmStatefulMorph(x =$=)x

StatefulMorphコンジット フュージョンはのコードを使用してとして実装できますpipeL。署名は次のとおりです。

fuseStateful :: Monad m
             => Conduit a m b
             -> StatefulMorph (ConduitM b c m) (ConduitM a c m)

関数の代わりに値を使用するtransPipe(の特殊なケース)の置換も必要です。hoistStatefulMorph

class StatefulHoist t where
    statefulHoist :: (Monad m, Monad n)
                  => StatefulMorph m n
                  -> t m r -> t n r

forのStatefulHoistインスタンスは、 forのコードを少し変更してConduitM i o記述できます。transPipe

fuseInnerそれなら簡単に実装できます。

fuseInner :: Monad m
          => Conduit a m b
          -> ConduitM i o (ConduitM b c m) r
          -> ConduitM i o (ConduitM a c m) r
fuseInner left = statefulHoist (fuseStateful left)

より詳細な説明をここに書き、完全なコードをここに投稿しました。誰かがよりクリーンなソリューション、またはコンジット パブリック API を使用するソリューションを思いつくことができる場合は、投稿してください。

すべての提案と入力に感謝します!

于 2013-03-25T01:33:18.980 に答える
3

IOで生成された 2 つのストリームを結合したい場合は、Gabriel のコメントが解決策です。

そうしないと、どちらのストリームが最初に値を生成するかを待つことができません。コンジットはシングルスレッドで決定論的です。一度に 1 つのパイプのみを処理します。ただし、2 つのストリームをインターリーブして、いつ切り替えるかを決定できるようにする関数を作成できます。

{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
import Control.Monad (liftM)
import Data.Conduit.Internal (
    Pipe (..), Source, Sink,
    injectLeftovers, ConduitM (..),
    mapOutput, mapOutputMaybe
  )

-- | Alternate two given sources, running one until it yields `Nothing`,
-- then switching to the other one.
merge :: Monad m
      => Source m (Maybe a)
      -> Source m (Maybe b)
      -> Source m (Either a b)
merge (ConduitM l) (ConduitM r) = ConduitM $ goL l r
  where
    goL :: Monad m => Pipe () () (Maybe a) () m () 
                   -> Pipe () () (Maybe b) () m ()
                   -> Pipe () () (Either a b) () m ()
    goL (Leftover l ()) r           = goL l r
    goL (NeedInput _ c) r           = goL (c ()) r
    goL (PipeM mx) r                = PipeM $ liftM (`goL` r) mx
    goL (Done _) r                  = mapOutputMaybe (liftM Right) r
    goL (HaveOutput c f (Just o)) r = HaveOutput (goL c r) f (Left o)
    goL (HaveOutput c f Nothing) r  = goR c r
    -- This is just a mirror copy of goL. We should combine them together to
    -- avoid code repetition.
    goR :: Monad m => Pipe () () (Maybe a) () m ()
                   -> Pipe () () (Maybe b) () m ()
                   -> Pipe () () (Either a b) () m ()
    goR l (Leftover r ())           = goR l r
    goR l (NeedInput _ c)           = goR l (c ())
    goR l (PipeM mx)                = PipeM $ liftM (goR l) mx
    goR l (Done _)                  = mapOutputMaybe (liftM Left) l
    goR l (HaveOutput c f (Just o)) = HaveOutput (goR l c) f (Right o)
    goR l (HaveOutput c f Nothing)  = goL l c

が返されるまで 1 つのソースを処理しNothing、次に別のソースに切り替えます。1 つのソースが終了すると、もう 1 つのソースが最後まで処理されます。

例として、2 つのリストを組み合わせてインターリーブできます。

import Control.Monad.Trans
import Data.Conduit (($$), awaitForever)
import Data.Conduit.List (sourceList)

main =  (merge (sourceList $ concatMap (\x -> [Just x, Just x, Nothing]) [  1..10])
               (sourceList $ concatMap (\x -> [Just x, Nothing]) [101..110]) )
         $$ awaitForever (\x -> lift $ print x)

複数のソースが必要な場合は、merge次のようなものに適応できます

mergeList :: Monad m => [Source m (Maybe a)] -> Source m a

すべてのソースが終了するまで、指定されたソースのリストを循環します。

于 2013-03-24T09:45:09.533 に答える