0

Java のネットワーク API (古い io と nio と nio2) をテストするプログラムを作成しています。

2 つの値のみを送信するサーバーがあります。

  1. System.nanoTime()
  2. 送信されたメッセージの数をカウントするカウンター。

クライアントはこのデータを受け取り、リモートの System.nanoTime() をローカルのタイムスタンプと比較してレイテンシを計算し、カウンターをチェックしてデータがドロップされていないことを確認します。

これは単なるテストであるため、サーバーとクライアントは同じ JVM で実行されています。90% の確率でデータが正しく転送されます。ただし、タイムスタンプが完全に間違っている場合があります。オーバー/アンダーフロー エラーの可能性があるようですが、どのように導入されるのかわかりません。エラーの例を次に示します。

エラー: カウンター 3、remoteTS -8267580102784516096、localTS 155321716184402、差分 8267735424500700498

ローカル タイムスタンプ 155321716184402 は、午後 7 時過ぎに変換されることに注意してください。しかし、リモートのタイムスタンプは単に負です! コードを見ると、凝った日付の計算は行っていません。負になることはありません。また、オーバーフロー エラーが発生する方法もわかりません。大きいエンディアンと小さいエンディアンが原因ではないかと思いましたが、一部だけでなく、すべての値が間違っている可能性があります。

コード (少し大きなテストから抽出されたもの) は次のとおりです。

package networkioshootout;

import static java.lang.System.out;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.ExecutionException;


public class DebugNetwork {
  private final static int SENDCOUNT = 100;
    private final static int PORT = 9000;
    private final static int TESTLOOP = 10;
    private final static Random rn = new Random();

    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        long currentNanos = System.nanoTime();
        long currentMillis = System.currentTimeMillis();
        Date now = new Date();
        System.out.println(String.format("Current date/time:%s, nanos:%s, millis:%s",
                now, currentNanos, currentMillis));

        //Server
        new Server().start();

        //Client
        for(int i=0; i< TESTLOOP; i++){
            final int DATASIZE = (1+rn.nextInt(99))*8;
            clientInputstream(DATASIZE);
        }
    }

    private static void clientInputstream(int bufferSize) throws IOException, UnknownHostException {
        final byte[] internalBuffer = new byte[bufferSize+16] ;
        final ByteBuffer longExtractor = ByteBuffer.allocate(16);

        int bytesReadSoFar = 0;
        long counter = 0;

        Socket client = new Socket(InetAddress.getLocalHost(), PORT);
        InputStream in = client.getInputStream();

        byte[] data = new byte[bufferSize];
        int size = 0;

        try{
            while(-1 != (size = in.read(data))){
                for(int i=0; i < size; i++){
                    internalBuffer[i+bytesReadSoFar] = data[i];
                }
                bytesReadSoFar += size;

                if(bytesReadSoFar >= 16){
                    int values = bytesReadSoFar/16;
                    int toRead = values;
                    int remainder = bytesReadSoFar % 16;

                    for(int i=0; i< toRead; i++){
                        int j = i * 16;

                        //long remoteTS = ByteBuffer.wrap(new byte[]{internalBuffer[j+0],internalBuffer[j+1],internalBuffer[j+2],internalBuffer[j+3],internalBuffer[j+4],internalBuffer[j+5],internalBuffer[j+6],internalBuffer[j+7]}).getLong();
                        //long remoteCounter = ByteBuffer.wrap(new byte[]{internalBuffer[j+8],internalBuffer[j+9],internalBuffer[j+10],internalBuffer[j+11],internalBuffer[j+12],internalBuffer[j+13],internalBuffer[j+14],internalBuffer[j+15]}).getLong();

                        //long remoteTS = data[0] | ((int)(data[1]) << 4) | ((int)(data[1]) << 8) | ((int)(data[1]) << 12) | ((int)(data[1]) << 16) | ((int)(data[1]) << 20) | ((int)(data[1]) << 24) ;

                        longExtractor.put(internalBuffer, j, 16);
                        longExtractor.flip();
                        long remoteTS = longExtractor.getLong();
                        long remoteCounter = longExtractor.getLong();
                        longExtractor.clear();

                        if(remoteCounter != counter){
                            String error = "ERROR: Expected remote counter to be "+counter+" but it was actually "+remoteCounter;
                            //System.out.println(error);
                            throw new RuntimeException(error);
                        }
                        counter++;

                        long localTS = System.nanoTime();
                        long latency = localTS - remoteTS;
                        if(Math.abs(latency) > 1200000000) {
                            out.println(String.format("ERROR: counter %s, remoteTS %s, localTS %s, diff %s",
                                    counter, remoteTS, localTS, latency));
                            continue;
                        }


                    }

                    //System.arraycopy(data, toRead, data, 0, remainder);
                    for(int i=0; i < remainder; i++){
                        internalBuffer[i] = internalBuffer[i+toRead];
                    }
                    bytesReadSoFar = remainder;
                }
            }
        }
        finally{
            client.close();
        }
    }

    static final class Server extends Thread{

        public void run(){
            try {
                startServer();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private static void startServer() throws IOException {
            final ServerSocket server = new ServerSocket(PORT);

            //System.out.println("Server listening on port "+PORT);

            while(true){
                final Socket c1 = server.accept();
                c1.setTcpNoDelay(true);
                //System.out.println("Client connected");
                new Thread(new Runnable() {

                    @Override
                    public void run() {
                        long totalMsgs = 0;
                        long counter = 0;
                        DataOutputStream serverout;
                        try {
                            serverout = new DataOutputStream(c1.getOutputStream());
                            for(int i=0;i<SENDCOUNT;i++){ 
                                serverout.writeLong(System.nanoTime());
                                serverout.writeLong(counter);
                                totalMsgs++;
                                counter++;
                            }
                            //System.out.println("Sent bytes to client: "+total);
                        } catch (IOException e) {
                            out.println("Messages sent:"+totalMsgs+", current counter:"+counter);
                            e.printStackTrace();
                        }
                        finally{
                            //System.out.println("Client disconnected when counter was "+counter);
                            try { c1.close(); } catch (IOException e) { e.printStackTrace();}
                        }
                    }
                }).start();
            }
        }
    }

}

編集:これについていくつかのコメントがあったため、実際のプログラムには、入力ストリーム、バッファリングされたストリーム、NIO、NIO2 を介してサーバーに接続するクライアントがあります。これはプログラムのより完全な (ただし古い) バージョンです: https://gist.github.com/falconair/4975243

データ入力ストリームの追加、ソケット オプションの実験などはまだ行っていません。先に進む前に、データ破損の問題を解決したいと考えています。

4

2 に答える 2

1

エラーは、データシャッフルの使用data[]と関係があります。internalBuffer[]このコードのようなものを使用して実際のクライアントが作成されているとは思いません。正気な人なら誰でもBufferedInputStream.

異なるバッファ サイズの効果をテストする場合は、 and を使用しnew DataInputStream(new BufferedInputStream(socket.getInputStream(), bufferSize))てandを完全に削除してください。これらは無関係な問題を引き起こしているだけです。readLong()datainternalBufferlongExtractor

以下は問題なく動作します。

private static void clientInputstream(int bufferSize) throws IOException, UnknownHostException
{
    long counter = 0;

    Socket client = new Socket(InetAddress.getLocalHost(), PORT);
    DataInputStream in = new DataInputStream(new BufferedInputStream(client.getInputStream(), bufferSize));

    try
    {
        for (;;)
        {
            long remoteTS = in.readLong();
            long remoteCounter = in.readLong();
            if (remoteCounter != counter)
            {
                String error = "ERROR: Expected remote counter to be " + counter + " but it was actually " + remoteCounter;
                //System.out.println(error);
                throw new RuntimeException(error);
            }
            counter++;

            long localTS = System.nanoTime();
            long latency = localTS - remoteTS;
            if (Math.abs(latency) > 1200000000)
            {
                out.println(String.format("ERROR: counter %s, remoteTS %s, localTS %s, diff %s",
                    counter, remoteTS, localTS, latency));
                continue;
            }
        }
    }
    catch (EOFException exc)
    {
        System.out.println("EOS");
    }
    finally
    {
        client.close();
    }
}
于 2013-02-20T23:07:28.943 に答える
0

によって返される値はSystem.nanoTime()、実行中の JVM に固有です。代わりに System.currentTimeMillis() を使用する必要があります。

このメソッドは、経過時間を測定するためにのみ使用でき、システム時間またはウォールクロック時間の他の概念とは関係ありません。返される値は、固定されているが任意の起点時刻からのナノ秒を表します (おそらく将来であるため、値は負の可能性があります)。Java 仮想マシンのインスタンスでは、このメソッドのすべての呼び出しで同じオリジンが使用されます。他の仮想マシン インスタンスは別のオリジンを使用する可能性があります。

編集:

同じ JVM でテストを実行しているため、(この) エラーの原因は上記のものとは異なるはずです (ただし、値が異なる JVM 間で比較できるように、「currentTimeMillis」の使用を検討する必要があります)。

BufferedInputStream を使用してストリームをバッファリングし、一度に N (16?) バイトのチャンクを読み取って処理することをお勧めします。

 Socket client = new Socket(InetAddress.getLocalHost(), PORT);
 InputStream in = new BufferedInputStream(client.getInputStream());

 int length = 16, offset=0;     
 while (length>0) {
   int read = in.read(data,offset,length);
   if (read<0) ... //connection error
   offset+=read;
   length-=read;
 }
于 2013-02-20T20:51:53.257 に答える