4

非同期コールバックを多用するVert.x 2.x ( http://vertx.io ) を使用しています。これらは、典型的なネスティング/コールバック地獄の問題ですぐに扱いにくくなります。

私は Scala Futures/Promises (デファクト アプローチになると思います) と Reactive Extensions (RxScala) の両方を検討しました。

私のテストから、いくつかの興味深いパフォーマンス結果が見つかりました。

私のテストは非常に基本的なもので、Vert.x イベントバス全体で非同期呼び出しを行い、HTTP 200 で返される応答を処理する Vert.x verticle に (weighttp 経由で) 一連の HTTP 要求を発行するだけです。応答。

私が見つけたのは次のとおりです (ここでのパフォーマンスは、1 秒あたりの HTTP 要求で測定されます)。

  • 非同期コールバックのパフォーマンス = 68,305 rps
  • Rx パフォーマンス = 64,656 rps
  • 将来/約束のパフォーマンス = 61,376 rps

テスト条件は次のとおりです。

  • Mac Pro OS X Yosemite 10.10.2
  • Oracle JVM 1.8U25
  • weighttp バージョン 0.3 で
  • Vert.x 2.1.5
  • スカラ 2.10.4
  • RxScala 0.23.0
  • 4 x Web Service Verticle インスタンス
  • 4 x バックエンド サービス Verticle インスタンス

テストコマンドは

weighttp -n 1000000 -c 128 -7 8 -k "localhost:8888"

上記の数値は、5 回のテスト実行の平均であり、最良の結果と最悪の結果を除いたものです。結果は提示された平均値付近で非常に一貫していることに注意してください (数百 rps の偏差以下)。

上記が発生する可能性がある既知の理由はありますか?つまり、Rx > Futures in pure requests per second?

私の意見では、Reactive Extensions ははるかに多くのことができるため優れていますが、非同期コールバックへの標準的なアプローチは通常、Futures/Promises トラックを下回っているように見えるため、パフォーマンスの低下に驚いています。

編集:これがWebサービスのバーチクルです

class WebVerticle extends Verticle {
  override def start() {
    val port = container.env().getOrElse("HTTP_PORT", "8888").toInt
    val approach = container.env().getOrElse("APPROACH", "ASYNC")
    container.logger.info("Listening on port: " + port)
    container.logger.info("Using approach: " + approach)

    vertx.createHttpServer.requestHandler { req: HttpServerRequest =>
      approach match {
        case "ASYNC" => sendAsync(req, "hello")
        case "FUTURES" => sendWithFuture("hello").onSuccess { case body => req.response.end(body) }
        case "RX" => sendWithObservable("hello").doOnNext(req.response.end(_)).subscribe()
      }
    }.listen(port)
  }

  // Async callback
  def sendAsync(req: HttpServerRequest, body: String): Unit = {
    vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
      req.response.end(msg.body())
    })
  }

  // Rx 
  def sendWithObservable(body: String) : Observable[String] = {
    val subject = ReplaySubject[String]()
    vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
      subject.onNext(msg.body())
      subject.onCompleted()
    })
    subject
  }

  // Futures    
  def sendWithFuture(body: String) : Future[String] = {
    val promise = Promise[String]()
    vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
      promise.success(msg.body())
    })
    promise.future
  }
}

編集:これがバックエンドVerticleです

class ServiceVerticle extends Verticle {
  override def start(): Unit = {
    vertx.eventBus.registerHandler("service.verticle", { msg: Message[String] =>
      msg.reply("Hello Scala")
    })
  }
}
4

0 に答える 0