1

TcpMessage.abort()Akka TCP IO で奇妙な動作があります。問題は、ハンドラーまたはハンドラーからの明示的な呼び出しによる接続のリセットTerminatingです。ピアがイベントを受信しませんでしTcp.ConnectionClosedた。例:

ハンドラー onReceive

@Override
public void onReceive(Object msg) throws Exception {
    if (msg instanceof Tcp.ConnectionClosed) {
        log.info("Server ConnectionClosed: {}", msg);
        getContext().stop(getSelf());
    } else if (msg instanceof Tcp.Received){
        log.info("Aborting connection");
        getSender().tell(TcpMessage.abort(), getSelf());
    }
}

テストコード

new JavaTestKit(system) {{
     getSystem().actorOf(Props.create(Server.class, getRef()), "Server");
     Tcp.Bound bound = expectMsgClass(Tcp.Bound.class);

     ActorRef client = getSystem().actorOf(
     Props.create(Client.class, bound.localAddress(), getRef()), "Client");
     watch(client);
     expectMsgClass(Tcp.Connected.class);
     client.tell(new Message.Write(), getRef());

     expectTerminated(client);
}};

実行後のログ

[INFO] [10/19/2013 22:13:22.730] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Client] Client Connected
[INFO] [10/19/2013 22:13:22.736] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Client] Sending Write to Handler
[INFO] [10/19/2013 22:13:22.767] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Server/$a] Aborting connection
[INFO] [10/19/2013 22:13:22.774] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Server/$a] Server ConnectionClosed: Aborted

java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg: Terminated Actor[akka://actorSystem/user/Client#423174850]

ハンドラーに変更TcpMessage.abort()した場合:TcpMessage.close()

[INFO] [10/19/2013 22:17:06.243] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client] Client Connected
[INFO] [10/19/2013 22:17:06.249] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client] Sending Write to Handler
[INFO] [10/19/2013 22:17:06.278] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Server/$a] Aborting connection
[INFO] [10/19/2013 22:17:06.288] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/Client] Client ConnectionClosed: PeerClosed
[INFO] [10/19/2013 22:17:06.288] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Server/$a] Server ConnectionClosed: Closed

サーバーハンドラーはどの接続がエラーで閉じられているかをリモートクライアントにどのように通知できますか? クライアントからサーバーに中止を送信すると、すべて正常に動作します。

編集:

OK、問題の根本を発見しました。中断しようとする前に書き込みコマンドを送信した場合にのみ、中断コマンドが機能しません。クライアントとサーバーの両方で。今、私は理由を理解する必要があります。

EDIT2:

ErrorClosed新しいクライアントを作成すると、最初のクライアントに到達し、同じ書き込みコマンドを2番目のクライアントに送信すると、クライアントが期待どおりに受信した中止を実行するため、中止メッセージがスタックしているようです。

new JavaTestKit(system) {{
    getSystem().actorOf(Props.create(Server.class, getRef()), "Server");
    Tcp.Bound bound = expectMsgClass(Tcp.Bound.class);

    ActorRef client = getSystem().actorOf(
            Props.create(Client.class, bound.localAddress(), getRef()), "Client");
    watch(client);
    expectMsgClass(Tcp.Connected.class);
    client.tell(new Message.Write(), getRef());
    expectNoMsg();

    ActorRef client2 = getSystem().actorOf(
           Props.create(Client.class, bound.localAddress(), getRef()), "Client2");
    watch(client2);
    expectMsgClass(Tcp.Connected.class);

    expectTerminated(client);

    client2.tell(new Message.Write(), getRef());

    expectTerminated(client2);
    expectNoMsg();
}};

そしてログ:

[INFO] [10/20/2013 00:48:05.923] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client] Client Connected
[INFO] [10/20/2013 00:48:05.929] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client] Sending Write to Handler
[INFO] [10/20/2013 00:48:05.959] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Server/$a] Aborting connection, connection actor: [Actor[akka://actorSystem/system/IO-TCP/selectors/$a/2#1767183113]]
[INFO] [10/20/2013 00:48:05.966] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/Server/$a] Server ConnectionClosed: Aborted
[INFO] [10/20/2013 00:48:08.930] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/Client2] Client Connected
[INFO] [10/20/2013 00:48:08.934] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Client] Client ConnectionClosed: ErrorClosed(An existing connection was forcibly closed by the remote host)
[INFO] [10/20/2013 00:48:08.936] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client2] Sending Write to Handler
[INFO] [10/20/2013 00:48:08.937] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Server/$b] Aborting connection, connection actor: [Actor[akka://actorSystem/system/IO-TCP/selectors/$a/4#-598138943]]
[INFO] [10/20/2013 00:48:08.938] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Server/$b] Server ConnectionClosed: Aborted
[INFO] [10/20/2013 00:48:08.938] [actorSystem-akka.actor.default-dispatcher-7] [akka://actorSystem/user/Client2] Client ConnectionClosed: ErrorClosed(An existing connection was forcibly closed by the remote host)

EDIT3:

もう少し明確にするために、質問は次のとおりです。リモート アクター ハンドラーに、クラッシュしたピアとErrorClosed接続 (リモート エンドで実行している) を通知するにはどうすればよいですか。最後のErrorClosedメッセージは常にスタックしており、消えませんでした。セレクターのどこかに留まり、別のネットワーク操作 (別のアクターをサーバーに書き込むか接続する) を行った後にのみピアにプッシュされます。Writeこの動作は、中止する前にコマンドを実行した場合にのみ発生します。この動作は OS に依存しません。Windows と Linux マシンで試してみましたが、同じ結果でした。

4

1 に答える 1

1

この問題の確実な解決策は見つかりませんでしたが、全体的な影響を最小限に抑えるためにいくつかのことを行いました.

  1. 明示的に RST を使用しないでください (これには、abort コマンドが含まれます)。常に FIN との接続を閉じます (クローズ)。エラーの場合でも。
  2. エラーが原因でアクターが死亡した場合は、接続を閉じる信号を送信するため、接続を適切に閉じることができます。
  3. 大量のデータを転送するには、async rep-req データ転送メカニズムを使用します (ZMQ)。

また、ハートビート アクターを実装することもできますが、特に Akka IO セレクターが TCP ソケットでハートビートを単独で実行するのは、あまり良い考えではないように思えます。

于 2013-10-23T00:14:28.130 に答える