ウィキペディアのせいではありません。これは、Async.Parallel
内部でどのように機能するかの結果です。の型シグネチャAsync.Parallel
はseq<Async<'T>> -> Async<'T[]>
. シーケンスからのすべての結果を含む単一の Async 値を返します。したがって、すべての計算が返されるまで戻りませんseq<Async<'T>>
。
例として、未処理のリクエスト、つまり、サーバーに送信されたがまだ応答を受信/解析していないリクエストの数を追跡するようにコードを変更しました。
open Microsoft.FSharp.Control
open Microsoft.FSharp.Control.WebExtensions
open System
open System.Net
open System.Threading
type WebClientWithTimeout() =
inherit WebClient()
let mutable timeout = -1
member __.Timeout
with get () = timeout
and set value = timeout <- value
override x.GetWebRequest uri =
let r = base.GetWebRequest(uri)
r.Timeout <- x.Timeout
r
type ParsedDoc = ParsedDoc
type ParsedArticle = ParsedArticle
let parseDoc (str : string) = ParsedDoc
let parseArticle (doc : ParsedDoc) = Some ParsedArticle
/// A synchronized wrapper around Console.Out so we don't
/// get garbled console output.
let synchedOut =
System.Console.Out
|> System.IO.TextWriter.Synchronized
let parseWikiAsync(url : string, outstandingRequestCount : int ref) =
async {
use wc = new WebClientWithTimeout(Timeout = 5000)
wc.Headers.Add ("User-Agent", "Friendly Bot 1.0 (FriendlyBot@friendlybot.com)")
// Increment the outstanding request count just before we send the request.
do
// NOTE : The message must be created THEN passed to synchedOut.WriteLine --
// piping it (|>) into synchedOut.WriteLine or using fprintfn causes a closure
// to be created which somehow defeats the synchronization and garbles the output.
let msg =
Interlocked.Increment outstandingRequestCount
|> sprintf "Outstanding requests: %i"
synchedOut.WriteLine msg
let! html = wc.AsyncDownloadString(Uri(url))
let ret =
try html |> parseDoc |> parseArticle
with ex ->
let msg = sprintf "%A" ex
synchedOut.WriteLine msg
None
// Decrement the outstanding request count now that we've
// received a reponse and parsed it.
do
let msg =
Interlocked.Decrement outstandingRequestCount
|> sprintf "Outstanding requests: %i"
synchedOut.WriteLine msg
return ret
}
/// Writes a message to the console, passing a value through
/// so it can be used within a function pipeline.
let inline passThruWithMessage (msg : string) value =
Console.WriteLine msg
value
let en100 =
let outstandingRequestCount = ref 0
seq { for _ in 1..120 ->
parseWikiAsync("http://en.wikipedia.org/wiki/Special:Random", outstandingRequestCount) }
|> Async.Parallel
|> Async.RunSynchronously
|> passThruWithMessage "Finished running all of the requests."
|> Seq.choose id
|> Seq.take 100
そのコードをコンパイルして実行すると、次のような出力が表示されます。
Outstanding requests: 4
Outstanding requests: 2
Outstanding requests: 1
Outstanding requests: 3
Outstanding requests: 5
Outstanding requests: 6
Outstanding requests: 7
Outstanding requests: 8
Outstanding requests: 9
Outstanding requests: 10
Outstanding requests: 12
Outstanding requests: 14
Outstanding requests: 15
Outstanding requests: 16
Outstanding requests: 17
Outstanding requests: 18
Outstanding requests: 13
Outstanding requests: 19
Outstanding requests: 20
Outstanding requests: 24
Outstanding requests: 22
Outstanding requests: 26
Outstanding requests: 27
Outstanding requests: 28
Outstanding requests: 29
Outstanding requests: 30
Outstanding requests: 25
Outstanding requests: 21
Outstanding requests: 23
Outstanding requests: 11
Outstanding requests: 29
Outstanding requests: 28
Outstanding requests: 27
Outstanding requests: 26
Outstanding requests: 25
Outstanding requests: 24
Outstanding requests: 23
Outstanding requests: 22
Outstanding requests: 21
Outstanding requests: 20
Outstanding requests: 19
Outstanding requests: 18
Outstanding requests: 17
Outstanding requests: 16
Outstanding requests: 15
Outstanding requests: 14
Outstanding requests: 13
Outstanding requests: 12
Outstanding requests: 11
Outstanding requests: 10
Outstanding requests: 9
Outstanding requests: 8
Outstanding requests: 7
Outstanding requests: 6
Outstanding requests: 5
Outstanding requests: 4
Outstanding requests: 3
Outstanding requests: 2
Outstanding requests: 1
Outstanding requests: 0
Finished running all of the requests.
ご覧のとおり、すべてのリクエストは、いずれかが解析される前に行われます。したがって、接続が遅い場合、または大量のドキュメントを取得しようとしている場合、サーバーは接続を切断している可能性があります。送信しようとしている応答を取得していないと想定する場合があります。コードのもう 1 つの問題は、 で生成する要素の数を明示的に指定する必要があることですseq
。これにより、コードの再利用性が低下します。
より良い解決策は、消費するコードで必要とされるページを取得して解析することです。(そして、考えてみれば、まさにそれが F# のseq
良いところです。) まず、Uri を受け取って生成する関数を作成します。seq<Async<'T>>
つまり、値の無限のシーケンスを生成しAsync<'T>
、それぞれが取得する値を生成します。 Uri からのコンテンツを解析し、結果を返します。
/// Given a Uri, creates an infinite sequence of whose elements are retrieved
/// from the Uri.
let createDocumentSeq (uri : System.Uri) =
#if DEBUG
let outstandingRequestCount = ref 0
#endif
Seq.initInfinite <| fun _ ->
async {
use wc = new WebClientWithTimeout(Timeout = 5000)
wc.Headers.Add ("User-Agent", "Friendly Bot 1.0 (FriendlyBot@friendlybot.com)")
#if DEBUG
// Increment the outstanding request count just before we send the request.
do
// NOTE : The message must be created THEN passed to synchedOut.WriteLine --
// piping it (|>) into synchedOut.WriteLine or using fprintfn causes a closure
// to be created which somehow defeats the synchronization and garbles the output.
let msg =
Interlocked.Increment outstandingRequestCount
|> sprintf "Outstanding requests: %i"
synchedOut.WriteLine msg
#endif
let! html = wc.AsyncDownloadString uri
let ret =
try Some html
with ex ->
let msg = sprintf "%A" ex
synchedOut.WriteLine msg
None
#if DEBUG
// Decrement the outstanding request count now that we've
// received a reponse and parsed it.
do
let msg =
Interlocked.Decrement outstandingRequestCount
|> sprintf "Outstanding requests: %i"
synchedOut.WriteLine msg
#endif
return ret
}
この関数を使用して、ページをストリームとして取得します。
//
let en100_Streaming =
#if DEBUG
let documentCount = ref 0
#endif
Uri ("http://en.wikipedia.org/wiki/Special:Random")
|> createDocumentSeq
|> Seq.choose (fun asyncDoc ->
Async.RunSynchronously asyncDoc
|> Option.bind (parseDoc >> parseArticle))
#if DEBUG
|> Seq.map (fun x ->
let msg =
Interlocked.Increment documentCount
|> sprintf "Parsed documents: %i"
synchedOut.WriteLine msg
x)
#endif
|> Seq.take 50
// None of the computations actually take place until
// this point, because Seq.toArray forces evaluation of the sequence.
|> Seq.toArray
このコードを実行すると、サーバーから一度に 1 つずつ結果が取得され、未処理のリクエストが残っていないことがわかります。また、取得する結果の数を変更するのは非常に簡単です。必要なのは、 に渡す値を変更することだけですSeq.take
。
このストリーミング コードは問題なく動作しますが、リクエストを並行して実行しないため、多数のドキュメントの処理が遅くなる可能性があります。これは簡単に修正できる問題ですが、解決策は少し直感的ではないかもしれません。リクエストのシーケンス全体を並行して実行しようとする代わりに (これは元のコードの問題です)、 を使用してリクエストの小さなバッチを並行しAsync.Parallel
て実行し、 を使用して結果を結合してフラットなシーケンスに戻す関数を作成しましょう。.Seq.collect
/// Given a sequence of Async<'T>, creates a new sequence whose elements
/// are computed in batches of a specified size.
let parallelBatch batchSize (sequence : seq<Async<'T>>) =
sequence
|> Seq.windowed batchSize
|> Seq.collect (fun batch ->
batch
|> Async.Parallel
|> Async.RunSynchronously)
この関数を利用するには、ストリーミング バージョンのコードにいくつかの小さな調整が必要です。
let en100_Batched =
let batchSize = 10
#if DEBUG
let documentCount = ref 0
#endif
Uri ("http://en.wikipedia.org/wiki/Special:Random")
|> createDocumentSeq
// Execute batches in parallel
|> parallelBatch batchSize
|> Seq.choose (Option.bind (parseDoc >> parseArticle))
#if DEBUG
|> Seq.map (fun x ->
let msg =
Interlocked.Increment documentCount
|> sprintf "Parsed documents: %i"
synchedOut.WriteLine msg
x)
#endif
|> Seq.take 50
// None of the computations actually take place until
// this point, because Seq.toArray forces evaluation of the sequence.
|> Seq.toArray
繰り返しますが、取得するドキュメントの数は簡単に変更でき、バッチ サイズも簡単に変更できます (繰り返しますが、適度に小さくすることをお勧めします)。必要に応じて、実行時にそれらを切り替えることができるように、「ストリーミング」と「バッチ処理」のコードにいくつかの調整を加えることができます。
最後にもう 1 つ - 私のコードではリクエストがタイムアウトしてはならないので、おそらくWebClientWithTimeout
クラスを削除してWebClient
直接使用することができます。