0

問題の正しい言葉遣いがよくわからないので、遠慮なく正しい用語を教えてください。

イテレータ (遅延評価) を出力するプロセス A があるとします。これにより、Iterator[A] が生成されます。

次に、Iterator[B] を返すイベントをマップする別のプロセス B があります。

これは、さらにいくつかのプロセスで続きます Iterator[A] -> Iterator[B] -> Iterator[C] -> ---

最終的に、このストリームをリスト [Z] に評価します。これにより、List[A] -> List[B] -> List[C] などのメモリ ヒットを節約できます。

ここで、並列化を導入してパフォーマンスを向上させたいと考えていますが、反復子全体で各要素の評価を並列化するのではなく、各反復子スタックを並列化したいと考えています。したがって、この場合、プロセス A のスレッドは Iterator[A] の Queue[A] を満たし、プロセス B のスレッドは Queue[A] から取得し、マッピングを適用してから、Iterator[B] の Queue[B] に追加します。から読む。

独自の非同期キューを設計することで、以前に他の言語でこれを行ったことがありますが、Scala でこれを解決するにはどうすればよいのか疑問に思っていました。

4

2 に答える 2

1

アクターを使用して作成した最初のスタブ ソリューションを次に示します。完全にブロックされているため、先物を使用した実装が開発される可能性があります

case class AsyncIterator[T](iterator:Iterator[T]) extends Iterator[T] {
  private val queue = new scala.collection.mutable.SynchronizedQueue[Int]()
  private var end = !iterator.hasNext

  def hasNext() = {
    if (end) false
    else if (!queue.isEmpty) true
    else hasNext
  }

  def next() = {
    while (q.isEmpty) {
      if (end) throw new Exception("blah")
    }
    q.dequeue()
  }

  private val producer: Actor = actor {
    loop {
      if (!iterator.hasNext) {
        end = true
        exit
      }
      else {
        q.enqueue(iterator.next)
      }
    }
  }
  producer.start()
}
于 2012-12-11T18:34:31.867 に答える
-3

あなたは代替言語を受け入れるので、Go はどうですか?

最近、あなたが説明したのと同じことをまったく異なる方法で達成するイベント駆動型パイプラインを構築する方法についての議論がありました。

イベント パイプラインは、遅延反復子について考えるよりも、考えて設計する方が間違いなく簡単です。これは、各段階で重要な問題が「この段階で単一のエンティティに対して何を行うか」というデータ フロー システムになるためです。「どうすれば多くのエンティティを効率的に反復できますか?」ではなく

イベント ドリブン パイプラインが実装されると、それをどのように並行または並列にするかという問題は議論の余地がなくなります。既に実行済みです。

于 2012-12-12T13:42:45.467 に答える