41

序章

Scala のFuture( 2.10で新しく、現在は 2.9.3 ) はアプリカティブ ファンクターです。これは、トラバース可能な type がある場合、と 関数Fを取り、それらを に変換できることを意味します。F[A]A => Future[B]Future[F[B]]

この操作は、標準ライブラリで として利用できますFuture.traverseScalaz 7は、ライブラリからtraverseapplicative functor インスタンスをインポートする場合に使用できる、より一般的なものも提供します。Futurescalaz-contrib

traverseストリームの場合、これら 2 つのメソッドの動作は異なります。標準ライブラリのトラバーサルは、返す前にストリームを消費しますが、Scalaz のトラバーサルはすぐにフューチャーを返します:

import scala.concurrent._
import ExecutionContext.Implicits.global

// Hangs.
val standardRes = Future.traverse(Stream.from(1))(future(_))

// Returns immediately.
val scalazRes = Stream.from(1).traverse(future(_))

Leif Warnerがここで観察しているように、別の違いもあります。標準ライブラリtraverseはすべての非同期操作をすぐに開始しますが、Scalaz は最初の操作を開始し、それが完了するのを待ち、2 番目を開始し、それを待つというように続きます。

ストリームの異なる動作

ストリームの最初の値に対して数秒間スリープする関数を作成することで、この 2 番目の違いを示すのは非常に簡単です。

def howLong(i: Int) = if (i == 1) 10000 else 0

import scalaz._, Scalaz._
import scalaz.contrib.std._

def toFuture(i: Int)(implicit ec: ExecutionContext) = future {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}

Future.traverse(Stream(1, 2))(toFuture)次のように出力されます。

Starting 1!
Starting 2!
Done 2!
Done 1!

そして Scalaz バージョン ( Stream(1, 2).traverse(toFuture)):

Starting 1!
Done 1!
Starting 2!
Done 2!

これはおそらく、私たちがここで望んでいるものではありません。

リストの場合は?

不思議なことに、この 2 つのトラバーサルは、リスト上でこの点で同じように動作します — Scalaz は、1 つの Future が完了するのを待たずに、次の Future を開始します。

もう一つの未来

Scalaz には、独自の Futureconcurrent実装を備えた独自のパッケージも含まれています。上記と同じ種類のセットアップを使用できます。

import scalaz.concurrent.{ Future => FutureZ, _ }

def toFutureZ(i: Int) = FutureZ {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}

そして、ストリームと同様にリストのストリームでの Scalaz の動作を取得します。

Starting 1!
Done 1!
Starting 2!
Done 2!

それほど驚くことではないかもしれませんが、無限ストリームをトラバースしてもすぐに戻ります。

質問

この時点で、要約する表が本当に必要ですが、リストで行う必要があります。

  • 標準ライブラリ トラバーサルを使用したスト​​リーム: 戻る前に消費します。それぞれの未来を待たないでください。
  • Scalaz traversal を使用したスト​​リーム: すぐに戻ります。それぞれの未来が完了するのを待ちます。
  • ストリームを持つ Scalaz の先物: すぐに戻る; それぞれの未来が完了するのを待ちます。

と:

  • 標準ライブラリ トラバーサルを含むリスト: 待つ必要はありません。
  • Scalaz traversal を使用したリスト: 待つ必要はありません。
  • リスト付きの Scalaz フューチャー: 各フューチャーが完了するまで待ちます。

これは意味がありますか?リストとストリームに対するこの操作の「正しい」動作はありますか? 「最も非同期的な」動作、つまり、戻る前にコレクションを消費せず、各 Future が完了するのを待ってから次の Future に進むことをここで表現しない理由はありますか?

4

2 に答える 2

1

すべてにお答えすることはできませんが、いくつか試してみます。

「最も非同期的な」動作、つまり、戻る前にコレクションを消費せず、各 Future が完了するのを待ってから次の Future に進むことをここで表現しない理由はありますか?

依存する計算があり、スレッド数が限られている場合、デッドロックが発生する可能性があります。たとえば、3 つ目のフューチャー (フューチャーのリストにある 3 つすべて) に依存する 2 つのフューチャーがあり、2 つのスレッドしかない場合、最初の 2 つのフューチャーが 2 つのスレッドすべてをブロックし、3 つ目のフューチャーが実行されないという状況が発生する可能性があります。(もちろん、プール サイズが 1 の場合、つまり、計算を次々に実行する場合、同様の状況が発生する可能性があります)

これを解決するには、Future ごとに 1 つのスレッドが必要です。制限はありません。これは先物の小さなリストでは機能しますが、大きなものでは機能しません。したがって、すべてを並行して実行すると、すべてのケースで小さなサンプルが実行され、大きなサンプルはデッドロックする状況が発生します。(例: 開発者のテストは正常に実行されますが、本番環境ではデッドロックが発生します)。

リストとストリームに対するこの操作の「正しい」動作はありますか?

先物では無理だと思います。依存関係についてもう少し知っている場合、または計算がブロックされないことが確実にわかっている場合は、より多くの並行ソリューションが可能になる可能性があります。しかし、先物のリストを実行すると、「設計によって壊れた」ように見えます。最良の解決策は、デッドロックの小さな例 (つまり、Future を次々に実行する) ではすでに失敗する 1 つのようです。

リスト付きの Scalaz フューチャー: 各フューチャーが完了するまで待ちます。

scalaz は、トラバーサルのために内部的に内包表記を使用していると思います。内包表記では、計算が独立しているとは限りません。したがって、Scalaz はここで for 内包表記を使って正しいことを行っていると思います: ある計算を次々と行う。フューチャーの場合、オペレーティング システムに無制限のスレッドがある場合、これは常に機能します。

つまり、言い換えると、内包表記 (必須) がどのように機能するかの単なる成果物が表示されます。

これが意味をなすことを願っています。

于 2013-09-20T08:07:18.190 に答える
1

質問を正しく理解すれば、それは本当にストリームとリストのセマンティクスに帰着すると思います。

リストをトラバースすると、ドキュメントから期待されることが行われます。

提供された関数を使用しTraversableOnce[A]て aを a に変換します。これは、平行マップを実行する場合に便利です。たとえば、関数をリストのすべての項目に並行して適用するには、次のようにします。Future[TraversableOnce[B]]A => Future[B]

ストリームでは、コンパイラが持っているよりも多くのストリームの知識に依存するため、どのように機能させたいかを決定するのは開発者の責任です (ストリームは無限になる可能性がありますが、型システムはそれについて知りません)。ストリームがファイルから行を読み取っている場合、先物を行ごとにチェーンしても実際には並列化されないため、最初にそれを消費したいと考えています。この場合、並列アプローチが必要です。

一方、私のストリームが連続した整数を生成し、大きな数よりも大きい最初の素数を探す無限リストである場合、1 回のスイープで最初にストリームを消費することは不可能です (連鎖Futureアプローチが必要になり、 d はおそらくストリームからバッチを実行したい)。

これを処理する標準的な方法を見つけようとするのではなく、さまざまなケースをより明確にするのに役立つ欠落している型があるかどうか疑問に思います。

于 2016-07-13T08:28:07.477 に答える