0

新しいAkkaI/ Oを使用してTcpサーバーを実装しようとしていますが、残念ながらドキュメントはまだ完成しておらず、Javaでの実装に問題があります:(。クライアントとサーバーを作成して送信することができましたクライアントからサーバーへのメッセージですが、ByteIteratorを使用して受信したバイトをどのように読み取りますか?間違ったアプローチを使用していますか?データの処理方法ではない可能性があります。

たくさんのメッセージを非常に速く送信すると、何か奇妙なことが起こります。それらはすべてbiにキューに入れられ、it.getInt()を作成した後、データがリセットされることはありません。

 if (msg instanceof Tcp.Received) {
   final Tcp.Received recv = (Tcp.Received) msg;
   final ByteString data = recv.data();
   ByteIterator bi = data.iterator();
   while(bi.hasNext()) {
      ....
   }
 } else if (msg instanceof Tcp.CommandFailed) {
   final Tcp.CommandFailed failed = (Tcp.CommandFailed) msg;
   final Tcp.Command command = failed.cmd();
   // react to failed connect, bind, write, etc.
 } else if (msg instanceof Tcp.ConnectionClosed) {
   final Tcp.ConnectionClosed closed = (Tcp.ConnectionClosed) msg;
   if (closed.isAborted()) {
      // handle close reasons like this
 }

解決策:ああ!今、私は自分の間違いを理解しました。クライアントにbyteStringBuilderをキャッシュし、それをクリアするのを忘れました-_-なんてばかげているのでしょう!

Javaの実装例が必要な場合は、次のようにします。

あなたたちがそれらを取っているimの最適化を持っているなら!:)

   int packetSize = 0;
   if (msg instanceof Tcp.Received) {
        final Tcp.Received recv = (Tcp.Received) msg;
        final ByteString data = recv.data();
        ByteIterator bi = data.iterator();
        while (bi.hasNext()) {
            packetSize = bi.getInt(ByteOrder.LITTLE_ENDIAN);
            Message m = Message.fromByteIterator(bi);
            getContext().parent().tell(m, null);
        }

    }

Message.fromByteIterator(bi)は、BIからInts、Floats、Bytes Arrays ...を取得することにより、新しいメッセージオブジェクトを初期化します。

4

1 に答える 1

0

問題を完全に理解しているわけではありませんが、ここで問題を引き起こした可能性のある2つの問題があります。

受け取るByteStringには、Tcpによって配信されたチャンクが含まれています。それに乗るイテレータは、そのチャンクでのみ反復します。ストリームではなく、チャンクです。また、Tcpはバイトのストリームを必要な場所で分割することを決定できるため、取得するチャンクがメッセージの境界と整列していない可能性があります。あなたは自分でフレーミングを処理する必要があります。

2番目の問題は、送信速度が速すぎてネットワークが追いつかない場合に発生する可能性があります。Akka IOには、そのためのフロー制御メカニズムが組み込まれています。最も単純なケースは、各書き込みにAckオブジェクトをアタッチし、前の書き込みがackされた場合にのみ次の書き込みを送信することです。

于 2013-03-25T16:32:50.307 に答える