7

約 2 日前に scala アクター フレームワークの学習を開始しました。アイデアを具体化するために、複数の同時接続を処理できる TCP ベースのエコー サーバーを実装することにしました。

エコー サーバーのコードは次のとおりです (エラー処理は含まれていません)。

class EchoServer extends Actor {
  private var connections = 0

  def act() {
    val serverSocket = new ServerSocket(6789)

    val echoServer = self
    actor { while (true) echoServer ! ("Connected", serverSocket.accept) }

    while (true) {
      receive {
        case ("Connected", connectionSocket: Socket) =>
          connections += 1
          (new ConnectionHandler(this, connectionSocket)).start
        case "Disconnected" =>
          connections -= 1
      }
    }
  }
}

基本的に、サーバーは「接続済み」および「切断済み」メッセージを処理するアクターです。これは、 serverSocketでaccept()メソッド (ブロッキング操作)を呼び出す匿名アクターにリッスンする接続を委任します。接続が到着すると、「Connected」メッセージを介してサーバーに通知し、新しく接続されたクライアントとの通信に使用するソケットを渡します。ConnectionHandlerクラスのインスタンスは、クライアントとの実際の通信を処理します。

接続ハンドラのコードは次のとおりです (いくつかのエラー処理が含まれています)。

class ConnectionHandler(server: EchoServer, connectionSocket: Socket)
    extends Actor {

  def act() {
    for (input <- getInputStream; output <- getOutputStream) {
      val handler = self
      actor {
        var continue = true
        while (continue) {
          try {
            val req = input.readLine
            if (req != null) handler ! ("Request", req)
            else continue = false
          } catch {
            case e: IOException => continue = false
          }
        }

        handler ! "Disconnected"
      }

      var connected = true
      while (connected) {
        receive {
          case ("Request", req: String) =>
            try {
              output.writeBytes(req + "\n")
            } catch {
              case e: IOException => connected = false
            }
          case "Disconnected" =>
            connected = false
        }
      }
    }

    close()
    server ! "Disconnected"
  }

  // code for getInputStream(), getOutputStream() and close() methods
}

接続ハンドラーは、ソケットの入力ストリームでreadLine()メソッド (ブロッキング操作)を呼び出すことによって、要求がソケットに送信されるのを待機する匿名アクターを使用します。リクエストが受信されると、「リクエスト」メッセージがハンドラーに送信され、ハンドラーは単にリクエストをクライアントにエコー バックします。ハンドラーまたは匿名アクターが基礎となるソケットで問題を経験した場合、ソケットは閉じられ、クライアントがサーバーから切断されたことを示す「切断」メッセージがエコー サーバーに送信されます。

そのため、エコー サーバーを起動して、接続を待機させることができます。次に、新しいターミナルを開き、telnet 経由でサーバーに接続できます。リクエストを送信でき、正しく応答します。ここで、別のターミナルを開いてサーバーに接続すると、サーバーは接続を登録しますが、この新しい接続の接続ハンドラーを開始できません。既存の接続のいずれかを介してメッセージを送信しても、すぐに応答がありません。ここが興味深い部分です。既存のクライアント接続を 1 つを除いてすべて終了し、クライアント X を開いたままにすると、クライアント X 経由で送信した要求に対するすべての応答が返されます。いくつかのテストを行った結果、 start()を呼び出しても、後続のクライアント接続でact()メソッドが呼び出されていないと結論付けました。接続ハンドラを作成するメソッド。

接続ハンドラでブロック操作を正しく処理していないと思います。以前の接続は、リクエストを待機している匿名のアクターがブロックされている接続ハンドラーによって処理されているため、このブロックされたアクターが他のアクター (接続ハンドラー) の起動を妨げていると考えています。

scala アクターを使用する場合、ブロック操作をどのように処理すればよいですか?

どんな助けでも大歓迎です。

4

1 に答える 1

4

scala.actors.Actor の scaladocから:

注: Actor トレイトまたはそのコンパニオン オブジェクト ( など) によって提供されるメソッド以外のスレッド ブロック メソッドを呼び出す場合は注意が必要receiveです。アクター内の基になるスレッドをブロックすると、他のアクターが枯渇する可能性があります。receiveこれは、アクターが/を呼び出す間に長時間スレッドを占有する場合にも当てはまりますreact

アクターがブロック操作 (I/O をブロックするメソッドなど) を使用する場合、いくつかのオプションがあります。

  • ランタイム システムは、より大きなスレッド プール サイズを使用するように構成できます (たとえば、actors.corePoolSizeJVM プロパティを設定することにより)。
  • トレイトのschedulerメソッドをActorオーバーライドして を返すことができますResizableThreadPoolScheduler。これにより、スレッド プールのサイズが変更され、任意のブロッキング メソッドを呼び出すアクターによって引き起こされる枯渇を回避できます。
  • actors.enableForkJoinJVM プロパティは false に設定できます。その場合、アクターを実行するためにデフォルトで aがResizableThreadPoolScheduler使用されます。
于 2010-07-27T17:28:55.333 に答える