0

NIO を使用して、離れたマシンとの間でデータを送受信したいと考えています。いつでもデータを送受信できます。データを送信する必要がある場合は、離れたマシンからのクエリなしでデータを送信するだけで、離れたマシンから定期的にデータが送信されます。NIO の仕組みがわかりません。Selector SelectionKey でイベントを生成し、読み取りまたは書き込みを行うのは何ですか? 私の側で ServerSocketChannel を 1 つだけ使用して、離れたマシンからデータを読み取り、それにデータを書き込むことは可能ですか? それは私が理解していることですが、書き込みイベントをトリガーする方法がわかりません...説明していただきありがとうございます。

私はすでにいくつかのコーディングを行っており、離れたマシンからのデータを読み取ることはできますが、書き込むことはできません。Selector を使用していますが、データを書き込む方法がわかりません。ログに記録されたメッセージ「handle write」は書き込まれませんが、wireshark では自分のパケットを見ることができます。

    public class ServerSelector {

    private static final Logger logger = Logger.getLogger(ServerSelector.class.getName());
    private static final int TIMEOUT = 3000; // Wait timeout (milliseconds)
    private static final int MAXTRIES = 3;
    private final Selector selector;

    public ServerSelector(Controller controller, int... servPorts) throws IOException {
        if (servPorts.length <= 0) {
            throw new IllegalArgumentException("Parameter(s) : <Port>...");
        }
        Handler consolehHandler = new ConsoleHandler();
        consolehHandler.setLevel(Level.INFO);
        logger.addHandler(consolehHandler);

        // Create a selector to multiplex listening sockets and connections
        selector = Selector.open();

        // Create listening socket channel for each port and register selector
        for (int servPort : servPorts) {
            ServerSocketChannel listnChannel = ServerSocketChannel.open();
            listnChannel.socket().bind(new InetSocketAddress(servPort));

            listnChannel.configureBlocking(false); // must be nonblocking to register
            // Register selector with channel.  The returned key is ignored
            listnChannel.register(selector, SelectionKey.OP_ACCEPT);
        }

        // Create a handler that will implement the protocol
        IOProtocol protocol = new IOProtocol();

        int tries = 0;
        // Run forever, processing available I/O operations
        while (tries < MAXTRIES) {
            // Wait for some channel to be ready (or timeout)
            if (selector.select(TIMEOUT) == 0) { // returns # of ready chans
                System.out.println(".");
                tries += 1;
                continue;
            }

            // Get iterator on set of keys with I/O to process
            Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
            while (keyIter.hasNext()) {
                SelectionKey key = keyIter.next(); // Key is a bit mask
                // Server socket channel has pending connection requests?
                if (key.isAcceptable()) {
                    logger.log(Level.INFO, "handle accept");
                    protocol.handleAccept(key, controller);
                }

                // Client socket channel has pending data?
                if (key.isReadable()) {
                    logger.log(Level.INFO, "handle read");
                    protocol.handleRead(key);
                }

                // Client socket channel is available for writing and
                // key is valid (i.e., channel not closed) ?
                if (key.isValid() && key.isWritable()) {
                    logger.log(Level.INFO, "handle write");
                    protocol.handleWrite(key);
                }
                keyIter.remove(); // remove from set of selected keys
                tries = 0;
            }
        }
    }
}

プロトコル

    public class IOProtocol implements Protocol {

    private static final Logger logger = Logger.getLogger(IOProtocol.class.getName());

    IOProtocol() {
        Handler consolehHandler = new ConsoleHandler();
        consolehHandler.setLevel(Level.INFO);
        logger.addHandler(consolehHandler);
    }

    /**
     *
     * @param key
     * @throws IOException
     */
    @Override
    public void handleAccept(SelectionKey key, Controller controller) throws IOException {
        SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
        clntChan.configureBlocking(false); // Must be nonblocking to register
        controller.setCommChannel(clntChan);
        // Register the selector with new channel for read and attach byte buffer
        SelectionKey socketKey = clntChan.register(key.selector(), SelectionKey.OP_READ | SelectionKey.OP_WRITE, controller);
    }

    /**
     * Client socket channel has pending data
     *
     * @param key
     * @throws IOException
     */
    @Override
    public void handleRead(SelectionKey key) throws IOException {
        Controller ctrller = (Controller)key.attachment();
        try {
            ctrller.readData();
        } catch (CommandUnknownException ex) {
            logger.log(Level.SEVERE, null, ex);
        }
        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }

    /**
     * Channel is available for writing, and key is valid (i.e., client channel
     * not closed).
     *
     * @param key
     * @throws IOException
     */
    @Override
    public void handleWrite(SelectionKey key) throws IOException {

        Controller ctrl = (Controller)key.attachment();
        ctrl.writePendingData();
        if (!buf.hasRemaining()) { // Buffer completely written ?
            // Nothing left, so no longer interested in writes
            key.interestOps(SelectionKey.OP_READ);
        }
    buf.compact();
    }
}

コントローラー

    /**
     * Fill buffer with data.
     * @param msg The data to be sent
     * @throws IOException 
     */
    private void writeData(AbstractMsg msg) throws IOException {
//        
        writeBuffer = ByteBuffer.allocate(msg.getSize() + 4);
        writeBuffer.putInt(msg.getSize());
        msg.writeHeader(writeBuffer);
        msg.writeData(writeBuffer);
        logger.log(Level.INFO, "Write data - message size : {0}", new Object[]{msg.getSize()});
        logger.log(Level.INFO, "Write data - message : {0}", new Object[]{msg});
    }

    /**
     * Write to the SocketChannel
     * @throws IOException 
     */
    public void writePendingData() throws IOException {
        commChannel.write(writeBuffer);
    }
4

3 に答える 3

1

ServerSocketChannel接続には使用されますが、データの送信には使用されません。接続ごとServerSocketChannelに 1 つと1 つ必要です。SocketChannel

を使用した読み書きの例SocketChannel:

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf);

プログラムは、データが来るまで 2 行目でスリープします。このコードを無限ループに入れて、バックグラウンドで実行する必要がありますThread。データが到着したら、このスレッドからそれを処理し、別のデータが来るのを待ちます。

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put("Hello!".getBytes());

buf.flip();

while(buf.hasRemaining()) {
    channel.write(buf);
}

ブロッキング メソッドがないため、小さなバイト バッファを送信する場合は、 main からこれを呼び出すことができますThread

ソース

追加:OP_WRITE新しい接続でキーを 設定しないでください。のみOP_READ。データを書きたいときは、何かを送信したいことをセレクターに通知し、イベントループで送信する必要があります。良い解決策は、Queue送信メッセージを作成することです。次に、次の手順に従います。

  • データの追加Queue
  • OP_WRITEチャンネルのキーに設定
  • while (keyIter.hasNext())ループ内でwritable key、キューからすべてのデータを書き込み、OP_WRITEキーを削除します。

あなたのコードを理解するのは私には難しいですが、何が問題なのかが分かると思います。また、接続を 1 つだけにしたい場合は、 を使用する必要はありませんSelector。そして、これはあなたが少数を束縛するのは奇妙ですServerSocketChannels

于 2012-07-03T11:41:10.787 に答える
0

ブロッキングNIO(SocketChannelのデフォルトの動作)を使用することをお勧めします。セレクターを使用する必要はありませんが、1つのスレッドを読み取りに使用し、別のスレッドを書き込みに使用できます。


あなたの例に基づいています。

private final ByteBuffer writeBuffer = ByteBuffer.allocateDirect(1024*1024);

private void writeData(AbstractMsg msg) {
    writeBuffer.clear();
    writeBuffer.putInt(0); // set later
    msg.writeHeader(writeBuffer);
    msg.writeData(writeBuffer);
    writeBuffer.putInt(0, writeBuffer.position());

    writeBuffer.flip();
    while(writeBuffer.hasRemaining())
        commChannel.write(writeBuffer);
}
于 2012-07-03T11:29:53.720 に答える
0

Selector SelectionKey でイベントを生成し、読み取りまたは書き込みを行うのは何ですか?

OP_READ: ソケット受信バッファ内のデータまたは EOS の存在。

OP_WRITE: ソケット送信バッファの空き容量。

于 2012-07-03T12:56:51.667 に答える