6

私は、(ソケットに属する) OutputStream にかなり大きなデータのチャンクを書き込むアプリケーションを作成しています。これを少し複雑にしているのは、通常複数のスレッドが同じ OutputStream に書き込もうとしているということです。現在、データが書き込まれている OutputStream が独自のスレッドになるように設計しています。スレッドには、バイト配列をポーリングしてできるだけ早く書き込むキュー (LinkedList) が含まれています。

private class OutputStreamWriter implements Runnable {

    private final LinkedList<byte[]> chunkQueue = new LinkedList<byte[]>();

    public void run() {
        OutputStream outputStream = User.this.outputStream;
        while (true) {
            try {
                if (chunkQueue.isEmpty()) {
                    Thread.sleep(100);
                    continue;
                }
                outputStream.write(chunkQueue.poll());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

この設計の問題点は、より多くの書き込みが発生するにつれて、ますます多くのデータがキューに入れられ、それ以上高速に書き込まれないことです。最初に、データがキューに入れられると、実際にはすぐに書き込まれます。その後、約 15 秒後にデータが遅れ始めます。データがキューに入れられてから実際に書き込まれるまでに遅延が発生します。時間が経つにつれて、この遅延はますます長くなります。とても目立ちます。

これを修正する方法は、ブロックせずにデータを送信できるようにするある種の ConcurrentOutputStream 実装であるため、書き込みがバックアップされ始めません (その場合、キューは不要になります)。そのような実装が存在するかどうかはわかりません -- 私は見つけることができませんでした -- 個人的には、それを書くことさえ不可能だと思います。

それで、これを再設計する方法について何か提案はありますか?

4

3 に答える 3

4

ソケットのスループットは制限されています。データ生成のスループットよりも遅い場合は、データをバッファリングする必要があり、それを回避する方法はありません。「同時に」と書いてもまったく役に立ちません。

メモリ消費を減らすために、キューに入れられたデータが特定の制限を超えたときにデータ生成を一時停止することを検討できます。

于 2012-11-13T05:27:15.430 に答える
0

@irreputableに同意します。並行書き込みは、少しでも役に立たないということです。代わりに、生産側、つまりすでに持っているものを見る必要があります。

  1. LinkedListの代わりにBlockingQueueを使用します。

  2. 定義上、平均して時間の50%を浪費する、100mslのブラインドスリープではなく、キューのブロッキングポーリング操作を使用します。長期間にわたって、それは実際に合計される可能性があります。

于 2012-11-13T07:46:40.520 に答える
0

DB接続をできるだけ早く閉じる必要がある遅い接続をインターセプトするフィルターが必要だったので、最初はJavaパイプを使用しましたが、それらの実装をよく見るとすべて同期されているため、小さなバッファーとブロッキングキューを使用して独自のQueueInputStreamを作成することになりました一度バッファをキューに入れると、一杯になりました。LinkedBlockingQueue で使用されるロック条件の場合を除いて、ロックはありません。小さなバッファの助けを借りて安価にする必要があります。このクラスは、単一のプロデューサとコンシューマにのみ使用することを目的としています。インスタンスごとに、ExecutorService を渡して、キューに入れられたバイトを最終的な OutputStream にストリーミングし始める必要があります。

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }
于 2013-10-18T17:50:10.457 に答える