私の問題に関する他のスレッドを閲覧した後、アプリケーションを再設計する必要があることを理解したと思います。TCP/IP
しかし、明確にするために、クライアントとサーバーの間に単一の接続があります。クライアント側では、多数のスレッドが同時に実行されています。これらのスレッドのランダムに 1 つ以上が、TCP/IP
接続を使用してサーバーと通信します。たとえば、実行時間の長いファイル転送がアクティブなときに、別のスレッドとの接続を同時に使用すると、エラーが発生する可能性があることがわかりました。各メッセージの前にデータ長を含む特定のヘッダーを付けましたが、IP
つまり、1 つのメッセージはまだ完全に配信されていませんが、別のメッセージの一部が read メソッドに配信されます。TCP/IP
これは、意図した動作と一致する正しい観察ですか? 前もって感謝します - マリオ
++++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++
興味のある方へ: 以下は、私のテスト プログラムのソース コードです。BUFFER_SIZE のさまざまな値と、同じソケットを使用する同時 TCP/IP 送信でサーバー ソケットを攻撃するために使用される THREADS の数で遊ぶことができます。一部のエラー処理を省略し、ソケットのクローズを含むより洗練された終了を削除しました。64KB を超える BUFFER_SIZE でテストすると、マシンで常にエラーが発生します。
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
public class TCPTest
{
private final static String INPUT_FILE = "c:/temp/tcptest.in";
private final static int BUFFER_SIZE = 64 * 1024 - 8; //65536;
private final static int MESSAGE_SIZE = 512 * 64 * 1024;
private final static int THREADS = 3;
private final static int SIZE_OF_INT = 4;
private final static int LENGTH_SIZE = SIZE_OF_INT;
private final static int ID_SIZE = SIZE_OF_INT;
private final static int HEADER_SIZE = LENGTH_SIZE + ID_SIZE;
private final static String NEW_LINE = System.getProperty("line.separator");
private ServerSocket m_serverSocket = null;
private Socket m_clientSocket = null;
private int m_iThreadCounter;
public static void main(String[] args)
{
new TCPTest();
} // main
public TCPTest()
{
final String id = "ReaderThread[*]";
// start a new thread creating a server socket waiting for connections
new Thread(new Runnable()
{
public void run()
{
try
{
// create server socket and accept client requests
m_serverSocket = new ServerSocket(12345);
m_clientSocket = m_serverSocket.accept();
// client request => prepare and read data
long startTime = System.currentTimeMillis();
byte[] buffer = new byte[BUFFER_SIZE];
ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
int iTotalBytesRead = 0;
boolean fTerminate = false;
int iBytesRead;
// get hold of socket's input stream
InputStream clientInputStream = m_clientSocket.getInputStream();
// loop
while (false == fTerminate)
{
// loop to read next header
for (int i = 0; i < HEADER_SIZE; i++)
clientInputStream.read(header.array(), i, 1);
header.rewind();
// get information of interest
int iLength = header.getInt();
int iId = header.getInt();
int iLengthSoFar = 0;
int iBytesLeft = iLength;
int iBytesToRead;
// any length given?
if ((0 < iLength) && (BUFFER_SIZE >= iLength))
{
// that's the case => read complete message
while (iLengthSoFar < iLength)
{
// calculate number of bytes left
iBytesLeft = iLength - iLengthSoFar;
// calculate maximum number of bytes to read
if (iBytesLeft > BUFFER_SIZE)
iBytesToRead = BUFFER_SIZE;
else
iBytesToRead = iBytesLeft;
// read next portion of bytes
if ((iBytesRead = clientInputStream.read(buffer, 0, iBytesToRead)) != -1)
{
// maintain statistics
iTotalBytesRead += iBytesRead;
iLengthSoFar += iBytesRead;
} // if
else
{
// finish => print message
System.out.println("==> "+id+": ERROR length=<-1> received " +
"for id=<"+iId+">");
fTerminate = true;
break;
} // else
} // while
} // if
else
{
System.out.println("==> "+id+": ERROR data length <= 0 for id=<"+iId+">");
dump(header, 0, HEADER_SIZE / SIZE_OF_INT, "Error header");
} // else
} // while
System.out.println("==> "+id+": "+ iTotalBytesRead + " bytes read in "
+ (System.currentTimeMillis() - startTime) + " ms.");
} // try
catch (IOException e)
{
e.printStackTrace();
} // catch
} // run
}).start();
// create the socket writer threads
try
{
// ensure server is brought up and request a connection
Thread.sleep(1000);
System.out.println("==> "+id+": just awoke");
Socket socket = new Socket("localhost", 12345);
OutputStream socketOutputStream = socket.getOutputStream();
System.out.println("==> "+id+": socket obtained");
// create some writer threads
for (int i = 0; i < THREADS; i++)
// create a new socket writer and start the thread
(new SocketWriter(socket,
(i+1),
BUFFER_SIZE,
new String("WriterThread["+(i+1)+"]"),
socketOutputStream)).start();
} // try
catch (Exception e)
{
e.printStackTrace();
} // catch
} // TCPTestEx
private final static void dump(ByteBuffer bb, int iOffset, int iInts, String header)
{
System.out.println(header);
bb.rewind();
for (int i = 0; i < iInts; i++)
System.out.print(" " + Integer.toHexString(bb.getInt()).toUpperCase());
System.out.print(NEW_LINE);
} // dump
private class SocketWriter extends Thread
{
Socket m_socket;
int m_iId;
int m_iBufferSize;
String m_id;
OutputStream m_os;
protected SocketWriter(Socket socket, int iId, int iBufferSize, String id, OutputStream os)
{
m_socket = socket;
m_iId = iId;
m_iBufferSize = iBufferSize;
m_id = id;
m_os = os;
// increment thread counter
synchronized (m_serverSocket)
{
m_iThreadCounter++;
} // synchronized
} // SocketWriter
public final void run()
{
try
{
long startTime = System.currentTimeMillis();
ByteBuffer buffer = ByteBuffer.allocate(m_iBufferSize + HEADER_SIZE);
int iTotalBytesRead = 0;
int iNextMessageSize = 512 * m_iBufferSize;
int iBytesRead;
// open input stream for file to read and send
FileInputStream fileInputStream = new FileInputStream(INPUT_FILE);
System.out.println("==> "+m_id+": file input stream obtained");
// loop to read complete file
while (-1 != (iBytesRead = fileInputStream.read(buffer.array(), HEADER_SIZE, m_iBufferSize)))
{
// add length and id to buffer and write over TCP
buffer.putInt(0, iBytesRead);
buffer.putInt(LENGTH_SIZE, m_iId);
m_os.write(buffer.array(), 0, HEADER_SIZE + iBytesRead);
// maintain statistics and print message if so desired
iTotalBytesRead += iBytesRead;
if (iNextMessageSize <= iTotalBytesRead)
{
System.out.println("==> "+m_id+": <"+iTotalBytesRead+"> bytes processed");
iNextMessageSize += MESSAGE_SIZE;
} // if
} // while
// close my file input stream
fileInputStream.close();
System.out.println("==> "+m_id+": file input stream closed");
System.out.println("==> "+m_id+": <"+ iTotalBytesRead + "> bytes written in "
+ (System.currentTimeMillis() - startTime) + " ms.");
// decrement thread counter
synchronized (m_serverSocket)
{
m_iThreadCounter--;
// last thread?
if (0 >= m_iThreadCounter)
// that's the case => terminate
System.exit(0);
} // synchronized
} // try
catch (Exception e)
{
e.printStackTrace();
} // catch
} // run
} // SocketWriter
} // TCPTest