Don Syme ブログから ( http://blogs.msdn.com/b/dsyme/archive/2010/01/10/async-and-parallel-design-patterns-in-f-reporting-progress-with-events-plus -twitter-sample.aspx ) Twitter ストリーム リスナーを実装しようとしました。私の目標は、「信頼性の高いシステムを構築するときは、処理する前にツイートを保存またはキューに入れることが多い」という twitter api ドキュメントのガイダンスに従うことです。
したがって、私のコードには 2 つのコンポーネントが必要です。
- 各status/tweet jsonを積み上げて処理するキュー
- json 文字列のツイートをキューにダンプする Twitter ストリームを読み取るための何か
私は以下を選択します。
- 各ツイートを投稿するエージェントで、json をデコードしてデータベースにダンプします
- シンプルな http ウェブリクエスト
また、データベースへの挿入によるエラーをテキスト ファイルにダンプしたいと考えています。(おそらく、すべてのエラーに対してスーパーバイザー エージェントに切り替えます)。
2 つの問題:
- ここでの私の戦略は良いですか?私の理解が正しければ、エージェントはスマート キューのように動作し、そのメッセージを非同期的に処理します (キューに 10 人のユーザーがいる場合、1 番目のメッセージが終了してから 2 番目のメッセージが終了するのを待つのではなく、一度に多数のメッセージを処理します)。 ...)、 正しい ?
- Don Syme の投稿によると、しばらく前のすべてが分離されているため、StreamWriter とデータベース ダンプは分離されています。しかし、これが必要なので、データベース接続を閉じることはありません... ?
コードは次のようになります。
let dumpToDatabase databaseName =
//opens databse connection
fun tweet -> inserts tweet in database
type Agent<'T> = MailboxProcessor<'T>
let agentDump =
Agent.Start(fun (inbox: MailboxProcessor<string>) ->
async{
use w2 = new StreamWriter(@"\Errors.txt")
let dumpError =fun (error:string) -> w2.WriteLine( error )
let dumpTweet = dumpToDatabase "stream"
while true do
let! msg = inbox.Receive()
try
let tw = decode msg
dumpTweet tw
with
| :? MySql.Data.MySqlClient.MySqlException as ex ->
dumpError (msg+ex.ToString() )
| _ as ex -> ()
}
)
let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url
let stream = twitterStream MyCredentials stream_url parameters
while true do
agentDump.Post(stream.ReadLine())
どうもありがとう !
プロセッサ エージェントを使用したコードの編集:
let dumpToDatabase (tweets:tweet list)=
bulk insert of tweets in database
let agentProcessor =
Agent.Start(fun (inbox: MailboxProcessor<string list>) ->
async{
while true do
let! msg = inbox.Receive()
try
msg
|> List.map(decode)
|> dumpToDatabase
with
| _ as ex -> Console.WriteLine("Processor "+ex.ToString()))
}
)
let agentDump =
Agent.Start(fun (inbox: MailboxProcessor<string>) ->
let rec loop messageList count = async{
try
let! newMsg = inbox.Receive()
let newMsgList = newMsg::messageList
if count = 10 then
agentProcessor.Post( newMsgList )
return! loop [] 0
else
return! loop newMsgList (count+1)
with
| _ as ex -> Console.WriteLine("Dump "+ex.ToString())
}
loop [] 0)
let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url
let stream = twitterStream MyCredentials stream_url parameters
while true do
agentDump.Post(stream.ReadLine())