2

私はNIO2サーバーを書いています.AsynchronousSocketChannelで非同期読み取り操作を行う必要があります。これらの操作はそれぞれ、整数の読み取りと、同じチャネルからのこの整数に等しいバイト数の読み取りで構成されています。問題は、チャネルに 2 つ以上の CompletionHandler を続けて配置し (複数の読み取り操作の要求があるため)、これらのハンドラーの最初が起動さcomplete()れると、最初のハンドラーのメソッドでさらに読み取りコードが正しく機能しないことです。チャンネルに情報があると、ハンドラーは即座に起動されます。さらなる読み取りが問題complete()なく完了するまで、チャネルをブロックするにはどうすればよいFutureですか? ハンドラーをソケットに配置してから他のタスクに渡す必要があるため、Future を使用できません。

for (final Map.Entry<String, AsynchronousSocketChannel> entry : ipSocketTable.entrySet()) {
        try {
            final AsynchronousSocketChannel outSocket = entry.getValue();
            synchronized (outSocket) {
                final ByteBuffer buf = ByteBuffer.allocateDirect(9);
                outSocket.read(buf, null, new DataServerResponseHandler(buf, outSocket, resultTable, server, entry.getKey()));
            }

        } catch (Exception e) {

        }
    }

DataServerResponseHandler クラスは次のとおりです。

class DataServerResponseHandler implements CompletionHandler<Integer, Void> {

    private ConcurrentHashMap<String, Boolean> resultTable = null;
    private AsynchronousSocketChannel channel = null;
    private TcpServer server;
    private String ip;
    private ByteBuffer msg;

    public DataServerResponseHandler(ByteBuffer msg, AsynchronousSocketChannel channel,
            ConcurrentHashMap<String, Boolean> resultTable, TcpServer server, String ip) {
        this.msg = msg;
        this.channel = channel;
        this.resultTable = resultTable;
        this.server = server;
        this.ip = ip;
    }

    @Override
    public void completed(Integer result, Void attachment) {
            try {
                msg.rewind();
                int resultCode = msg.get() & 0xff;
                int ipOne = msg.get() & 0xff;
                int ipTwo = msg.get() & 0xff;
                int ipThree = msg.get() & 0xff;
                int ipFour = msg.get() & 0xff;
                int length = msg.getInt();
                msg.rewind();
                ByteBuffer buf = ByteBuffer.allocateDirect(length);
                channel.read(buf).get();
                buf.rewind();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }

    @Override
    public void failed(Throwable exc, Void attachment) {
        throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
    }

}
4

1 に答える 1

1

このコードにはいくつかの問題があります。
最初の読み取りは、残りのすべてのバイトを読み取ることを保証するものではありませんが、少なくとも 1 バイトを読み取るとすぐに完了ハンドラーを呼び出します。したがって、ヘッダーの 9 バイトまたはペイロードの長さになるまで、バッファーの位置を確認し、読み取りを再度呼び出す必要があります。

if (msg.position() < 9) {
    channel.read(msg, null, this);
    return;
}

2 番目の部分では、非同期アプローチを続行するために、完了ハンドラーを使用して読み取りを再度実行します。ペイロードを具体的に処理する新しいものを作成するか、既存のものを再利用できます。状態を覚えておく必要があります。

switch (state) {
case READ_HEADER:
    if (msg.remaining() > 0) {
        channel.read(msg, null, this);
        return;
    }
    // do the parsing the IP and length
    state = READ_PAYLOAD;
    channel.read(payloadBuf, null, this);
    break;

case READ_PAYLOAD:
    if (payloadBuf.remaining() > 0) {
        channel.read(payloadBuf, null, this);
        return;
    }
    payloadBuf.flip();
    // get content from payloadBuf
    break;
}
于 2015-10-09T13:57:57.100 に答える