1

コマンドの Web サーバーを継続的にポーリングするクライアントを作成する必要があります。サーバーからの応答は、コマンドが使用可能であること (この場合、応答にコマンドが含まれます)、または使用可能なコマンドがないという指示を示します。着信コマンドの新しい要求を開始する必要があります。

私はスプレー クライアントと Akka を使用してそれを行う方法を理解しようとしていますが、それを行う方法を考えることができますが、どれもそれを行うための慣用的な方法ではないようです。質問は次のとおりです。

いくつかのスレッドが同じ Web サーバーに着信コマンドをポーリングし、コマンドをアクターに渡す最も賢明な方法は何ですか?

4

2 に答える 2

1

この例では、 Spray -clientscala futures、およびAkka schedulerを使用します。

実装は、目的の動作によって異なります (同時に多くのリクエストを並行して実行する、異なる間隔で実行する、一度に 1 つの応答を処理するために 1 つのアクターに応答を送信する、並行して処理するために多くのアクターに応答を送信するなど)。

この特定の例は、同時に多くのリクエストを並行して実行し、同時に発生した他のリクエストが完了するのを待たずに、完了するたびに各結果で何かを行う方法を示しています。

以下のコードは、5 秒ごとに 0.0.0.0:9000/helloWorld と 0.0.0.0:9000/goodbyeWorld への 2 つの HTTP リクエストを並行して実行します。

Scala 2.10、Spray 1.1-M7、および Akka 2.1.2 でテスト済み:

定期的なジョブの実行を処理する実際のスケジューリング コード:

// Schedule a periodic task to occur every 5 seconds, starting as soon 
// as this schedule is registered
system.scheduler.schedule(initialDelay = 0 seconds, interval = 5 seconds) {

  val paths = Seq("helloWorld", "goodbyeWorld")

  // perform an HTTP request to 0.0.0.0:9000/helloWorld and 
  // 0.0.0.0:9000/goodbyeWorld
  // in parallel (possibly, depending on available cpu and cores)
  val retrievedData = Future.traverse(paths) { path =>
    val response = fetch(path)
    printResponse(response)
    response
  }
}

ヘルパー メソッド / ボイラープレートのセットアップ:

// Helper method to fetch the body of an HTTP endpoint as a string
def fetch(path: String): Future[String] = {
  pipeline(HttpRequest(method = GET, uri = s"/$path"))

}

// Helper method for printing a future'd string asynchronously
def printResponse(response: Future[String]) {
  // Alternatively, do response.onComplete {...}
  for (res <- response) {
    println(res)
  }
}


// Spray client boilerplate
val ioBridge = IOExtension(system).ioBridge()
val httpClient = system.actorOf(Props(new HttpClient(ioBridge)))

// Register a "gateway" to a particular host for HTTP requests 
// (0.0.0.0:9000 in this case)
val conduit = system.actorOf(
  props = Props(new HttpConduit(httpClient, "0.0.0.0", 9000)),
  name = "http-conduit"
)

// Create a simple pipeline to deserialize the request body into a string
val pipeline: HttpRequest => Future[String] = {
  sendReceive(conduit) ~> unmarshal[String]
}

いくつかのメモ:

  • Future.traverse先物を並行して実行するために使用されます (順序を無視します)。フューチャのリストで for を使用すると、一度に 1 つのフューチャが実行され、それぞれが完了するのを待ちます。

    // Executes `oneThing`, executes `andThenAnother` when `oneThing` is complete,
    // then executes `finally` when `andThenAnother` completes.
    for {
      oneThing <- future1
      andThenAnother <- future2
      finally <- future3
    } yield (...)
    
  • system実際の Akka アクター システムに置き換える必要があります。

  • system.scheduler.scheduleこの場合、任意のコード ブロックを 5 秒ごとに実行しています。

    system.scheduler.schedule(
      initialDelay = 0 seconds,
      frequency    = 30 minutes,
      receiver     = rssPoller, // an actorRef
      message      = "doit" // the message to send to the actorRef
    )
    
  • 特定のケースでは、 printResponse を代わりにアクター send に置き換えることができます: anActorRef ! response.

  • コード サンプルではエラーが考慮されていません。Future onComplete コールバックを使用して、printResponse (または同等の) メソッドでエラーを処理するのが適切です。response.onComplete {...}
  • fetch当然のことかもしれませんが、メソッドと付随するスプレー コードを置き換えるだけで、spray-client を別の http クライアントに置き換えることができます。

更新:完全な実行コード例はこちら:

リポジトリを git clone し、指定されたコミット sha をチェックアウトし、$ sbt runに移動し、実行され0.0.0.0:9000たコンソールでコードを確認します。ORが出力されます(Future.traverse が並列実行されるため、順序はランダムになる可能性があります)。sbt runHello World!\n'Goodbye World!Goodbye World!\nHelloWorld!

于 2013-03-17T08:50:50.497 に答える
0

HTML5 Server-Sent Eventsを使用できます。多くの Scala フレームワークで実装されています。たとえば、xitrumコードでは次のようになります。

class SSE extends Controller {
  def sse = GET("/sse") {
    addConnectionClosedListener {
      // The connection has been closed
      // Unsubscribe from events, release resources etc.
    }

    future {
        respondEventSource("command1")
        //...
        respondEventSource("command2")
        //...
    }
 }

SSE は非常にシンプルで、ブラウザーだけでなく、あらゆるソフトウェアで使用できます。Akka は xitrum に統合されており、同様のシステムで使用しています。ただし、非同期サーバーに netty を使用します。これは、10 ~ 15 スレッドで数千の要求を処理するのにも適しています。

このようにして、クライアントはサーバーとの接続を維持し、接続が切断されたときに再接続します。

于 2013-03-16T13:58:28.950 に答える