0

コードがコンソール プログラムにコンパイルされるか、 fsi --use:Program.fs --exec --quietとして実行される場合、一部のスレッドは終了する前に終了します。すべてのスレッドが終了するのを待つ方法はありますか?

この問題は、「複数の MailboxProcesser が存在する場合のプログラム終了の問題」として説明できます。

出力例

(最後の行が切り捨てられ、最後の出力関数 ( printfn "[Main] after crawl") が実行されないことに注意してください。)

【メイン】クロール前
[クロール] 結果を返す前
http://news.google.com がエージェント 1 によってクロールされました。
[監督者] 限界に達した
エージェント 5 が完了しました。
エージェント 1 によってクロールされた http://www.gstatic.com/news/img/favicon.ico。
[監督者] 限界に達した
エージェント 1 が完了しました。
http://www.google.com/imghp?hl=en&tab=ni エージェントによってクロールされる 4.
[監督者] 限界に達した
エージェント 4 が完了しました。
http://www.google.com/webhp?hl=en&tab=nw エージェントによってクロールされる 2.
[監督者] 限界に達した
エージェント 2 が完了しました。
http://news.google.

コード

編集:いくつか追加しましSystem.Threading.Thread.CurrentThread.IsBackground <- falseた。

open System
open System.Collections.Concurrent
open System.Collections.Generic
open System.IO
open System.Net
open System.Text.RegularExpressions

module Helpers =

    type Message =
        | Done
        | Mailbox of MailboxProcessor<Message>
        | Stop
        | Url of string option
        | Start of AsyncReplyChannel<unit>

    // Gates the number of crawling agents.
    [<Literal>]
    let Gate = 5

    // Extracts links from HTML.
    let extractLinks html =
        let pattern1 = "(?i)href\\s*=\\s*(\"|\')/?((?!#.*|/\B|" + 
                       "mailto:|location\.|javascript:)[^\"\']+)(\"|\')"
        let pattern2 = "(?i)^https?"

        let links =
            [
                for x in Regex(pattern1).Matches(html) do
                    yield x.Groups.[2].Value
            ]
            |> List.filter (fun x -> Regex(pattern2).IsMatch(x))
        links

    // Fetches a Web page.
    let fetch (url : string) =
        try
            let req = WebRequest.Create(url) :?> HttpWebRequest
            req.UserAgent <- "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)"
            req.Timeout <- 5000
            use resp = req.GetResponse()
            let content = resp.ContentType
            let isHtml = Regex("html").IsMatch(content)
            match isHtml with
            | true -> use stream = resp.GetResponseStream()
                      use reader = new StreamReader(stream)
                      let html = reader.ReadToEnd()
                      Some html
            | false -> None
        with
        | _ -> None

    let collectLinks url =
        let html = fetch url
        match html with
        | Some x -> extractLinks x
        | None -> []

open Helpers

// Creates a mailbox that synchronizes printing to the console (so 
// that two calls to 'printfn' do not interleave when printing)
let printer = 
    MailboxProcessor.Start(fun x -> async {
        while true do 
        let! str = x.Receive()
        System.Threading.Thread.CurrentThread.IsBackground <- false
        printfn "%s" str })
// Hides standard 'printfn' function (formats the string using 
// 'kprintf' and then posts the result to the printer agent.
let printfn fmt = 
    Printf.kprintf printer.Post fmt

let crawl url limit = 
    // Concurrent queue for saving collected urls.
    let q = ConcurrentQueue<string>()

    // Holds crawled URLs.
    let set = HashSet<string>()


    let supervisor =
        MailboxProcessor.Start(fun x -> async {
            System.Threading.Thread.CurrentThread.IsBackground <- false
            // The agent expects to receive 'Start' message first - the message
            // carries a reply channel that is used to notify the caller
            // when the agent completes crawling.
            let! start = x.Receive()
            let repl =
              match start with
              | Start repl -> repl
              | _ -> failwith "Expected Start message!"

            let rec loop run =
                async {
                    let! msg = x.Receive()
                    match msg with
                    | Mailbox(mailbox) -> 
                        let count = set.Count
                        if count < limit - 1 && run then 
                            let url = q.TryDequeue()
                            match url with
                            | true, str -> if not (set.Contains str) then
                                                let set'= set.Add str
                                                mailbox.Post <| Url(Some str)
                                                return! loop run
                                            else
                                                mailbox.Post <| Url None
                                                return! loop run

                            | _ -> mailbox.Post <| Url None
                                   return! loop run
                        else
                            printfn "[supervisor] reached limit" 
                            // Wait for finishing
                            mailbox.Post Stop
                            return! loop run
                    | Stop -> printfn "[Supervisor] stop"; return! loop false
                    | Start _ -> failwith "Unexpected start message!"
                    | Url _ -> failwith "Unexpected URL message!"
                    | Done -> printfn "[Supervisor] Supervisor is done."
                              (x :> IDisposable).Dispose()
                              // Notify the caller that the agent has completed
                              repl.Reply(())
                }
            do! loop true })


    let urlCollector =
        MailboxProcessor.Start(fun y ->
            let rec loop count =
                async {
                    System.Threading.Thread.CurrentThread.IsBackground <- false
                    let! msg = y.TryReceive(6000)
                    match msg with
                    | Some message ->
                        match message with
                        | Url u ->
                            match u with
                            | Some url -> q.Enqueue url
                                          return! loop count
                            | None -> return! loop count
                        | _ ->
                            match count with
                            | Gate -> (y :> IDisposable).Dispose()
                                      printfn "[urlCollector] URL collector is done."
                                      supervisor.Post Done
                            | _ -> return! loop (count + 1)
                    | None -> supervisor.Post Stop
                              return! loop count
                }
            loop 1)

    /// Initializes a crawling agent.
    let crawler id =
        MailboxProcessor.Start(fun inbox ->
            let rec loop() =
                async {
                    System.Threading.Thread.CurrentThread.IsBackground <- false
                    let! msg = inbox.Receive()
                    match msg with
                    | Url x ->
                        match x with
                        | Some url -> 
                                let links = collectLinks url
                                printfn "%s crawled by agent %d." url id
                                for link in links do
                                    urlCollector.Post <| Url (Some link)
                                supervisor.Post(Mailbox(inbox))
                                return! loop()
                        | None -> supervisor.Post(Mailbox(inbox))
                                  return! loop()
                    | _ -> printfn "Agent %d is done." id
                           urlCollector.Post Done
                           (inbox :> IDisposable).Dispose()
                    }
            loop())

    // Send 'Start' message to the main agent. The result
    // is asynchronous workflow that will complete when the
    // agent crawling completes
    let result = supervisor.PostAndAsyncReply(Start)
    // Spawn the crawlers.
    let crawlers = 
        [
            for i in 1 .. Gate do
                yield crawler i
        ]

    // Post the first messages.
    crawlers.Head.Post <| Url (Some url)
    crawlers.Tail |> List.iter (fun ag -> ag.Post <| Url None) 
    printfn "[Crawl] before return result"
    result

// Example:
printfn "[Main] before crawl"
crawl "http://news.google.com" 5
|> Async.RunSynchronously
printfn "[Main] after crawl"
4

4 に答える 4

3

私がコードを正しく認識した場合、それはあなたの以前の質問(および私の回答) に基づいています。

プログラムは、スーパーバイザ エージェントが完了するまで待機します (Startメッセージを送信し、 を使用して応答を待機しますRunSynchronously)。これにより、アプリケーションが終了する前に、メイン エージェントとすべてのクローラーが完了することが保証されます。

printer問題は、エージェントが完了するまで待たないことです! そのため、(再定義された) 関数の最後の呼び出しでprintfnエージェントにメッセージが送信され、アプリケーションは印刷エージェントが終了するまで待たずに完了します。

私の知る限り、エージェントが現在キューにあるすべてのメッセージの処理を完了するまで待機する「標準パターン」はありません。あなたが試すことができるいくつかのアイデアは次のとおりです。

  • プロパティを確認できCurrentQueueLengthます (値が 0 になるまで待ちます) が、それでもエージェントがすべてのメッセージの処理を完了したことを意味するわけではありません。

  • 新しいタイプのメッセージを追加し、エージェントがそのメッセージに返信するまで待機することで、エージェントをより複雑にすることができます (メッセージへの返信を現在待っているのと同じようにStart)。

于 2011-07-10T20:48:28.747 に答える
0

F# がゼロであることは知っていますが、通常はThread.Joinを使用して対象のすべてのスレッドを待ちます。あなたの場合のように私には見えます.への呼び出しを介して開始される興味のあるものを待つ必要があります.Start.

生のマネージド スレッドに対するより高いレベルの (より単純な) 抽象化を提供する Task Parallel Library を検討することもできます。ここでタスクの完了を待機する例。

于 2011-07-10T18:16:38.097 に答える
0

.NET スレッドにはプロパティ Thread.IsBackground があり、これが true に設定されている場合、スレッドはプロセスの終了を妨げません。false に設定すると、プロセスが終了できなくなります。参照: http://msdn.microsoft.com/en-us/library/system.threading.thread.isbackground.aspx

エージェントを実行するスレッドはスレッド プールから取得されるため、デフォルトで Thread.IsBackground が false に設定されます。

メッセージを読むたびに、スレッドの IsBackground を false に設定してみてください。これを行う関数を追加して、アプローチをよりクリーンにすることができます。let を使用するたびに問題を解決する最善の方法ではない可能性があります。スレッドを変更できるため、適切に機能させるには慎重に実装する必要があります。特定の質問に答えるために言及したと思いました

すべてのスレッドが終了するのを待つ方法はありますか?

また、特定のスレッドがプログラムの終了を停止し、他のスレッドが停止しなかった理由を人々が理解できるようにします。

于 2011-07-11T13:34:00.430 に答える
0

問題を解決したと思います。プリンターエージェントのSystem.Threading.Thread.CurrentThread.IsBackground <- false後に追加します。let!

ただし、元のコード (Tomas の AsyncChannel 修正前の最初のバージョン) を変更しようとしましたが、System.Threading.Thread.CurrentThread.IsBackground <- false結局 を追加しlet!ても機能しません。わかりません。

助けてくれてありがとう。ようやくバッチ プロセス用の最初の F# アプリケーションを開始できます。MailboxProcessor は、デフォルトで IsBackground を false に設定する必要があると思います。とにかく、Microsoftに変更を依頼してください。

[更新]コンパイルされたアセンブリがうまく機能することがわかりました。しかしfsi --user:Program --exec --quiet、まだ同じです。fsiのバグらしい?

于 2011-07-11T21:15:32.947 に答える