ストリーム(遅延リスト)を使用して、多数のソース(簡単にするために2つなど)からオンデマンドでプルすることができます。反復は、単一のソースからのデータを処理するために使用できます。
複数の入力ソースを処理するためのIterateeのような機能概念はありますか?状態がどのソースからプルしたいのかを示すIterateeを想像することができます。
ストリーム(遅延リスト)を使用して、多数のソース(簡単にするために2つなど)からオンデマンドでプルすることができます。反復は、単一のソースからのデータを処理するために使用できます。
複数の入力ソースを処理するためのIterateeのような機能概念はありますか?状態がどのソースからプルしたいのかを示すIterateeを想像することができます。
パイプを使用してこれを行うには、相互作用するプロデューサーごとに1回、パイプモナド変換子をそれ自体の中にネストします。例えば:
import Control.Monad
import Control.Monad.Trans
import Control.Pipe
producerA, producerB :: (Monad m) => Producer Int m ()
producerA = mapM_ yield [1,2,3]
producerB = mapM_ yield [4,5,6]
consumes2 :: (Show a, Show b) =>
Consumer a (Consumer b IO) r
consumes2 = forever $ do
a <- await -- await from outer producer
b <- lift await -- await from inner producer
lift $ lift $ print (a, b)
複数の変数のHaskellカリー化関数のように、compositionとrunPipeを使用して各ソースに部分的に適用します。
consumes1 :: (Show b) => Consumer b IO ()
consumes1 = runPipe $ consumes2 <+< producerA
fullyApplied :: IO ()
fullyApplied = runPipe $ consumes1 <+< producerB
上記の関数は、実行時に出力します。
>>> fullyApplied
(1, 4)
(2, 5)
(3, 6)
このトリックは、上流または下流の任意の数のパイプに降伏または待機する場合に機能します。また、プロキシ、パイプへの双方向アナログでも機能します。
編集:これは、だけでなく、すべてのiterateeライブラリでも機能することに注意してくださいpipes
。実際、ジョン・ミリキンとオレグはこのアプローチの最初の支持者であり、私は彼らからアイデアを盗んだだけです。
私たちはScalaのMachinesを使用して、2つだけでなく、任意の量のソースをプルしています。
バイナリ結合の2つの例は、ライブラリ自体によって、Tee
モジュール上で提供されます:mergeOuterJoin
およびhashJoin
。のコードはhashJoin
次のようになります(両方のストリームがソートされていることを前提としています)。
/**
* A natural hash join according to keys of type `K`.
*/
def hashJoin[A, B, K](f: A => K, g: B => K): Tee[A, B, (A, B)] = {
def build(m: Map[K, A]): Plan[T[A, B], Nothing, Map[K, A]] = (for {
a <- awaits(left[A])
mp <- build(m + (f(a) -> a))
} yield mp) orElse Return(m)
for {
m <- build(Map())
r <- (awaits(right[B]) flatMap (b => {
val k = g(b)
if (m contains k) emit(m(k) -> b) else Return(())
})) repeatedly
} yield r
}
このコードは、メソッドを使用Plan
してに「コンパイル」されたを構築します。ここで構築されているタイプは、2つの入力を持つマシンです。とで左右の入力を要求し、で出力します。Machine
repeatedly
Tee[A, B, (A, B)]
awaits(left)
awaits(right)
emit
MachinesのHaskellバージョンもあります。
コンジット(およびパイプ用に構築できますが、そのコードはまだリリースされていません)には、zip
2つのアップストリームを取り、それらをタプルのストリームとして結合するプリミティブがあります。
パイプライブラリをチェックしてください。垂直連結が必要なことを実行する可能性があります。例えば、
import Control.Pipe
import Control.Monad
import Control.Monad.State
import Data.Void
source0, source1 :: Producer Char IO ()
source0 = mapM_ yield "say"
source1 = mapM_ yield "what"
sink :: Show b => Consumer b IO ()
sink = forever $ await >>= \x -> lift $ print x
pipeline :: Pipe () Void IO ()
pipeline = sink <+< (source0 >> source1)
シーケンス演算子(>>)
は、ソースを垂直方向に連結し、出力を生成しますrunPipe
(
's'
'a'
'y'
'w'
'h'
'a'
't'