1

このチュートリアルを読んだ後: http://rox-xmlrpc.sourceforge.net/niotut/ (これはノンブロッキング サーバーとクライアントの記述に関するものであり、NIO 部分を読み、SSL 部分をスキップしました)、今は自分自身を書き直そうとしています。クライアントですが、クライアント コードを編集しようとすると問題が発生します。

最初に、チュートリアルのクライアント コードをご覧いただきたいと思います。これには 2 つのファイルが含まれています。

mainしかし、私は以下のように私の問題を説明するために機能で NIOClient.java を少し編集しました:

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;

public class NIOClient implements Runnable {
// The host:port combination to connect to
private InetAddress hostAddress;
private int port;

// The selector we'll be monitoring
private Selector selector;

// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(8192);

// A list of PendingChange instances
private List pendingChanges = new LinkedList();

// Maps a SocketChannel to a list of ByteBuffer instances
private Map pendingData = new HashMap();

// Maps a SocketChannel to a RspHandler
private Map rspHandlers = Collections.synchronizedMap(new HashMap());

public NIOClient(InetAddress hostAddress, int port) throws IOException {
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
}

public void send(byte[] data, RspHandler handler) throws IOException {
    // Start a new connection
    SocketChannel socket = this.initiateConnection();

    // Register the response handler
    this.rspHandlers.put(socket, handler);

    // And queue the data we want written
    synchronized (this.pendingData) {
        List queue = (List) this.pendingData.get(socket);
        if (queue == null) {
            queue = new ArrayList();
            this.pendingData.put(socket, queue);
        }
        queue.add(ByteBuffer.wrap(data));
    }

    // Finally, wake up our selecting thread so it can make the required changes
    this.selector.wakeup();
}

public void run() {
    while (true) {
        try {
            // Process any pending changes
            synchronized (this.pendingChanges) {
                Iterator changes = this.pendingChanges.iterator();
                while (changes.hasNext()) {
                    ChangeRequest change = (ChangeRequest) changes.next();
                    switch (change.type) {
                    case ChangeRequest.CHANGEOPS:
                        SelectionKey key = change.socket.keyFor(this.selector);
                        key.interestOps(change.ops);
                        break;
                    case ChangeRequest.REGISTER:
                        change.socket.register(this.selector, change.ops);
                        break;
                    }
                }
                this.pendingChanges.clear();
            }

            // Wait for an event one of the registered channels
            this.selector.select();

            // Iterate over the set of keys for which events are available
            Iterator selectedKeys = this.selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = (SelectionKey) selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }

                // Check what event is available and deal with it
                if (key.isConnectable()) {
                    this.finishConnection(key);
                } else if (key.isReadable()) {
                    this.read(key);
                } else if (key.isWritable()) {
                    this.write(key);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

private void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    this.readBuffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
        numRead = socketChannel.read(this.readBuffer);
    } catch (IOException e) {
        // The remote forcibly closed the connection, cancel
        // the selection key and close the channel.
        key.cancel();
        socketChannel.close();
        return;
    }

    if (numRead == -1) {
        // Remote entity shut the socket down cleanly. Do the
        // same from our end and cancel the channel.
        key.channel().close();
        key.cancel();
        return;
    }

    // Handle the response
    this.handleResponse(socketChannel, this.readBuffer.array(), numRead);
}

private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException {
    // Make a correctly sized copy of the data before handing it
    // to the client
    byte[] rspData = new byte[numRead];
    System.arraycopy(data, 0, rspData, 0, numRead);

    // Look up the handler for this channel
    RspHandler handler = (RspHandler) this.rspHandlers.get(socketChannel);

    // And pass the response to it
    if (handler.handleResponse(rspData)) {
        // The handler has seen enough, close the connection
        socketChannel.close();
        socketChannel.keyFor(this.selector).cancel();
    }
}

private void write(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    synchronized (this.pendingData) {
        List queue = (List) this.pendingData.get(socketChannel);

        // Write until there's not more data ...
        while (!queue.isEmpty()) {
            ByteBuffer buf = (ByteBuffer) queue.get(0);
            socketChannel.write(buf);
            if (buf.remaining() > 0) {
                // ... or the socket's buffer fills up
                break;
            }
            queue.remove(0);
        }

        if (queue.isEmpty()) {
            // We wrote away all data, so we're no longer interested
            // in writing on this socket. Switch back to waiting for
            // data.
            key.interestOps(SelectionKey.OP_READ);
        }
    }
}

private void finishConnection(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Finish the connection. If the connection operation failed
    // this will raise an IOException.
    try {
        socketChannel.finishConnect();
    } catch (IOException e) {
        // Cancel the channel's registration with our selector
        System.out.println(e);
        key.cancel();
        return;
    }

    // Register an interest in writing on this channel
    key.interestOps(SelectionKey.OP_WRITE);
}

private SocketChannel initiateConnection() throws IOException {
    // Create a non-blocking socket channel
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);

    // Kick off connection establishment
    socketChannel.connect(new InetSocketAddress(this.hostAddress, this.port));

    // Queue a channel registration since the caller is not the 
    // selecting thread. As part of the registration we'll register
    // an interest in connection events. These are raised when a channel
    // is ready to complete connection establishment.
    synchronized(this.pendingChanges) {
        this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
    }

    return socketChannel;
}

private Selector initSelector() throws IOException {
    // Create a new selector
    return SelectorProvider.provider().openSelector();
}

public static void main(String[] args) {
    try {
        NIOClient client = new NIOClient(
                InetAddress.getByName("127.0.0.1"), 9090);
        Thread t = new Thread(client);
        t.setDaemon(true);
        t.start();

        // 1st
        client.send("hehe|||".getBytes());
        System.out.println("SEND: " + "hehe|||");
        handler.waitForResponse();

        System.out.println("------------");

        // 2nd
        client.send(("hehe|||" + " 2").getBytes());
        System.out.println("SEND: " + "hehe|||" + " 2");
        handler.waitForResponse();

    } catch (Exception e) {
        e.printStackTrace();
    }
}
}

私の編集したクライアントは、サーバーにメッセージを送信し、サーバーからエコーされたメッセージを受信するという単純なことを行うだけです。もちろん、上記のコードはうまく機能します。2 つのメッセージを送信し、それらを正しく受信しました。

しかし、上記のクライアントで私が望んでいないことは、次のとおりです。send関数は次のコードを呼び出します。

    // Start a new connection
    SocketChannel socket = this.initiateConnection();

これは、すべての特徴的なメッセージがすべての特徴的な新しい SocketChannel に対応することを意味しますが、多くのメッセージを送信するために 1 つの SocketChannel のみを使用したいので、次のコードのようにクライアントを変更します。

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;

public class MyClient implements Runnable {
// The host:port combination to connect to
private InetAddress hostAddress;
private int port;

// The selector we'll be monitoring
private Selector selector;

// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(8);

// A list of PendingChange instances
private List pendingChanges = new LinkedList();

// Maps a SocketChannel to a list of ByteBuffer instances
private Map pendingData = new HashMap();

// Maps a SocketChannel to a RspHandler
private Map rspHandlers = Collections.synchronizedMap(new HashMap());


private SocketChannel socket;
private static MyResponseHandler handler;

public MyClient(InetAddress hostAddress, int port) throws IOException {
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();

    // Start a new connection
    socket = this.initiateConnection();

    handler = new MyResponseHandler();
    // Register the response handler
    this.rspHandlers.put(socket, handler);      
}

public void send(byte[] data) throws IOException {

    // And queue the data we want written
    synchronized (this.pendingData) {
        List queue = (List) this.pendingData.get(socket);
        if (queue == null) {
            queue = new ArrayList();
            this.pendingData.put(socket, queue);
        }
        queue.add(ByteBuffer.wrap(data));
    }

    // Finally, wake up our selecting thread so it can make the required changes
    this.selector.wakeup();
}

public void run() {
    while (true) {
        try {
            // Process any pending changes
            synchronized (this.pendingChanges) {
                Iterator changes = this.pendingChanges.iterator();
                while (changes.hasNext()) {
                    ChangeRequest change = (ChangeRequest) changes.next();
                    switch (change.type) {
                    case ChangeRequest.CHANGEOPS:
                        SelectionKey key = change.socket.keyFor(this.selector);
                        key.interestOps(change.ops);
                        break;
                    case ChangeRequest.REGISTER:
                        change.socket.register(this.selector, change.ops);
                        break;
                    }
                }
                this.pendingChanges.clear();
            }

            // Wait for an event one of the registered channels
            this.selector.select();

            // Iterate over the set of keys for which events are available
            Iterator selectedKeys = this.selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = (SelectionKey) selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }

                // Check what event is available and deal with it
                if (key.isConnectable()) {
                    this.finishConnection(key);
                } else if (key.isReadable()) {
                    this.read(key);
                } else if (key.isWritable()) {
                    this.write(key);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

private void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    this.readBuffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
        numRead = socketChannel.read(this.readBuffer);
    } catch (IOException e) {
        // The remote forcibly closed the connection, cancel
        // the selection key and close the channel.
        key.cancel();
        socketChannel.close();
        return;
    }

    if (numRead == -1) {
        // Remote entity shut the socket down cleanly. Do the
        // same from our end and cancel the channel.
        key.channel().close();
        key.cancel();
        return;
    }

    // Handle the response
    this.handleResponse(socketChannel, this.readBuffer.array(), numRead);
}

private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException {
    // Make a correctly sized copy of the data before handing it
    // to the client
    byte[] rspData = new byte[numRead];
    System.arraycopy(data, 0, rspData, 0, numRead);

    // Look up the handler for this channel
    MyResponseHandler handler = (MyResponseHandler) this.rspHandlers.get(socketChannel);

    // And pass the response to it
    if (handler.handleResponse(rspData)) {
        // The handler has seen enough, close the connection
        socketChannel.close();
        socketChannel.keyFor(this.selector).cancel();
    }
}

private void write(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    synchronized (this.pendingData) {
        List queue = (List) this.pendingData.get(socketChannel);

        // Write until there's not more data ...
        while (!queue.isEmpty()) {
            ByteBuffer buf = (ByteBuffer) queue.remove(0);
            socketChannel.write(buf);

            //-- DEBUG --
            System.out.println("===>>> socketChannel.write: " + new String(buf.array()));

            if (buf.remaining() > 0) {
                // ... or the socket's buffer fills up
                break;
            }
        }

        if (queue.isEmpty()) {
            // We wrote away all data, so we're no longer interested
            // in writing on this socket. Switch back to waiting for
            // data.
            key.interestOps(SelectionKey.OP_READ);
        }
    }
}

private void finishConnection(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Finish the connection. If the connection operation failed
    // this will raise an IOException.
    try {
        socketChannel.finishConnect();
    } catch (IOException e) {
        // Cancel the channel's registration with our selector
        System.out.println(e);
        key.cancel();
        return;
    }

    // Register an interest in writing on this channel
    key.interestOps(SelectionKey.OP_WRITE);
}

private SocketChannel initiateConnection() throws IOException {
    // Create a non-blocking socket channel
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);

    // Kick off connection establishment
    socketChannel.connect(new InetSocketAddress(this.hostAddress, this.port));

    // Queue a channel registration since the caller is not the 
    // selecting thread. As part of the registration we'll register
    // an interest in connection events. These are raised when a channel
    // is ready to complete connection establishment.
    synchronized(this.pendingChanges) {
        this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
    }

    return socketChannel;
}

private Selector initSelector() throws IOException {
    // Create a new selector
    return SelectorProvider.provider().openSelector();
}

public static void main(String[] args) {
    try {
        MyClient client = new MyClient(
                InetAddress.getByName("127.0.0.1"), 9090);
        Thread t = new Thread(client);
        t.setDaemon(true);
        t.start();

        // 1st
        client.send("hehe|||".getBytes());
        System.out.println("SEND: " + "hehe|||");
        handler.waitForResponse();

        System.out.println("------------");

        // 2nd
        client.send(("hehe|||" + " 2").getBytes());
        System.out.println("SEND: " + "hehe|||" + " 2");
        handler.waitForResponse();

    } catch (Exception e) {
        e.printStackTrace();
    }
}
}

しかし、上記のクライアントを実行した後、最初のメッセージのみが送受信され、デバッグ後に 2 番目のメッセージが送信されていないことがわかりますが、その理由と解決方法はわかりません。

アンサー知ってる人いますか?

私の非常に長い質問を読んでくれてありがとう。

4

1 に答える 1

5

あなたは間違った場所から始めました。その記事には多くの問題があります。保留中の変更キューのものはすべて、計り知れない不必要な複雑さです。別のスレッドから登録/登録解除する必要がある場合はセレクターだけwakeup()です (ただし、なぜそれを行う必要があるのか​​ は、私には完全な謎です) interestOps。これまでに実現したさまざまな実装について広まっています。

この記事には他にもいくつかの問題があり、著者が何について話しているのか本当にわかっていないことがわかります。AnIOExceptionは必ずしも「リモートが強制的に接続を閉じた」という意味ではありません。彼のfinishConnection()メソッドは戻り値を無視します。これが false の場合、接続がまだ保留中であることを意味するため、OP_CONNECT段階を超えてチャンネルを登録するのが早すぎます。cancel()チャネルを閉じるとキーがキャンセルされるため、呼び出しの直前または直後のすべてのclose()呼び出しは冗長であり、削除できます (ただし、閉じずにキャンセルする場所がありますが、これは発生する場所も間違っています)。

さらに遠く:

紹介した 2 つのメソッドのどこにも、ソケット チャネルの選択キーに OP_CONNECT フラグを設定するように要求していません。これを行うと、OP_CONNECT フラグが上書きされ、接続が完了しなくなります。そして、それらを組み合わせると、接続されていないチャネルに書き込もうとするリスクが発生します (または、少なくともそのケースに対処する必要があります)」

これは完全にグレード A のナンセンスです。OP_CONNECT を 2 回設定するか、「それらを組み合わせて」、それが何を意味するにせよ、「接続を完了しない」または「接続されていないチャネルに書き込もうとする」可能性はありません。彼は、少し 2 回設定するとクリアになると考えているようです。

データはすでにキューに入れられています (または、そもそも接続を確立していません)。

奇妙で説明のつかない仮定。

そのかなり疑わしい混乱の代わりに、Oracleフォーラムの移行後もまだ見つけることができれば、彼が引用している「Taming the NIO Circus」スレッドをよく見てみたい. 免責事項:私はそれの一部を書きました。

于 2012-03-28T23:16:40.780 に答える