2

私はscalaに不慣れで、scalazに非常に慣れていません。別のスタックオーバーフローの回答といくつかのハンドホールディングにより、scalaz.stream を使用して、Twitter API の結果を継続的にフェッチするプロセスを実装することができました。ここで、Twitter ハンドルが格納されている Cassandra DB に対して同じことを行いたいと思います。

Twitter の結果を取得するためのコードは次のとおりです。

def urls: Seq[(Handle,URL)] = {
 Await.result(

   getAll(connection).map { List =>
      List.map(twitterToGet =>
   (twitterToGet.handle, urlBoilerPlate + twitterToGet.handle + parameters + twitterToGet.sinceID)
   )
  },
    5 seconds)

}

val fetchUrl = channel.lift[Task, (Handle, URL), Fetched] {
  url => Task.delay {

    val finalResult = callTwitter(url)
    if (finalResult.tweets.nonEmpty) {
      connection.updateTwitter(finalResult)
    } else {
      println("\n" + finalResult.handle + " does not have new tweets")
    }
    s"\ntwitter Fetch & database update completed"

  }
}

val P = Process
val process =
  (time.awakeEvery(3.second) zipWith P.emitAll(urls))((b, url) => url).
    through(fetchUrl)

val fetched = process.runLog.run
fetched.foreach(println)

私がやろうとしていることは、

def urls: Seq[(Handle,URL)] = {

Cassandra の結果を (awakeEvery を使用して) 継続的にフェッチし、アクターに送信して上記の twitter フェッチ コードを実行します。

私の質問は、これを scalaz.stream で実装する最良の方法は何ですか? すべてのデータベース結果を取得してから、すべてのデータベース結果を再度取得する前に遅延が必要であることに注意してください。上記の Twitter フェッチ コードと同じアーキテクチャを使用する必要がありますか? もしそうなら、入力を必要としない channel.lift を作成するにはどうすればよいですか? scalaz.stream でもっと良い方法はありますか?

前もって感謝します

4

1 に答える 1

1

今日はこれが機能しました。これを行う最もクリーンな方法は、データベースの結果をストリームとして出力し、ストリームの最後にシンクを接続して Twitter 処理を行うことです。私が実際に持っているのは、データベースの結果を継続的に取得し、Twitter 処理のためにアクターに送信するため、もう少し複雑です。結果を取得するスタイルは、私の質問の元のコードに従います。

val connection = new simpleClient(conf.getString("cassandra.node"))

implicit val threadPool = new ScheduledThreadPoolExecutor(4)
val system = ActorSystem("mySystem")
val twitterFetch = system.actorOf(Props[TwitterFetch], "twitterFetch")

  def myEffect = channel.lift[Task, simpleClient, String]{
    connection: simpleClient => Task.delay{

      val results = Await.result(
        getAll(connection).map { List =>
          List.map(twitterToGet =>
            (twitterToGet.handle, urlBoilerPlate + twitterToGet.handle + parameters + twitterToGet.sinceID)
          )
        },
        5 seconds)

      println("Query Successful, results= " +results +" at " + format.print(System.currentTimeMillis()))

      twitterFetch ! fetched(connection, results)
      s"database fetch completed"
    }
  }

  val P = Process
  val process =
    (time.awakeEvery(3.second).flatMap(_ => P.emit(connection).
      through(myEffect)))

  val fetching = process.runLog.run
  fetching.foreach(println)

いくつかのメモ:

入力なしで channel.lift を使用することについて質問していましたが、入力は cassandra 接続であることが明らかになりました。

この線

val process =
(time.awakeEvery(3.second).flatMap(_ => P.emit(connection).
  through(myEffect)))

一度ではなく継続的に結果を取得したかったので、zipWith から flatMap に変更しました。

于 2015-06-09T18:09:40.870 に答える