コースワーク プロジェクトの一環として、Selective Repeat を使用して Java の基本的な UDP プロトコルに信頼性のレイヤーを実装しようとしています: http://en.wikipedia.org/wiki/Selective_Repeat_ARQ。基本的に、各パケットは、送信されると、別のスレッドで独自のタイマーを追跡します。特定のタイマーが切れると、パケットが再送されます。
比較的大きなタイムアウト設定 (500ms など) の場合、このコードは正常に実行され、大きなファイルは完全に受信側に送信されます。ただし、タイムアウトを低く設定すると (例: 20ms)、端末に次のようなスパム メッセージが表示されます。
java.nio.channels.ClosedChannelException
at sun.nio.ch.DatagramChannelImpl.ensureOpen(DatagramChannelImpl.java:132)
at sun.nio.ch.DatagramChannelImpl.send(DatagramChannelImpl.java:241)
at Sender4.sendPak(Sender4.java:118)
at Sender4.access$000(Sender4.java:8)
at Sender4$packetTimer.run(Sender4.java:135)
しかし、私が見る限り、チャンネルは閉鎖されていません。この例外のドキュメントには次のように記載されています。
チャネルが閉じられているか、少なくともその操作に対して閉じられているチャネルで I/O 操作を呼び出しまたは完了しようとしたときにスローされるチェック例外です。この例外がスローされても、チャネルが完全に閉じられているとは限りません。たとえば、書き込み側がシャットダウンされたソケット チャネルは、まだ読み取り用に開いている可能性があります。
なんらかの理由で利用できないため、おそらく閉鎖されていると思います。タイムアウト値が小さい場合にのみ発生するため、2 つのスレッドが同時に再送信しようとしている可能性がありますか? ただし、送信方法(sendPak)は同期されているため、これは不可能なはずです。
この問題の原因は何ですか? または、この問題を回避するために使用できる修正は何ですか? これが私のプログラムの送信側部分のコードです。受信側は問題ないと確信しています。
/* Craig Innes 0929508 */
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.nio.channels.*;
public class Sender4 {
short base = 0;
short nextSeqNum = 0;
byte[][] packets;
ByteBuffer bb;
String endSys;
int portNum;
String fileName;
int retryTime;
int windowSize;
DatagramSocket clientSocket;
InetAddress IPAddress;
InetSocketAddress destination;
boolean timedOut = false;
int resends = 0;
HashMap<Short, packetTimer> timers = new HashMap<Short, packetTimer>();
DatagramChannel clientChannel;
public Sender4(String endSys, int portNum, String fileName, int retryTime, int windowSize){
this.endSys = endSys;
this.portNum = portNum;
this.fileName = fileName;
this.retryTime = retryTime;
this.windowSize = windowSize;
}
public static void main(String args[]) throws Exception{
//Check for current arguments and assign them
if(args.length != 5){
System.out.println("Invalid number of arguments. Please specify: <endSystem> <portNumber> <fileName> <retryTimeout><windowSize>");
System.exit(1);
}
Sender4 sendy = new Sender4(args[0], Integer.parseInt(args[1]), args[2], Integer.parseInt(args[3]), Integer.parseInt(args[4]));
sendy.go();
}
private void go() throws Exception{
clientChannel = DatagramChannel.open();
clientChannel.configureBlocking(false);
bb = ByteBuffer.allocate(2);
byte[] picData = new byte[1021];
byte[] sendData = new byte[1024];
byte[] seqBytes = new byte[2];
byte EOFFlag = 0;
boolean acknowledged = false;
int resends = 0;
IPAddress = InetAddress.getByName(endSys);
destination = new InetSocketAddress(IPAddress, portNum);
FileInputStream imReader = new FileInputStream(new File(fileName));
double fileSizeKb = imReader.available() / 1021.0; //We add 3 bytes to every packet, so dividing by 1021 will give us total kb sent.
int packetsNeeded = (int) Math.ceil(fileSizeKb);
packets = new byte[packetsNeeded][];
long startTime = System.currentTimeMillis();
long endTime;
double throughput;
//Create array of packets to send
for(int i = 0; i < packets.length; i++){
if(i == packets.length - 1){
EOFFlag = 1;
picData = new byte[imReader.available()];
sendData = new byte[picData.length + 3];
}
imReader.read(picData);
bb.putShort((short)i);
bb.flip();
seqBytes = bb.array();
bb.clear();
System.arraycopy(seqBytes, 0, sendData, 0, seqBytes.length);
sendData[2] = EOFFlag;
System.arraycopy(picData, 0, sendData, 3, picData.length);
packets[i] = (byte[])sendData.clone();
}
//System.out.println("timeout is: " + timedOut + " base is: " + base + " packet length is: " + packets.length + " nextSeqNum: " + nextSeqNum);
while(base != packets.length || !timers.isEmpty()){
while(nextSeqNum - base < windowSize && nextSeqNum < packets.length){
System.out.println("sending packet with seqNum: " + nextSeqNum);
sendPak(nextSeqNum);
timers.put(nextSeqNum, new packetTimer(nextSeqNum));
timers.get(nextSeqNum).start();
System.out.println("nextSeq: " + nextSeqNum + "base " + base + "windowSize " + windowSize + "timer size" + timers.size());
nextSeqNum++;
}
//Done all the sending we can, have a check for any ACKs we have received...
getACK();
}
endTime = System.currentTimeMillis();
throughput = 1000 * fileSizeKb / (endTime - startTime);
clientChannel.close();
imReader.close();
System.out.println("Number of retransmissions: " + resends);
System.out.println("Average throughput is: " + throughput + "Kb/s");
}
private synchronized void sendPak(short resNum) throws IOException{
//System.out.println("Timed out waiting for acknowledgement, resending all unACKed packets in window");
ByteBuffer sendBuff = ByteBuffer.wrap(packets[resNum]);
clientChannel.send(sendBuff, destination);
sendBuff.clear();
}
private class packetTimer extends Thread{
short sendingNum;
boolean timeToStop = false;
boolean fileACKed = false;
public packetTimer(short seqNum){
sendingNum = seqNum;
}
public void run() {
//If packet times out - resend. If thread interrupted, we have received the corresponding ack
while(waitForACK()){
System.out.println("Packet timed out. Resending packet: " + sendingNum);
try{
sendPak(sendingNum);
}catch(IOException ex){
System.out.println("I think this is causing the problems");
ex.printStackTrace();
}
}
System.out.println("Thread" + sendingNum + "has reached completion");
}
private boolean waitForACK(){
if(this.interrupted()){
return false;
}
try{
Thread.sleep(retryTime);
}catch(InterruptedException ex){
return false;
}
return true;
}
}
private synchronized void getACK() throws Exception{
//Listen out for ACKs and update pointers accordingly
ByteBuffer ackBuff;
byte[] ackData = new byte[2];
ackBuff = ByteBuffer.wrap(ackData);
SocketAddress recked = clientChannel.receive(ackBuff);
if(recked != null){ //Only if it actually receives anything, check for nullity
//System.out.println("ACK buff size: " + ackBuff.capacity() + "Current position: " + ackBuff.position() + "remaining: " + ackBuff.remaining());
ackBuff.flip();
short ack = ackBuff.getShort();
System.out.println("ack received: " + ack);
ackBuff.clear();
if(timers.containsKey(ack)){ //Stop Timer
System.out.println("Interrupting timer: " + ack);
timers.get(ack).interrupt();
timers.get(ack).fileACKed = true;
}
if(base == ack){ //If you receive ack for the base, remove all the consecutively stopped timers
while(timers.containsKey(base) && timers.get(base).fileACKed){
System.out.println("Removing: " + base);
timers.remove(base);
base++;
}
}
//System.out.println("acknowledgement for base num: " + base + "ack num:" + ack);
}
System.out.println("Waiting for base: " + base + "packets length is " + packets.length + "timers size is: " + timers.size() + "but is it empty? " + timers.isEmpty());
Thread.yield();
}
}