この例では、 Spray -client、scala futures、およびAkka schedulerを使用します。
実装は、目的の動作によって異なります (同時に多くのリクエストを並行して実行する、異なる間隔で実行する、一度に 1 つの応答を処理するために 1 つのアクターに応答を送信する、並行して処理するために多くのアクターに応答を送信するなど)。
この特定の例は、同時に多くのリクエストを並行して実行し、同時に発生した他のリクエストが完了するのを待たずに、完了するたびに各結果で何かを行う方法を示しています。
以下のコードは、5 秒ごとに 0.0.0.0:9000/helloWorld と 0.0.0.0:9000/goodbyeWorld への 2 つの HTTP リクエストを並行して実行します。
Scala 2.10、Spray 1.1-M7、および Akka 2.1.2 でテスト済み:
定期的なジョブの実行を処理する実際のスケジューリング コード:
// Schedule a periodic task to occur every 5 seconds, starting as soon
// as this schedule is registered
system.scheduler.schedule(initialDelay = 0 seconds, interval = 5 seconds) {
val paths = Seq("helloWorld", "goodbyeWorld")
// perform an HTTP request to 0.0.0.0:9000/helloWorld and
// 0.0.0.0:9000/goodbyeWorld
// in parallel (possibly, depending on available cpu and cores)
val retrievedData = Future.traverse(paths) { path =>
val response = fetch(path)
printResponse(response)
response
}
}
ヘルパー メソッド / ボイラープレートのセットアップ:
// Helper method to fetch the body of an HTTP endpoint as a string
def fetch(path: String): Future[String] = {
pipeline(HttpRequest(method = GET, uri = s"/$path"))
}
// Helper method for printing a future'd string asynchronously
def printResponse(response: Future[String]) {
// Alternatively, do response.onComplete {...}
for (res <- response) {
println(res)
}
}
// Spray client boilerplate
val ioBridge = IOExtension(system).ioBridge()
val httpClient = system.actorOf(Props(new HttpClient(ioBridge)))
// Register a "gateway" to a particular host for HTTP requests
// (0.0.0.0:9000 in this case)
val conduit = system.actorOf(
props = Props(new HttpConduit(httpClient, "0.0.0.0", 9000)),
name = "http-conduit"
)
// Create a simple pipeline to deserialize the request body into a string
val pipeline: HttpRequest => Future[String] = {
sendReceive(conduit) ~> unmarshal[String]
}
いくつかのメモ:
Future.traverse
先物を並行して実行するために使用されます (順序を無視します)。フューチャのリストで for を使用すると、一度に 1 つのフューチャが実行され、それぞれが完了するのを待ちます。
// Executes `oneThing`, executes `andThenAnother` when `oneThing` is complete,
// then executes `finally` when `andThenAnother` completes.
for {
oneThing <- future1
andThenAnother <- future2
finally <- future3
} yield (...)
system
実際の Akka アクター システムに置き換える必要があります。
system.scheduler.schedule
この場合、任意のコード ブロックを 5 秒ごとに実行しています。
system.scheduler.schedule(
initialDelay = 0 seconds,
frequency = 30 minutes,
receiver = rssPoller, // an actorRef
message = "doit" // the message to send to the actorRef
)
特定のケースでは、 printResponse を代わりにアクター send に置き換えることができます: anActorRef ! response
.
- コード サンプルではエラーが考慮されていません。Future onComplete コールバックを使用して、printResponse (または同等の) メソッドでエラーを処理するのが適切です。
response.onComplete {...}
fetch
当然のことかもしれませんが、メソッドと付随するスプレー コードを置き換えるだけで、spray-client を別の http クライアントに置き換えることができます。
更新:完全な実行コード例はこちら:
リポジトリを git clone し、指定されたコミット sha をチェックアウトし、$ sbt run
に移動し、実行され0.0.0.0:9000
たコンソールでコードを確認します。ORが出力されます(Future.traverse が並列実行されるため、順序はランダムになる可能性があります)。sbt run
Hello World!\n'Goodbye World!
Goodbye World!\nHelloWorld!