2

Don Syme ブログから ( http://blogs.msdn.com/b/dsyme/archive/2010/01/10/async-and-parallel-design-patterns-in-f-reporting-progress-with-events-plus -twitter-sample.aspx ) Twitter ストリーム リスナーを実装しようとしました。私の目標は、「信頼性の高いシステムを構築するときは、処理する前にツイートを保存またはキューに入れることが多い」という twitter api ドキュメントのガイダンスに従うことです。

したがって、私のコードには 2 つのコンポーネントが必要です。

  • 各status/tweet jsonを積み上げて処理するキュー
  • json 文字列のツイートをキューにダンプする Twitter ストリームを読み取るための何か

私は以下を選択します。

  • 各ツイートを投稿するエージェントで、json をデコードしてデータベースにダンプします
  • シンプルな http ウェブリクエスト

また、データベースへの挿入によるエラーをテキスト ファイルにダンプしたいと考えています。(おそらく、すべてのエラーに対してスーパーバイザー エージェントに切り替えます)。

2 つの問題:

  • ここでの私の戦略は良いですか?私の理解が正しければ、エージェントはスマート キューのように動作し、そのメッセージを非同期的に処理します (キューに 10 人のユーザーがいる場合、1 番目のメッセージが終了してから 2 番目のメッセージが終了するのを待つのではなく、一度に多数のメッセージを処理します)。 ...)、 正しい ?
  • Don Syme の投稿によると、しばらく前のすべてが分離されているため、StreamWriter とデータベース ダンプは分離されています。しかし、これが必要なので、データベース接続を閉じることはありません... ?

コードは次のようになります。

let dumpToDatabase databaseName = 
   //opens databse connection 
   fun tweet -> inserts tweet in database

type Agent<'T> = MailboxProcessor<'T>



 let agentDump =
            Agent.Start(fun (inbox: MailboxProcessor<string>) ->
               async{
                   use w2 = new StreamWriter(@"\Errors.txt")
                   let dumpError  =fun (error:string) -> w2.WriteLine( error )
                   let dumpTweet =  dumpToDatabase "stream"
                   while true do 
                       let! msg = inbox.Receive()
                       try 
                           let tw = decode msg
                           dumpTweet tw
                       with 
                       | :? MySql.Data.MySqlClient.MySqlException as ex -> 
    dumpError (msg+ex.ToString() ) 
                        | _ as ex -> () 



                             }
                             )

    let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
    let parameters = "track=RT&"
    let stream_url = filter_url

    let stream = twitterStream MyCredentials stream_url parameters


    while true do 
        agentDump.Post(stream.ReadLine())

どうもありがとう !

プロセッサ エージェントを使用したコードの編集:

let dumpToDatabase (tweets:tweet list)= 
    bulk insert of tweets in database    

let agentProcessor = 
        Agent.Start(fun (inbox: MailboxProcessor<string list>) ->
           async{
               while true do 
                       let! msg = inbox.Receive()
                       try
                          msg
                          |> List.map(decode)
                          |> dumpToDatabase 
                        with
                        | _ as ex -> Console.WriteLine("Processor "+ex.ToString()))
                 }
                 )



let agentDump =
        Agent.Start(fun (inbox: MailboxProcessor<string>) ->
                  let rec loop messageList count = async{
                      try
                          let! newMsg = inbox.Receive()
                          let newMsgList = newMsg::messageList
                          if count = 10 then 
                               agentProcessor.Post( newMsgList )
                               return! loop [] 0
                          else                    
                               return! loop newMsgList (count+1)
                      with
                      | _ as ex -> Console.WriteLine("Dump "+ex.ToString())

                  }
                  loop [] 0)

let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url

let stream = twitterStream MyCredentials stream_url parameters


while true do 
    agentDump.Post(stream.ReadLine())
4

1 に答える 1

5

エージェントを説明する最良の方法は、ある状態を維持し、他のエージェント(または Web ページまたはデータベース)と通信できる実行中のプロセスであるということだと思います。エージェントベースのアプリケーションを作成する場合、互いにメッセージを送信する複数のエージェントを使用できることがよくあります。

Web からツイートを読み取ってデータベースに格納するエージェントを作成するというアイデアは、良い選択だと思います (ただし、ツイートをエージェントの状態としてメモリに保持することもできます)。

  • データベース接続を常に開いたままにしておくことはありません-MSSQL(およびMySQLもおそらく)は接続プールを実装しているため、接続を解放しても自動的に接続を閉じることはありません。これは、データベースにデータを書き込む必要があるたびに接続を再開する方が安全であり、同様に効率的であることを意味します。

  • 大量のエラー メッセージが表示されることが予想される場合を除き、ファイル ストリームに対しても同じことを行うでしょう (書き込み時にファイル ストリームを開くと、新しいコンテンツが末尾に追加されます)。

F# エージェントのキューが機能する方法は、メッセージを 1 つずつ処理することです (この例では、 を使用してメッセージを待機していますinbox.Receive()。キューに複数のメッセージが含まれている場合は、それらを 1 つずつ (ループで) 取得します)。

  • 一度に複数のメッセージを処理したい場合は、たとえば 10 個のメッセージを待機し、それらをリストとして別のエージェントに送信するエージェントを作成できます (その後、一括処理を実行します)。

  • timeoutメソッドにパラメーターを指定することもできるReceiveので、1 秒以内にすべてのメッセージが到着する限り、最大 10 個のメッセージを待つことができます。この方法では、メッセージを長時間保持しない一括処理を非常にエレガントに実装できます。

于 2010-08-25T16:27:50.140 に答える