5

ウィキペディアから 100 個のランダムな記事をダウンロードする (研究用) かなり単純な F# 非同期コードがあります。

何らかの理由で、ダウンロード中の任意の時点でコードがハングします。50過ぎの時もあれば、80過ぎの時もある。

非同期コード自体はかなり単純です。

let parseWikiAsync(url:string, count:int ref) =
    async {
            use wc = new WebClientWithTimeout(Timeout = 5000)
            let! html = wc.AsyncDownloadString(Uri(url))
            let ret =
                try html |> parseDoc |> parseArticle
                with | ex -> printfn "%A" ex; None
            lock count (fun () ->
                if !count % 10 = 0 then
                    printfn "%d" !count
                count := !count + 1
            )
            return ret
    }

fsi では何が問題なのかわからなかったのでSystem.Net.WebClient、タイムアウトを指定できるラッパーであるWebClientWithTimeout を作成しました。

type WebClientWithTimeout() =
    inherit WebClient()
    member val Timeout = 60000 with get, set

    override x.GetWebRequest uri =
        let r = base.GetWebRequest(uri)
        r.Timeout <- x.Timeout
        r

次に、非同期コンビネーターを使用して 100 を超えるページを取得し、返される parseWikiAsync 呼び出しを返すすべての記事None(そのほとんどは「曖昧さ回避ページ」) を、正確に 100 の記事になるまで除外します。

let en100 =
    let count = ref 0
    seq { for _ in 1..110 -> parseWikiAsync("http://en.wikipedia.org/wiki/Special:Random", count) }
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Seq.choose id
    |> Seq.take 100

コードをコンパイルしてデバッガーで実行すると、スレッドは 3 つしかなく、そのうち実際のコードを実行しているのは Async パイプラインだけです。他の 2 つの場所は「使用不可」で、コール スタックには何もありません。

AsyncDownloadStringこれは、parseWikiAsync のどこにもスタックしていないことを意味すると思います。他に何が原因でしょうか?

また、最初は非同期コードが実際に開始されるまでに約 1 分かかります。その後、再び無期限にハングアップするまで、かなり妥当なペースで進みます。

メイン スレッドのコール スタックは次のとおりです。

>   mscorlib.dll!System.Threading.WaitHandle.InternalWaitOne(System.Runtime.InteropServices.SafeHandle waitableSafeHandle, long millisecondsTimeout, bool hasThreadAffinity, bool exitContext) + 0x22 bytes 
    mscorlib.dll!System.Threading.WaitHandle.WaitOne(int millisecondsTimeout, bool exitContext) + 0x28 bytes    
    FSharp.Core.dll!Microsoft.FSharp.Control.AsyncImpl.ResultCell<Microsoft.FSharp.Control.AsyncBuilderImpl.Result<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]>>.TryWaitForResultSynchronously(Microsoft.FSharp.Core.FSharpOption<int> timeout) + 0x36 bytes  
    FSharp.Core.dll!Microsoft.FSharp.Control.CancellationTokenOps.RunSynchronously<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]>(System.Threading.CancellationToken token, Microsoft.FSharp.Control.FSharpAsync<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]> computation, Microsoft.FSharp.Core.FSharpOption<int> timeout) + 0x1ba bytes 
    FSharp.Core.dll!Microsoft.FSharp.Control.FSharpAsync.RunSynchronously<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]>(Microsoft.FSharp.Control.FSharpAsync<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]> computation, Microsoft.FSharp.Core.FSharpOption<int> timeout, Microsoft.FSharp.Core.FSharpOption<System.Threading.CancellationToken> cancellationToken) + 0xb9 bytes   
    WikiSurvey.exe!<StartupCode$WikiSurvey>.$Program.main@() Line 97 + 0x55 bytes   F#
4

2 に答える 2

8

ウィキペディアのせいではありません。これは、Async.Parallel内部でどのように機能するかの結果です。の型シグネチャAsync.Parallelseq<Async<'T>> -> Async<'T[]>. シーケンスからのすべての結果を含む単一の Async 値を返します。したがって、すべての計算が返されるまで戻りませんseq<Async<'T>>

例として、未処理のリクエスト、つまり、サーバーに送信されたがまだ応答を受信/解析していないリクエストの数を追跡するようにコードを変更しました。

open Microsoft.FSharp.Control
open Microsoft.FSharp.Control.WebExtensions
open System
open System.Net
open System.Threading

type WebClientWithTimeout() =
    inherit WebClient()

    let mutable timeout = -1
    member __.Timeout
        with get () = timeout
        and set value = timeout <- value

    override x.GetWebRequest uri =
        let r = base.GetWebRequest(uri)
        r.Timeout <- x.Timeout
        r

type ParsedDoc = ParsedDoc
type ParsedArticle = ParsedArticle

let parseDoc (str : string) = ParsedDoc
let parseArticle (doc : ParsedDoc) = Some ParsedArticle

/// A synchronized wrapper around Console.Out so we don't
/// get garbled console output.
let synchedOut =
    System.Console.Out
    |> System.IO.TextWriter.Synchronized

let parseWikiAsync(url : string, outstandingRequestCount : int ref) =
    async {
    use wc = new WebClientWithTimeout(Timeout = 5000)
    wc.Headers.Add ("User-Agent", "Friendly Bot 1.0 (FriendlyBot@friendlybot.com)")

    // Increment the outstanding request count just before we send the request.
    do
        // NOTE : The message must be created THEN passed to synchedOut.WriteLine --
        // piping it (|>) into synchedOut.WriteLine or using fprintfn causes a closure
        // to be created which somehow defeats the synchronization and garbles the output.
        let msg =
            Interlocked.Increment outstandingRequestCount
            |> sprintf "Outstanding requests: %i"
        synchedOut.WriteLine msg

    let! html = wc.AsyncDownloadString(Uri(url))
    let ret =
        try html |> parseDoc |> parseArticle
        with ex ->
            let msg = sprintf "%A" ex
            synchedOut.WriteLine msg
            None

    // Decrement the outstanding request count now that we've
    // received a reponse and parsed it.
    do
        let msg =
            Interlocked.Decrement outstandingRequestCount
            |> sprintf "Outstanding requests: %i"
        synchedOut.WriteLine msg

    return ret
    }

/// Writes a message to the console, passing a value through
/// so it can be used within a function pipeline.
let inline passThruWithMessage (msg : string) value =
    Console.WriteLine msg
    value

let en100 =
    let outstandingRequestCount = ref 0
    seq { for _ in 1..120 ->
            parseWikiAsync("http://en.wikipedia.org/wiki/Special:Random", outstandingRequestCount) }
    |> Async.Parallel
    |> Async.RunSynchronously
    |> passThruWithMessage "Finished running all of the requests."
    |> Seq.choose id
    |> Seq.take 100

そのコードをコンパイルして実行すると、次のような出力が表示されます。

Outstanding requests: 4
Outstanding requests: 2
Outstanding requests: 1
Outstanding requests: 3
Outstanding requests: 5
Outstanding requests: 6
Outstanding requests: 7
Outstanding requests: 8
Outstanding requests: 9
Outstanding requests: 10
Outstanding requests: 12
Outstanding requests: 14
Outstanding requests: 15
Outstanding requests: 16
Outstanding requests: 17
Outstanding requests: 18
Outstanding requests: 13
Outstanding requests: 19
Outstanding requests: 20
Outstanding requests: 24
Outstanding requests: 22
Outstanding requests: 26
Outstanding requests: 27
Outstanding requests: 28
Outstanding requests: 29
Outstanding requests: 30
Outstanding requests: 25
Outstanding requests: 21
Outstanding requests: 23
Outstanding requests: 11
Outstanding requests: 29
Outstanding requests: 28
Outstanding requests: 27
Outstanding requests: 26
Outstanding requests: 25
Outstanding requests: 24
Outstanding requests: 23
Outstanding requests: 22
Outstanding requests: 21
Outstanding requests: 20
Outstanding requests: 19
Outstanding requests: 18
Outstanding requests: 17
Outstanding requests: 16
Outstanding requests: 15
Outstanding requests: 14
Outstanding requests: 13
Outstanding requests: 12
Outstanding requests: 11
Outstanding requests: 10
Outstanding requests: 9
Outstanding requests: 8
Outstanding requests: 7
Outstanding requests: 6
Outstanding requests: 5
Outstanding requests: 4
Outstanding requests: 3
Outstanding requests: 2
Outstanding requests: 1
Outstanding requests: 0
Finished running all of the requests.

ご覧のとおり、すべてのリクエストは、いずれかが解析される前に行われます。したがって、接続が遅い場合、または大量のドキュメントを取得しようとしている場合、サーバーは接続を切断している可能性があります。送信しようとしている応答を取得していないと想定する場合があります。コードのもう 1 つの問題は、 で生成する要素の数を明示的に指定する必要があることですseq。これにより、コードの再利用性が低下します。

より良い解決策は、消費するコードで必要とされるページを取得して解析することです。(そして、考えてみれば、まさにそれが F# のseq良いところです。) まず、Uri を受け取って生成する関数を作成します。seq<Async<'T>>つまり、値の無限のシーケンスを生成しAsync<'T>、それぞれが取得する値を生成します。 Uri からのコンテンツを解析し、結果を返します。

/// Given a Uri, creates an infinite sequence of whose elements are retrieved
/// from the Uri.
let createDocumentSeq (uri : System.Uri) =
    #if DEBUG
    let outstandingRequestCount = ref 0
    #endif

    Seq.initInfinite <| fun _ ->
        async {
        use wc = new WebClientWithTimeout(Timeout = 5000)
        wc.Headers.Add ("User-Agent", "Friendly Bot 1.0 (FriendlyBot@friendlybot.com)")

        #if DEBUG
        // Increment the outstanding request count just before we send the request.
        do
            // NOTE : The message must be created THEN passed to synchedOut.WriteLine --
            // piping it (|>) into synchedOut.WriteLine or using fprintfn causes a closure
            // to be created which somehow defeats the synchronization and garbles the output.
            let msg =
                Interlocked.Increment outstandingRequestCount
                |> sprintf "Outstanding requests: %i"
            synchedOut.WriteLine msg
        #endif

        let! html = wc.AsyncDownloadString uri
        let ret =
            try Some html
            with ex ->
                let msg = sprintf "%A" ex
                synchedOut.WriteLine msg
                None

        #if DEBUG
        // Decrement the outstanding request count now that we've
        // received a reponse and parsed it.
        do
            let msg =
                Interlocked.Decrement outstandingRequestCount
                |> sprintf "Outstanding requests: %i"
            synchedOut.WriteLine msg
        #endif

        return ret
        }

この関数を使用して、ページをストリームとして取得します。

//
let en100_Streaming =
    #if DEBUG
    let documentCount = ref 0
    #endif

    Uri ("http://en.wikipedia.org/wiki/Special:Random")
    |> createDocumentSeq
    |> Seq.choose (fun asyncDoc ->
        Async.RunSynchronously asyncDoc
        |> Option.bind (parseDoc >> parseArticle))
    #if DEBUG
    |> Seq.map (fun x ->
        let msg =
            Interlocked.Increment documentCount
            |> sprintf "Parsed documents: %i"
        synchedOut.WriteLine msg
        x)
    #endif
    |> Seq.take 50
    // None of the computations actually take place until
    // this point, because Seq.toArray forces evaluation of the sequence.
    |> Seq.toArray

このコードを実行すると、サーバーから一度に 1 つずつ結果が取得され、未処理のリクエストが残っていないことがわかります。また、取得する結果の数を変更するのは非常に簡単です。必要なのは、 に渡す値を変更することだけですSeq.take

このストリーミング コードは問題なく動作しますが、リクエストを並行して実行しないため、多数のドキュメントの処理が遅くなる可能性があります。これは簡単に修正できる問題ですが、解決策は少し直感的ではないかもしれません。リクエストのシーケンス全体を並行して実行しようとする代わりに (これは元のコードの問題です)、 を使用してリクエストの小さなバッチを並行しAsync.Parallelて実行し、 を使用して結果を結合してフラットなシーケンスに戻す関数を作成しましょう。.Seq.collect

/// Given a sequence of Async<'T>, creates a new sequence whose elements
/// are computed in batches of a specified size.
let parallelBatch batchSize (sequence : seq<Async<'T>>) =
    sequence
    |> Seq.windowed batchSize
    |> Seq.collect (fun batch ->
        batch
        |> Async.Parallel
        |> Async.RunSynchronously)

この関数を利用するには、ストリーミング バージョンのコードにいくつかの小さな調整が必要です。

let en100_Batched =
    let batchSize = 10
    #if DEBUG
    let documentCount = ref 0
    #endif

    Uri ("http://en.wikipedia.org/wiki/Special:Random")
    |> createDocumentSeq
    // Execute batches in parallel
    |> parallelBatch batchSize
    |> Seq.choose (Option.bind (parseDoc >> parseArticle))
    #if DEBUG
    |> Seq.map (fun x ->
        let msg =
            Interlocked.Increment documentCount
            |> sprintf "Parsed documents: %i"
        synchedOut.WriteLine msg
        x)
    #endif
    |> Seq.take 50
    // None of the computations actually take place until
    // this point, because Seq.toArray forces evaluation of the sequence.
    |> Seq.toArray

繰り返しますが、取得するドキュメントの数は簡単に変更でき、バッチ サイズも簡単に変更できます (繰り返しますが、適度に小さくすることをお勧めします)。必要に応じて、実行時にそれらを切り替えることができるように、「ストリーミング」と「バッチ処理」のコードにいくつかの調整を加えることができます。

最後にもう 1 つ - 私のコードではリクエストがタイムアウトしてはならないので、おそらくWebClientWithTimeoutクラスを削除してWebClient直接使用することができます。

于 2012-08-30T18:26:47.550 に答える
2

あなたのコードは特に特別なことをしているようには見えないので、ウィキペディアはあなたの活動を気に入っていないと思います。彼らのボットポリシーを見てください。もう少し深く掘り下げると、厳格なUser-Agent ポリシーもあるようです

2010 年 2 月 15 日以降、ウィキメディア サイトではすべてのリクエストに HTTP User-Agent ヘッダーが必要です。これは、技術スタッフによって行われた運用上の決定であり、技術メーリング リストで発表され、議論されました。[1][2] その理由は、User-Agent 文字列を送信しないクライアントは、ほとんどの場合、プロジェクトに利益をもたらすことなく、サーバーに多くの負荷を引き起こす動作の悪いスクリプトであるということです。Perl の libwww で使用されるような、User-Agent 文字列の説明的でないデフォルト値も、ウィキメディア Web サイト (または api.php などの Web サイトの一部) の使用をブロックされる可能性があることに注意してください。

User-Agent ヘッダーを送信しないユーザー エージェント (ブラウザーまたはスクリプト) では、次のようなエラー メッセージが表示される場合があります。

スクリプトでは、連絡先情報を含む有益な User-Agent 文字列を使用する必要があります。そうしないと、予告なしに IP ブロックされる場合があります。

したがって、適切なユーザーエージェントを追加したとしても、おそらく彼らはあなたがしていることを気に入らないでしょうが、試してみることもできます.

wc.Headers.Add ("User-Agent", "Friendly Bot 1.0 (FriendlyBot@friendlybot.com)")

また、サーバーに非常に多くの接続を作成することを避けても害はありません.

于 2012-08-29T04:12:34.457 に答える