3

関連する質問を見つけましたが、完全な例が提供されていないため、特に役に立ちませんでした。

問題: AsynchronousSocketChannelを使用して、固定サイズのバッファーを使用して長さが不明なデータを読み取る方法

最初の試行 (1 回読み取り):

final int bufferSize = 1024;
final SocketAddress address = /*ip:port*/;
final ThreadFactory threadFactory = Executors.defaultThreadFactory();
final ExecutorService executor = Executors.newCachedThreadPool(threadFactory);
final AsynchronousChannelGroup asyncChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(executor, 5);
final AsynchronousSocketChannel client = AsynchronousSocketChannel.open(asyncChannelGroup);
client.connect(address).get(5, TimeUnit.SECONDS);//block until the connection is established

//write the request
Integer bytesWritten = client.write(StandardCharsets.US_ASCII.encode("a custom request in a binary format")).get();

//read the response
final ByteBuffer readTo = ByteBuffer.allocate(bufferSize);
final StringBuilder responseBuilder = new StringBuilder();
client.read(readTo, readTo, new CompletionHandler<Integer, ByteBuffer>() {
        public void completed(Integer bytesRead, ByteBuffer buffer) {
            buffer.flip();
            responseBuilder.append(StandardCharsets.US_ASCII.decode(buffer).toString());
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
        public void failed(Throwable exc, ByteBuffer attachment) {
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
);
asyncChannelGroup.awaitTermination(5, TimeUnit.SECONDS);
asyncChannelGroup.shutdown();
System.out.println(responseBuilder.toString());

while (つまり、ストリームの終わりに達した)バッファへの継続的な読み取りをきれいに実装するには、どのような変更を加える必要がありますか?bytesRead != -1

4

4 に答える 4

1

これが私がやったことの簡略化されたバージョンです( GuavaListenableFutureを使用):

class SomeUtilClass {
public interface Processor<T> {
    boolean process(Integer byteCount, ByteBuffer buffer);
    T result();
}
public static <T> ListenableFuture<T> read(
    final AsynchronousSocketChannel delegate,
    final Processor<T> processor,
    ByteBuffer buffer
) {
    final SettableFuture<T> resultFuture = SettableFuture.create();
    delegate.read(buffer, buffer, new Handler<T, Integer, ByteBuffer>(resultFuture) {
        public void completed(Integer bytesRead, ByteBuffer buffer) {
            buffer.flip();
            if(processor.process(bytesRead, buffer)) {
                buffer.clear();
                delegate.read(buffer, buffer, this);
            } else {
                resultFuture.set(processor.result());
            }
        }
    });
    return resultFuture;
}
}

Commons PoolByteBufferの使用を含むさらなる改善

于 2012-11-14T20:13:58.433 に答える
0

私の最初の試み:

package com.example;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

public class LoopingReader implements Callable<String> {
    final AsynchronousSocketChannel client;
    final String responseTerminator;
    final StringBuilder responseBuilder;

    LoopingReader(
        AsynchronousSocketChannel client,
        String responseTerminator
    ) {
        this.client = client;
        this.responseTerminator = responseTerminator;

        responseBuilder = new StringBuilder();
    }

    public String call() {
        boolean doLoop;
        do {
            int bytesRead = executeIteration();//blocking
            boolean didReachEndOfStream = bytesRead == -1;
            boolean didEncounterResponseTerminator = responseBuilder.indexOf(responseTerminator) != -1;

            doLoop =  !didReachEndOfStream && !didEncounterResponseTerminator;
        } while(doLoop);
        return responseBuilder.toString();
    }

    int executeIteration() {
        final ByteBuffer buffer = ByteBuffer.allocate(256);//use pool here
        final int bytesRead;
        try {
            bytesRead = client.read(buffer).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Failed to read", e);
        }
        decodeAndAppend(buffer);
        return bytesRead;
    }

    void decodeAndAppend(ByteBuffer buffer) {
        buffer.flip();
        responseBuilder.append(StandardCharsets.US_ASCII.decode(buffer));
    }
}
于 2012-11-14T00:53:04.473 に答える
0

コールバック メソッドから延長されたものは何もしません。failedまたcompleted、それらはあなたの制御下にないスレッドで実行されます。

ストリームが最後 ( ) に達した場合でも、ソケット内の新しいバイトをリッスンし続けたいとのご要望を承りましたbytesRead == -1readメソッドをwhile(true)ループに入れます。その中で、およびメソッドsynchronizedによって設定されたフィールドをリッスンします。と呼びましょう。failedcompletedmyBytesRead

エンドレス読み取りを停止できるようにするには、while(true)を別のsynchronized条件に置き換えます。

private static final BYTES_READ_INIT_VALUE = Integer.MIN_VALUE;
private static final BYTES_READ_COMPLETED_VALUE = -1;
private static final BYTES_READ_FAILED_VALUE = -2;
private Integer myBytesRead = BYTES_READ_INIT_VALUE;

private void setMyBytesRead(final Integer bytesRead) {
    synchronized(myBytesRead) {
        this.myBytesRead = bytesRead;
    }
}

private Integer getMyBytesRead() {
    synchronized(myBytesRead) {
        return myBytesRead;
    }
}

...

// in your method
while (true) {
    final int lastBytesRead = getMyBytesRead();
    if (lastBytesRead == BYTES_READ_FAILED_VALUE) {
        // log failure and retry?
    } else if (lastBytesRead != BYTES_READ_COMPLETED_VALUE) {
        // Thread.sleep( a while ); to avoid exhausting CPU
        continue;
    }

    // else lastBytesRead == BYTES_READ_COMPLETED_VALUE and you can start a new read operation
    client.read(readTo, readTo, new CompletionHandler<Integer, ByteBuffer>() {
            public void completed(Integer bytesRead, ByteBuffer buffer) {
                setMyBytesRead(bytesRead);
                buffer.flip();
                responseBuilder.append(StandardCharsets.US_ASCII.decode(buffer).toString());
                try {
                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            public void failed(Throwable exc, ByteBuffer attachment) {
                try {
                    setMyBytesRead(BYTES_READ_FAILED_VALUE);
                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    );
}
于 2012-11-10T23:12:37.063 に答える
0

私の考えでは、最も簡単な方法は、このコードを独自のメソッドに分割し、bytesRead != -1. そうすれば、コードの責任を分離し、非同期読み取りの実行中に「ビジー待機」または Thread.sleep() を使用する必要を回避できます。bytesRead == -1もちろん、読み込まれたデータを使用して何かを行うケースを追加することもできます。

于 2012-11-10T22:35:34.923 に答える