62

私は最近、このイディオムを発見しました。何か足りないものはないかと考えています。使っているのを見たことがない。私が実際に扱ったほぼすべての Java コードは、次の例のようなものではなく、データを文字列またはバッファーに丸呑みすることを好みます (たとえば、HttpClient および XML API を使用)。

    final LSOutput output; // XML stuff initialized elsewhere
    final LSSerializer serializer;
    final Document doc;
    // ...
    PostMethod post; // HttpClient post request
    final PipedOutputStream source = new PipedOutputStream();
    PipedInputStream sink = new PipedInputStream(source);
    // ...
    executor.execute(new Runnable() {
            public void run() {
                output.setByteStream(source);
                serializer.write(doc, output);
                try {
                    source.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }});

    post.setRequestEntity(new InputStreamRequestEntity(sink));
    int status = httpClient.executeMethod(post);

このコードは、Unix パイピング スタイルの手法を使用して、XML データの複数のコピーがメモリに保持されるのを防ぎます。HTTP Post 出力ストリームと DOM Load/Save API を使用して、XML ドキュメントを HTTP リクエストのコンテンツとしてシリアライズします。私が知る限り、余分なコードをほとんど使わずにメモリの使用を最小限に抑えます ( RunnablePipedInputStream、およびの数行のみPipedOutputStream)。

では、この慣用句の何が問題になっているのでしょうか。このイディオムに問題がなければ、なぜ私はそれを見なかったのですか?

編集:どこにでも表示されるボイラープレートのバッファーごとのコピーを明確にPipedInputStreamし、置き換えます。また、処理されたデータを書き出すと同時に受信データを処理することもできます。PipedOutputStreamOS パイプを使用しません。

4

9 に答える 9

56

Javadocから:

通常、データは 1 つのスレッドによって PipedInputStream オブジェクトから読み取られ、データは別のスレッドによって対応する PipedOutputStream に書き込まれます。スレッドがデッドロックする可能性があるため、1 つのスレッドから両方のオブジェクトを使用しようとすることはお勧めしません。

これは、より一般的に使用されない理由を部分的に説明するかもしれません.

別の理由は、多くの開発者がその目的/利点を理解していないことだと思います.

于 2009-01-27T17:06:17.367 に答える
7

私も最近 PipedInputStream/PipedOutputStream クラスを発見しただけです。

SSH 経由でリモート サーバー上でコマンドを実行する必要がある Eclipse プラグインを開発しています。私はJSchを使用しており、Channel API は入力ストリームから読み取り、出力ストリームに書き込みます。しかし、入力ストリームを介してコマンドをフィードし、出力ストリームから応答を読み取る必要があります。それが PipedInput/OutputStream の出番です。

import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import com.jcraft.jsch.Channel;

Channel channel;
PipedInputStream channelInputStream = new PipedInputStream();
PipedOutputStream channelOutputStream = new PipedOutputStream();

channel.setInputStream(new PipedInputStream(this.channelOutputStream));
channel.setOutputStream(new PipedOutputStream(this.channelInputStream));
channel.connect();

// Write to channelInputStream
// Read from channelInputStream

channel.disconnect();
于 2009-01-27T17:48:44.923 に答える
7

あなたの例では、2 つのスレッドを作成して、1 つのスレッドで実行できる作業を実行しています。そして、ミックスに I/O 遅延を導入します。

より良い例はありますか?それとも、あなたの質問に答えただけですか。


コメントの一部 (少なくとも私の見解) をメインの応答に取り込むには:

  • 並行性により、アプリケーションが複雑になります。単一の直線的なデータ フローを処理する代わりに、独立したデータ フローの順序付けを考慮する必要があります。場合によっては、特に複数のコア/CPU を活用して CPU を集中的に使用する作業を実行できる場合は、複雑さを増すことが正当化されることがあります。
  • 同時操作の恩恵を受けることができる状況にある場合は、通常、スレッド間のデータの流れを調整するためのより良い方法があります。たとえば、パイプされたストリームをオブジェクト ストリームにラップするのではなく、同時キューを使用してスレッド間でオブジェクトを渡します。
  • パイプ ストリームが良い解決策になるのは、Unix パイプライン (例: grep | sort) のようなテキスト処理を実行する複数のスレッドがある場合です。

特定の例では、パイプされたストリームにより、HttpClient によって提供される既存の RequestEntity 実装クラスを使用できます。より良い解決策は、以下のように新しい実装クラスを作成することだと思います。なぜなら、この例は最終的には逐次操作であり、同時実装の複雑さとオーバーヘッドの恩恵を受けられないからです。RequestEntity を匿名クラスとして示していますが、再利用可能であることは、それがファーストクラスのクラスであることを示しています。

post.setRequestEntity(new RequestEntity()
{
    public long getContentLength()
    {
        return 0-1;
    }

    public String getContentType()
    {
        return "text/xml";
    }

    public boolean isRepeatable()
    {
        return false;
    }

    public void writeRequest(OutputStream out) throws IOException
    {
        output.setByteStream(out);
        serializer.write(doc, output);
    }
});
于 2009-01-27T16:48:13.503 に答える
4

また、元の例に戻ります。いいえ、メモリ使用量を正確に最小化するわけでもありません。DOMツリーが構築され、メモリ内のバッファリングが実行されます。これは、フルバイト配列のレプリカよりも優れていますが、それほど優れているわけではありません。ただし、この場合のバッファリングは遅くなります。また、追加のスレッドも作成されます。単一のスレッド内からPipedInput/OutputStreamのペアを使用することはできません。

PipedXxxStreamsが役立つ場合もありますが、それ以上使用されない理由は、多くの場合、適切なソリューションではないためです。それらはスレッド間通信に問題はなく、それが価値のあるもののために私がそれらを使用した場所です。SOAがそのような境界のほとんどをスレッド間ではなくサービス間でプッシュする方法を考えると、これのユースケースはそれほど多くないというだけです。

于 2009-01-27T20:50:59.317 に答える
3

パイプが理にかなっている使用例を次に示します。

doSomething(inputStream, outputStream) のようなインターフェースを持つ xslt マッパーや暗号 lib などのサードパーティの lib があるとします。また、ネットワーク経由で送信する前に結果をバッファリングしたくありません。Apache およびその他のクライアントは、ワイヤ出力ストリームへの直接アクセスを許可しません。あなたが得ることができる最も近いものは、リクエストエンティティオブジェクトで、ヘッダーが書き込まれた後のオフセットで、出力ストリームを取得することです。しかし、これは内部にあるため、入力ストリームと出力ストリームをサードパーティのライブラリに渡すだけではまだ十分ではありません。パイプは、この問題の良い解決策です。

ちなみに、 Apache Commons HTTP Client 4.3.4 を使用して、HTTP POST 用の OutputStream インターフェイスを提供するApache の HTTP クライアント API [PipedApacheClientOutputStream]の反転を作成しました。これは、Piped Streams が理にかなっている例です。

于 2016-02-11T01:21:33.197 に答える
2

しばらく前にこれらのクラスを使用してみましたが、詳細を忘れてしまいました。しかし、それらの実装には致命的な欠陥があることがわかりました。私はそれが何であったか思い出せませんが、時折デッドロックしたことを意味する競合状態だった可能性があるというこっそりした記憶があります (もちろん、私はそれらを別々のスレッドで使用していました:シングルスレッドであり、そのように設計されていません)。

私は彼らのソースコードを見て、何が問題だったのかを確認できるかもしれません.

于 2009-01-27T20:27:45.947 に答える
1

java.io パイプのコンテキスト切り替え (バイトごとの読み取り/書き込み) が多すぎます。対応する java.nio では、NIO のバックグラウンドとチャネルなどの適切な使用法が必要です。これは、ブロッキング キューを使用したパイプの私自身の実装です。単一のプロデューサー/コンシューマーは、高速に実行され、適切にスケーリングされます。

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }
于 2013-10-19T12:28:00.357 に答える
0

では、この慣用句の何が問題になっているのでしょうか。このイディオムに問題がなければ、なぜ私はそれを見なかったのですか?

編集: 明確にするために、PipedInputStream と PipedOutputStream は、どこにでも表示されるボイラープレート バッファーごとのコピーを置き換えます。また、処理されたデータを書き出すと同時に受信データを処理することもできます。OS パイプを使用しません。

あなたはそれが何をするかを述べましたが、なぜこれをしているのかは述べていません。

これにより、使用されるリソース (CPU/メモリ) が削減されるか、パフォーマンスが向上すると思われる場合は、どちらにもなりません。ただし、コードがより複雑になります。

基本的に、解決する問題のない解決策があります。

于 2009-01-27T20:02:27.183 に答える
0

PipedInputStream と PipeOutputStream は、反対側が満杯または空のバッファーから読み書きするのを待ってブロックしているときはいつでも、そのスレッドを1 秒間スリープさせます。使用禁止。

于 2020-04-10T12:32:28.303 に答える