Javaには複数の継承がなく、両方のストリームがインターフェースではなく抽象クラスであるため、JavaでInputStreamとOutputStreamの両方であるPipeオブジェクトを作成するための良い提案はありますか?
根底にある必要性は、あるスレッドから別のスレッドへの入力に出力をパイプするために InputStream または OutputStream のいずれかを必要とするものに渡すことができる単一のオブジェクトを持つことです。
Javaには複数の継承がなく、両方のストリームがインターフェースではなく抽象クラスであるため、JavaでInputStreamとOutputStreamの両方であるPipeオブジェクトを作成するための良い提案はありますか?
根底にある必要性は、あるスレッドから別のスレッドへの入力に出力をパイプするために InputStream または OutputStream のいずれかを必要とするものに渡すことができる単一のオブジェクトを持つことです。
この質問のポイントが見落とされているようです。私の理解が正しければ、2 つのスレッド間の通信手段を作成するために、あるスレッドでは InputStream のように機能し、別のスレッドでは OutputStream のように機能するオブジェクトが必要です。
おそらく 1 つの答えは、継承の代わりに構成を使用することです (これはとにかく推奨される方法です)。getInputStream() および getOutputStream() メソッドを使用して、相互に接続された PipedInputStream および PipedOutputStream を含む Pipe を作成します。
ストリームを必要とするものに Pipe オブジェクトを直接渡すことはできませんが、その get メソッドの戻り値を渡すことはできます。
それはあなたのために働きますか?
java.io.PipedOutputStream と java.io.PipedInputStream は、このシナリオで使用するクラスのようです。これらは、スレッド間でデータをパイプするために一緒に使用するように設計されています。
単一のオブジェクトを本当に渡したい場合は、これらのそれぞれの 1 つを含み、getter を介してそれらを公開する必要があります。
これはかなり一般的なことだと思います。この質問を参照してください。
InputStream
との両方から派生するクラスを作成することはできませんOutputStream
。これらはインターフェイスではなく、共通のメソッドがあり、Javaでは多重継承が許可されていないためです(コンパイラは呼び出すかどうか、または新しいオブジェクトを呼び出すInputStream.close()
かどうかを認識しません) 。OutputStream.close()
close()
もう1つの問題はバッファです。Javaは、データに静的バッファーを割り当てたいと考えています(これは変更されません)。つまり、 `java.io.PipedXxxStream'を使用すると、2つの異なるスレッドを使用しない限り、データの書き込みは最終的にブロックされます。
したがって、Apocalispからの答えは正しいです。コピーループを作成する必要があります。
プロジェクトにApacheのcommons-ioを含めることをお勧めします。このプロジェクトには、このようなタスク(ストリーム、ファイル、文字列、およびそれらのすべての組み合わせの間でデータをコピーする)専用の多くのヘルパールーチンが含まれています。
サーブレットへの接続が遅いためのフィルターを実装する必要があったため、基本的にサーブレット出力ストリームを QueueOutputStream にラップし、(小さなバッファー内の) すべてのバイトをキューに追加してから、それらの小さなバッファーを 2 番目の出力ストリームに出力します。これはある意味で入出力ストリームとして機能します。私見ですが、これはそれほどスケールしない JDK パイプよりも優れています。基本的に、標準の JDK 実装では (読み取り/書き込みごとに) コンテキストの切り替えが多すぎます。ブロッキング キューは単に単一のプロデューサー/コンシューマー シナリオに最適:
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;
public class QueueOutputStream extends OutputStream
{
private static final int DEFAULT_BUFFER_SIZE=1024;
private static final byte[] END_SIGNAL=new byte[]{};
private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
private final byte[] buffer;
private boolean closed=false;
private int count=0;
public QueueOutputStream()
{
this(DEFAULT_BUFFER_SIZE);
}
public QueueOutputStream(final int bufferSize)
{
if(bufferSize<=0){
throw new IllegalArgumentException("Buffer size <= 0");
}
this.buffer=new byte[bufferSize];
}
private synchronized void flushBuffer()
{
if(count>0){
final byte[] copy=new byte[count];
System.arraycopy(buffer,0,copy,0,count);
queue.offer(copy);
count=0;
}
}
@Override
public synchronized void write(final int b) throws IOException
{
if(closed){
throw new IllegalStateException("Stream is closed");
}
if(count>=buffer.length){
flushBuffer();
}
buffer[count++]=(byte)b;
}
@Override
public synchronized void write(final byte[] b, final int off, final int len) throws IOException
{
super.write(b,off,len);
}
@Override
public synchronized void close() throws IOException
{
flushBuffer();
queue.offer(END_SIGNAL);
closed=true;
}
public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
{
return executor.submit(
new Callable<Void>()
{
@Override
public Void call() throws Exception
{
try{
byte[] buffer=queue.take();
while(buffer!=END_SIGNAL){
outputStream.write(buffer);
buffer=queue.take();
}
outputStream.flush();
} catch(Exception e){
close();
throw e;
} finally{
outputStream.close();
}
return null;
}
}
);
}
Pipe または ArrayBlockingQueue を使用することをお勧めします。PipedInput/OutputStream を使用しないことをお勧めします.
https://bugs.openjdk.java.net/browse/JDK-8223048
BlockingQueue と Pipe の簡単な例を次に示します。
パイプ:
Pipe pipe = Pipe.open();
Pipe.SinkChannel sinkChannel = pipe.sink();
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
sinkChannel.write(buf);
}
Pipe.SourceChannel sourceChannel = pipe.source();
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);
参照: http://tutorials.jenkov.com/java-nio/pipe.html
ブロッキング キュー:
//Shared class used by threads
public class Buffer {
// ArrayBlockingQueue
private BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(1);
public void get() {
// retrieve from ArrayBlockingQueue
try {
System.out.println("Consumer received - " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void put(int data) {
try {
// putting in ArrayBlockingQueue
blockingQueue.put(data);
System.out.println("Producer produced - " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
// Starting two threads
ExecutorService executorService = null;
try {
Buffer buffer = new Buffer();
executorService = Executors.newFixedThreadPool(2);
executorService.execute(new Producer(buffer));
executorService.execute(new Consumer(buffer));
} catch (Exception e) {
e.printStackTrace();
}finally {
if(executorService != null) {
executorService.shutdown();
}
}
}
public class Consumer implements Runnable {
private Buffer buffer;
public Consumer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
while (true) {
try {
buffer.get();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Producer implements Runnable {
private Buffer buffer;
public Producer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
while (true) {
Random random = new Random();
int data = random.nextInt(1000);
buffer.put(data);
}
}
}
参考: https ://github.com/kishanjavatrainer/ArrayBlockingQueueDemo/tree/master/ArrayBlockingQueueDemo