2

私の問題に関する他のスレッドを閲覧した後、アプリケーションを再設計する必要があることを理解したと思います。TCP/IPしかし、明確にするために、クライアントとサーバーの間に単一の接続があります。クライアント側では、多数のスレッドが同時に実行されています。これらのスレッドのランダムに 1 つ以上が、TCP/IP接続を使用してサーバーと通信します。たとえば、実行時間の長いファイル転送がアクティブなときに、別のスレッドとの接続を同時に使用すると、エラーが発生する可能性があることがわかりました。各メッセージの前にデータ長を含む特定のヘッダーを付けましたが、IPつまり、1 つのメッセージはまだ完全に配信されていませんが、別のメッセージの一部が read メソッドに配信されます。TCP/IPこれは、意図した動作と一致する正しい観察ですか? 前もって感謝します - マリオ

++++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++

興味のある方へ: 以下は、私のテスト プログラムのソース コードです。BUFFER_SIZE のさまざまな値と、同じソケットを使用する同時 TCP/IP 送信でサーバー ソケットを攻撃するために使用される THREADS の数で遊ぶことができます。一部のエラー処理を省略し、ソケットのクローズを含むより洗練された終了を削除しました。64KB を超える BUFFER_SIZE でテストすると、マシンで常にエラーが発生します。

import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;

public class TCPTest
{
  private final static String INPUT_FILE   = "c:/temp/tcptest.in";
  private final static int    BUFFER_SIZE  = 64 * 1024 - 8; //65536;
  private final static int    MESSAGE_SIZE = 512 * 64 * 1024;
  private final static int    THREADS      = 3;
  private final static int    SIZE_OF_INT  = 4;
  private final static int    LENGTH_SIZE  = SIZE_OF_INT;
  private final static int    ID_SIZE      = SIZE_OF_INT;
  private final static int    HEADER_SIZE  = LENGTH_SIZE + ID_SIZE;
  private final static String NEW_LINE     = System.getProperty("line.separator");

  private ServerSocket m_serverSocket = null;
  private Socket       m_clientSocket = null;
  private int          m_iThreadCounter;

  public static void main(String[] args)  
  {
    new TCPTest();
  } // main

  public TCPTest() 
  {
    final String id = "ReaderThread[*]";
    // start a new thread creating a server socket waiting for connections 
    new Thread(new Runnable() 
    {
      public void run() 
      {
        try 
        {
          // create server socket and accept client requests
          m_serverSocket = new ServerSocket(12345);
          m_clientSocket = m_serverSocket.accept();
          // client request => prepare and read data
          long       startTime       = System.currentTimeMillis();
          byte[]     buffer          = new byte[BUFFER_SIZE];
          ByteBuffer header          = ByteBuffer.allocate(HEADER_SIZE);
          int        iTotalBytesRead = 0;
          boolean    fTerminate      = false;
          int        iBytesRead;
          // get hold of socket's input stream
          InputStream clientInputStream = m_clientSocket.getInputStream();
          // loop
          while (false == fTerminate)
          {
            // loop to read next header
            for (int i = 0; i < HEADER_SIZE; i++)
              clientInputStream.read(header.array(), i, 1);
            header.rewind();
            // get information of interest
            int iLength      = header.getInt();
            int iId          = header.getInt();
            int iLengthSoFar = 0;
            int iBytesLeft   = iLength;
            int iBytesToRead;
            // any length given?
            if ((0 < iLength) && (BUFFER_SIZE >= iLength))
            {
              // that's the case => read complete message
              while (iLengthSoFar < iLength)
              {
                // calculate number of bytes left
                iBytesLeft = iLength - iLengthSoFar;
                 // calculate maximum number of bytes to read
                if (iBytesLeft > BUFFER_SIZE)
                  iBytesToRead = BUFFER_SIZE;
                else
                  iBytesToRead = iBytesLeft;
                // read next portion of bytes
                if ((iBytesRead = clientInputStream.read(buffer, 0, iBytesToRead)) != -1)
                {
                  // maintain statistics
                  iTotalBytesRead += iBytesRead;
                  iLengthSoFar += iBytesRead;
                } // if
                else
                {
                  // finish => print message
                  System.out.println("==> "+id+": ERROR length=<-1> received " +
                        "for id=<"+iId+">");
                  fTerminate = true;
                  break;
                } // else
              } // while
            } // if
            else
            {
              System.out.println("==> "+id+": ERROR data length <= 0 for id=<"+iId+">");
              dump(header, 0, HEADER_SIZE / SIZE_OF_INT, "Error header");
            } // else
          } // while
          System.out.println("==> "+id+": "+ iTotalBytesRead + " bytes read in " 
                              + (System.currentTimeMillis() - startTime) + " ms.");
        } // try 
        catch (IOException e) 
        {
          e.printStackTrace();
        } // catch
      } // run
    }).start();
    // create the socket writer threads
    try
    {
      // ensure server is brought up and request a connection
      Thread.sleep(1000);
      System.out.println("==> "+id+": just awoke");
      Socket       socket             = new Socket("localhost", 12345);
      OutputStream socketOutputStream = socket.getOutputStream();
      System.out.println("==> "+id+": socket obtained");
      // create some writer threads
      for (int i = 0; i < THREADS; i++)
        // create a new socket writer and start the thread
        (new SocketWriter(socket, 
                          (i+1),
                          BUFFER_SIZE,
                          new String("WriterThread["+(i+1)+"]"),
                          socketOutputStream)).start();
    } // try
    catch (Exception e)
    {
      e.printStackTrace();
    } // catch
  } // TCPTestEx

  private final static void dump(ByteBuffer bb, int iOffset, int iInts, String header)
  {
    System.out.println(header);
    bb.rewind();
    for (int i = 0; i < iInts; i++)
      System.out.print(" " + Integer.toHexString(bb.getInt()).toUpperCase());
    System.out.print(NEW_LINE);
  } // dump

  private class SocketWriter extends Thread
  {
    Socket       m_socket;
    int          m_iId;
    int          m_iBufferSize;
    String       m_id;
    OutputStream m_os;

    protected SocketWriter(Socket socket, int iId, int iBufferSize, String id, OutputStream os)
    {
      m_socket       = socket;
      m_iId          = iId;
      m_iBufferSize  = iBufferSize;
      m_id           = id;
      m_os           = os;
      // increment thread counter
      synchronized (m_serverSocket)
      {
        m_iThreadCounter++;
      } // synchronized
    } // SocketWriter

    public final void run()
    {
      try 
      {
        long       startTime        = System.currentTimeMillis();
        ByteBuffer buffer           = ByteBuffer.allocate(m_iBufferSize + HEADER_SIZE); 
        int        iTotalBytesRead  = 0;
        int        iNextMessageSize = 512 * m_iBufferSize; 
        int        iBytesRead;
        // open input stream for file to read and send
        FileInputStream fileInputStream = new FileInputStream(INPUT_FILE);
        System.out.println("==> "+m_id+": file input stream obtained");
        // loop to read complete file
        while (-1 != (iBytesRead = fileInputStream.read(buffer.array(), HEADER_SIZE, m_iBufferSize))) 
        {
          // add length and id to buffer and write over TCP
          buffer.putInt(0, iBytesRead);
          buffer.putInt(LENGTH_SIZE, m_iId);
          m_os.write(buffer.array(), 0, HEADER_SIZE + iBytesRead);
          // maintain statistics and print message if so desired
          iTotalBytesRead += iBytesRead;
          if (iNextMessageSize <= iTotalBytesRead)
          {
            System.out.println("==> "+m_id+": <"+iTotalBytesRead+"> bytes processed");
            iNextMessageSize += MESSAGE_SIZE;
          } // if
        } // while
        // close my file input stream
        fileInputStream.close();
        System.out.println("==> "+m_id+": file input stream closed");
        System.out.println("==> "+m_id+": <"+ iTotalBytesRead + "> bytes written in " 
                            + (System.currentTimeMillis() - startTime) + " ms.");
        // decrement thread counter
        synchronized (m_serverSocket)
        {
          m_iThreadCounter--;
          // last thread?
          if (0 >= m_iThreadCounter)
            // that's the case => terminate
            System.exit(0);
        } // synchronized
      } // try 
      catch (Exception e) 
      {
        e.printStackTrace();
      } // catch
    } // run
  } // SocketWriter
} // TCPTest
4

1 に答える 1

0

ええ。TCP は、バイト指向のストリーム プロトコルです。これは、アプリケーションが (区切りなしの) バイト ストリームを受信することを意味します。「メッセージ」の概念は、アプリケーションによって提供される必要があります (または代わりにメッセージ指向プロトコルを使用します)。

于 2013-02-18T15:36:21.897 に答える