4

このチュートリアルを使用して、書き込み可能なセクションなしで Java nio サーバーを構築しています。

1 つの興味深い点を除いて、すべて正常に動作します。

  • クライアントがパケットを送信する速度が速すぎる場合、サーバーはすべてのメッセージを受信しません。サーバーは常に最初と 2 番目のパケットを取得しますが、それ以上は取得しません。
  • クライアントがゆっくりとパケットを送信している場合、サーバーはすべてのパケットを取得します。

何か案が?

以下のコードで言及されている別のクラスが必要な場合は、サーバークラスコードを追加しています。私はここにいます:)。

NIOServer クラス:

package server;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;

import javax.xml.parsers.ParserConfigurationException;

import org.xml.sax.SAXException;

public class NioServer implements Runnable {



// The host:port combination to listen on
  private InetAddress hostAddress;
  private int port;

  // The channel on which we'll accept connections
  private ServerSocketChannel serverChannel;

  // The selector we'll be monitoring
  private Selector selector;

  //the cach will hundle the messages that came
  private Cache cache;

  // The buffer into which we'll read data when it's available
  private ByteBuffer readBuffer = ByteBuffer.allocate(8192);

  public NioServer(InetAddress hostAddress, int port , Cache cache) throws IOException {
    this.cache = cache;
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
  }


  private Selector initSelector() throws IOException {
        // Create a new selector
        Selector socketSelector = SelectorProvider.provider().openSelector();

        // Create a new non-blocking server socket channel
        this.serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // Bind the server socket to the specified address and port
        InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
        serverChannel.socket().bind(isa);

        // Register the server socket channel, indicating an interest in 
        // accepting new connections
        serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

        return socketSelector;
      }

  private void accept(SelectionKey key) throws IOException {
        // For an accept to be pending the channel must be a server socket channel.
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

        // Accept the connection and make it non-blocking
        SocketChannel socketChannel = serverSocketChannel.accept();
        Socket socket = socketChannel.socket();
        socketChannel.configureBlocking(false);

        // Register the new SocketChannel with our Selector, indicating
        // we'd like to be notified when there's data waiting to be read
        socketChannel.register(this.selector, SelectionKey.OP_READ);
      }

  private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();

        // Attempt to read off the channel
        int numRead;
        try {
          numRead = socketChannel.read(this.readBuffer);
          String test = new String(this.readBuffer.array());
          System.out.print(test);

        } catch (IOException e) {
          // The remote forcibly closed the connection, cancel
          // the selection key and close the channel.
        //  key.cancel();
        //  socketChannel.close();
          return;
        }

        if (numRead == -1) {
          // Remote entity shut the socket down cleanly. Do the
          // same from our end and cancel the channel.
          key.channel().close();
          key.cancel();
          return;
        }

        // Hand the data off to our worker thread
        this.cache.processData(this, socketChannel, this.readBuffer.array(), numRead); 
      }

  public void run() {
        while (true) {
          try {
            // Wait for an event one of the registered channels

            this.selector.select();



            // Iterate over the set of keys for which events are available
            Iterator selectedKeys = this.selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
              SelectionKey key = (SelectionKey) selectedKeys.next();
              selectedKeys.remove();

              if (!key.isValid()) {
                continue;
              }

              // Check what event is available and deal with it
              if (key.isAcceptable()) {
                this.accept(key);
              } else if (key.isReadable()) {
                this.read(key);
              }
            }
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }

  public static void main(String[] args) throws ParserConfigurationException, SAXException {
    try {
        Cache cache = new Cache();
        new Thread(cache).start();
      new Thread(new NioServer(null, 9090,cache)).start();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
4

1 に答える 1

0

あなたがUDPを読んでいるなら、私はそれを期待します。メソッドでパケットを処理する速度に注意してくださいread。それらをsystem.outに出力していますが、これは非常に遅く、processDataメソッドの他のスレッドへのデータをどれだけ速く処理できるかわかりません。私が書いたこのライブラリは、遅延の原因である場合、スレッド間のノンブロッキング通信を行うのに役立ちます。基になる読み取りソケット バッファーのサイズも確認する必要があります。それが大きいほど、パケットがドロップされ始める前に迅速に追いつく必要がある余地が大きくなります。TCP の場合、基になるソケット バッファーがいっぱいになると、おそらくチャネルで IOException が発生します。UDP の場合、パケットはサイレントにドロップされます。

基になる読み取りソケット バッファー サイズにアクセスするには、次のようにします。

final Socket socket = channel.socket();
System.out.println(socket.getReceiveBufferSize());
socket.setReceiveBufferSize(newSize);

注:私の知る限り、Linuxでは、基礎となるバッファサイズを変更するためにOS構成が必要になる場合があります。効果がない場合setReceiveBufferSize(変更されたかどうかを確認するためにもう一度読んでください)、Google で調べてください。:)

于 2012-11-16T01:36:55.503 に答える