私はscalaに不慣れで、scalazに非常に慣れていません。別のスタックオーバーフローの回答といくつかのハンドホールディングにより、scalaz.stream を使用して、Twitter API の結果を継続的にフェッチするプロセスを実装することができました。ここで、Twitter ハンドルが格納されている Cassandra DB に対して同じことを行いたいと思います。
Twitter の結果を取得するためのコードは次のとおりです。
def urls: Seq[(Handle,URL)] = {
Await.result(
getAll(connection).map { List =>
List.map(twitterToGet =>
(twitterToGet.handle, urlBoilerPlate + twitterToGet.handle + parameters + twitterToGet.sinceID)
)
},
5 seconds)
}
val fetchUrl = channel.lift[Task, (Handle, URL), Fetched] {
url => Task.delay {
val finalResult = callTwitter(url)
if (finalResult.tweets.nonEmpty) {
connection.updateTwitter(finalResult)
} else {
println("\n" + finalResult.handle + " does not have new tweets")
}
s"\ntwitter Fetch & database update completed"
}
}
val P = Process
val process =
(time.awakeEvery(3.second) zipWith P.emitAll(urls))((b, url) => url).
through(fetchUrl)
val fetched = process.runLog.run
fetched.foreach(println)
私がやろうとしていることは、
def urls: Seq[(Handle,URL)] = {
Cassandra の結果を (awakeEvery を使用して) 継続的にフェッチし、アクターに送信して上記の twitter フェッチ コードを実行します。
私の質問は、これを scalaz.stream で実装する最良の方法は何ですか? すべてのデータベース結果を取得してから、すべてのデータベース結果を再度取得する前に遅延が必要であることに注意してください。上記の Twitter フェッチ コードと同じアーキテクチャを使用する必要がありますか? もしそうなら、入力を必要としない channel.lift を作成するにはどうすればよいですか? scalaz.stream でもっと良い方法はありますか?
前もって感謝します