6

Scala と Akka を使用して単純な P2P をモデル化しています。

class Node() extends Peer with Actor {

  var peers: List[ActorRef] = List()

  def receive = {
    case _register(peer: ActorRef, p: Option[Int]) => {
      println("registering [" + peer + "] for [" + this + "]")
      peers = peer :: peers
    }
  }

}

sealed case class _register(val peer: ActorRef, var p: Option[Int] = None)

次に、単純なネットワーク:

class Network() extends Actor {

  def this(name: String) = {

    this()

    val system = ActorSystem(name)

    val s1 = system.actorOf(Props(new Node()), name = "s1")
    val s2 = system.actorOf(Props(new Node()), name = "s2")

    val c1 = system.actorOf(Props(new Node()), name = "c1")
    val c2 = system.actorOf(Props(new Node()), name = "c2")
    val c3 = system.actorOf(Props(new Node()), name = "c3")
    val c4 = system.actorOf(Props(new Node()), name = "c4")

    implicit val timeout = Timeout(5 second)

    s1 ? _register(c1)
    s1 ? _register(c2)
    s1 ? _register(c3)
    val lastRegistered = s2 ? _register(c4)
    Await.ready(lastRegistered, timeout.duration)

    println("initialized nodes")
  }
}

私が得ている出力は常に次のようになります。

registering [Actor[akka://p2p/user/c1]] for [nl.cwi.crisp.examples.p2p.scala.Node@14b5f4a]
registering [Actor[akka://p2p/user/c2]] for [nl.cwi.crisp.examples.p2p.scala.Node@14b5f4a]
registering [Actor[akka://p2p/user/c3]] for [nl.cwi.crisp.examples.p2p.scala.Node@14b5f4a]
registering [Actor[akka://p2p/user/c4]] for [nl.cwi.crisp.examples.p2p.scala.Node@13c0b53]
[ERROR] [04/10/2012 22:07:04.34] [main-akka.actor.default-dispatcher-1] [akka://main/user/p2p] error while creating actor
java.util.concurrent.TimeoutException: Futures timed out after [5000] milliseconds
    at akka.dispatch.DefaultPromise.ready(Future.scala:834)
    at akka.dispatch.DefaultPromise.ready(Future.scala:811)
    at akka.dispatch.Await$.ready(Future.scala:64)
    at nl.cwi.crisp.examples.p2p.scala.Network.<init>(Node.scala:136)
    at nl.cwi.crisp.examples.p2p.scala.Main$$anonfun$11.apply(Node.scala:164)
    at nl.cwi.crisp.examples.p2p.scala.Main$$anonfun$11.apply(Node.scala:164)
    at akka.actor.ActorCell.newActor(ActorCell.scala:488)
    at akka.actor.ActorCell.create$1(ActorCell.scala:506)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:591)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:191)
    at akka.dispatch.Mailbox.run(Mailbox.scala:160)
    at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:505)
    at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
    at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:997)
    at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1495)
    at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)

Futures on Akka リファレンス ドキュメントのドキュメントに従いました。と交換Await.readyAwait.resultても効果はありません。ログは、最後の登録が成功したことを示しています。

これを修正するにはどうすればよいですか?

4

2 に答える 2

8

ノード アクターからメッセージが返されるのを待っていますが、ノード アクターは、senderactorRef にメッセージを送り返しませんs1 ? _registersender ! somethingNode メソッド内から追加して応答を送信できますが、この場合何が意味があるのreceiveか​​わかりません。something

于 2012-04-10T20:38:43.110 に答える
8

シチューは正しく理解しましたが、ネットワーク アクターに気になるコードがあります。

val system = ActorSystem(name)

val s1 = system.actorOf(Props(new Node()), name = "s1")
val s2 = system.actorOf(Props(new Node()), name = "s2")

val c1 = system.actorOf(Props(new Node()), name = "c1")
val c2 = system.actorOf(Props(new Node()), name = "c2")
val c3 = system.actorOf(Props(new Node()), name = "c3")
val c4 = system.actorOf(Props(new Node()), name = "c4")

新しい ActorSystem を作成する理由と、そのアクター システム内にトップレベルのアクターを作成する理由は何ですか?

アクターのシステムにアクセスする必要がある場合は、次のように呼び出します。

context.system

また、すべてのファイルをファイル システムのルートに配置してファイル システムのルートを混乱させてはならないのと同じ理由で、トップ レベルのアクターを作成することは避けるべきです。Network に子アクターを作成するには、次のようにします。

context.actorOf(...)

現在、同じシステムで複数のネットワーク アクターを作成するとすぐに問題が発生します。これは、同じ名前のトップレベル アクターを作成しようとするためです。

于 2012-04-11T08:27:04.983 に答える