11

ストリーム(遅延リスト)を使用して、多数のソース(簡単にするために2つなど)からオンデマンドでプルすることができます。反復は、単一のソースからのデータを処理するために使用できます。

複数の入力ソースを処理するためのIterateeのような機能概念はありますか?状態がどのソースからプルしたいのかを示すIterateeを想像することができます。

4

4 に答える 4

15

パイプを使用してこれを行うには、相互作用するプロデューサーごとに1回、パイプモナド変換子をそれ自体の中にネストします。例えば:

import Control.Monad
import Control.Monad.Trans
import Control.Pipe

producerA, producerB :: (Monad m) => Producer Int m ()
producerA = mapM_ yield [1,2,3]
producerB = mapM_ yield [4,5,6]

consumes2 :: (Show a, Show b) =>
    Consumer a (Consumer b IO) r
consumes2 = forever $ do
    a <- await       -- await from outer producer
    b <- lift await  -- await from inner producer
    lift $ lift $ print (a, b)

複数の変数のHaskellカリー化関数のように、compositionとrunPipeを使用して各ソースに部分的に適用します。

consumes1 :: (Show b) => Consumer b IO ()
consumes1 = runPipe $ consumes2 <+< producerA

fullyApplied :: IO ()
fullyApplied = runPipe $ consumes1 <+< producerB

上記の関数は、実行時に出力します。

>>> fullyApplied
(1, 4)
(2, 5)
(3, 6)

このトリックは、上流または下流の任意の数のパイプに降伏または待機する場合に機能します。また、プロキシ、パイプへの双方向アナログでも機能します。

編集:これは、だけでなく、すべてのiterateeライブラリでも機能することに注意してくださいpipes。実際、ジョン・ミリキンとオレグはこのアプローチの最初の支持者であり、私は彼らからアイデアを盗んだだけです。

于 2012-09-19T15:12:01.043 に答える
6

私たちはScalaのMachinesを使用して、2つだけでなく、任意の量のソースをプルしています。

バイナリ結合の2つの例は、ライブラリ自体によって、Teeモジュール上で提供されます:mergeOuterJoinおよびhashJoin。のコードはhashJoin次のようになります(両方のストリームがソートされていることを前提としています)。

/**
 * A natural hash join according to keys of type `K`.
 */
def hashJoin[A, B, K](f: A => K, g: B => K): Tee[A, B, (A, B)] = {
  def build(m: Map[K, A]): Plan[T[A, B], Nothing, Map[K, A]] = (for {
    a  <- awaits(left[A])
    mp <- build(m + (f(a) -> a))
  } yield mp) orElse Return(m)
  for {
    m <- build(Map())
    r <- (awaits(right[B]) flatMap (b => {
      val k = g(b)
      if (m contains k) emit(m(k) -> b) else Return(())
    })) repeatedly
  } yield r
}

このコードは、メソッドを使用Planしてに「コンパイル」されたを構築します。ここで構築されているタイプは、2つの入力を持つマシンです。とで左右の入力を要求し、で出力します。MachinerepeatedlyTee[A, B, (A, B)]awaits(left)awaits(right)emit

MachinesのHaskellバージョンもあります。

于 2012-09-19T17:55:58.387 に答える
3

コンジット(およびパイプ用に構築できますが、そのコードはまだリリースされていません)には、zip2つのアップストリームを取り、それらをタプルのストリームとして結合するプリミティブがあります。

于 2012-09-19T14:52:38.130 に答える
1

パイプライブラリをチェックしてください。垂直連結が必要なことを実行する可能性があります。例えば、

import Control.Pipe
import Control.Monad
import Control.Monad.State
import Data.Void

source0, source1 :: Producer Char IO ()
source0 = mapM_ yield "say"
source1 = mapM_ yield "what"

sink :: Show b => Consumer b IO ()
sink = forever $ await >>= \x -> lift $ print x

pipeline :: Pipe () Void IO ()
pipeline = sink <+< (source0 >> source1)

シーケンス演算子(>>)は、ソースを垂直方向に連結し、出力を生成しますrunPipe

's'
'a'
'y'
'w'
'h'
'a'
't'
于 2012-09-19T14:41:11.627 に答える