1

akka を使用するアプリケーションがあり、ソケット接続を介して接続したいと考えています。そのため、scala page のものと同様のメカニズムを使用します。tellしかし、開いている間にを試みてOutputStreamも、ターゲットはメッセージを受信しません。

ここに私のソースコードがあります:

object Connector {

  def main(args: Array[String]) {
    val port = 1337
    val conf = ConfigFactory.load
    val system = ActorSystem("SDDB", conf.getConfig("SDDB"))
    val master = system.actorOf(Props[TestActor])
    master ! "a"

    try {
      val listener = new ServerSocket(port)
      println("listening on port: " + port)
      while (true)
        new ConnectionThread(listener accept, master).start
      listener close
    } catch {
      case e: IOException =>
        System.err.println("Could not listen on port: " + port + ".")
        System.exit(-1)
    } finally {
      system.shutdown
    }
  }
}

case class ConnectionThread(socket: Socket, master: ActorRef) 
  extends Thread("ConnectionThread") {

  private val Select_* = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r
  private implicit var id = 0L
  private implicit val timeout = Timeout(25.0 seconds)

  master ! "b"

  override def run {
    master ! "c"
    try{
      master ! "d"
      val in = new ObjectInputStream(socket getInputStream)
      master ! "e"
      val out = new ObjectOutputStream(socket getOutputStream)

      out writeObject("listening")
      out flush

      master ! "f"
      val command = in.readObject.asInstanceOf[String]
      println("client sent: '" + command + "'")
      // process the command

      master ! "g"
      out.writeObject("EOF")
      out.flush

      out.close
      in.close
      socket.close
    } catch {
      case e: SocketException =>
      case e: IOException => e printStackTrace
    }
  }
}

class TestActor extends Actor with ActorLogging{

  log info("TestActor running")

  def receive = {
    case s: String =>
      log info("received: " + s)
  }

}

出力が得られます:

listening on port: 1337
[INFO] TestActor running
[INFO] received: a
[INFO] received: b
[INFO] received: c
[INFO] received: d

今、私はそれが g まで続くと思っていましたが、代わりに次のようになります:

client sent: 'select content from testdata on 2012-07-06'

ソケットのストリームを開くまで機能することがわかりました。おそらく、ソケットベースでもあり、ソケットの出力ストリームを使用しているためtellaskトレッドが実行されます。その後、ソケット接続は機能しますが、メッセージを送信できませんアクターシステムに。
Connector と ConnectionThread をドロップする方法はありません。どうすれば修正できますか?

4

1 に答える 1

0

ドキュメントの例を完全には理解していなかったことを認めなければなりません。しかし、作品ConnectionHelperに直接対処する代わりにa を使用すると、ActorRefかなりうまくいくことがわかりました。
コードを次のように変更しました。

object Connector {

  def main(args: Array[String]) {
    val port = 1337
    val conf = ConfigFactory.load
    val system = ActorSystem("SDDB", conf.getConfig("SDDB"))

    //    val master = system.actorOf(Props[TestActor], "master")
    //    master ! "a"

    try {
      val listener = new ServerSocket(port)
      println("listening on port: " + port)
      while (true)
      //        new ConnectionThread(listener accept, master.asInstanceOf[TestActor]).start
        new ConnectionThread(listener accept, system).start
      listener close
    } catch {
      case e: IOException =>
        System.err.println("Could not listen on port: " + port + ".")
        System.exit(-1)
    } finally {
      //      master ! PoisonPill
      system.shutdown
    }
  }

}

case class ConnectionThread(socket: Socket, sys: ActorSystem) 
  extends Thread("ConnectionThread") {

  private val Select_* = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r
  private implicit var id = 0L
  private implicit val timeout = Timeout(25.0 seconds)
  private val conHelper = new ConnectionHelper

  override def run {
    try {
      val out = new ObjectOutputStream(socket getOutputStream)
      val in = new ObjectInputStream(socket getInputStream)

      conHelper tell "funzt"
      out writeObject ("Hi")
      out.flush
      val command = in.readObject.asInstanceOf[String]
      println("received: " + command)
      out writeObject ("test")
      out.flush
      out writeObject ("EOF")
      out.flush

      out.close
      in.close
      socket.close
    }
  }

  private class ConnectionHelper {
    val tester = sys.actorOf(Props[TestActor])

    def tell(s: String) { tester ! s }

  }

}

なぜこれが機能するのか、私の質問のコードが機能しないのか、よくわかりません。すべての説明を歓迎します。

于 2012-07-12T11:21:10.877 に答える