8

私は、他のストリームをラップし、バックグラウンド スレッドで先読みするバックグラウンドInputStream(およびOutputStream) 実装を作成しました。これにより、主に、解凍されたストリームの処理とは異なるスレッドで解凍/圧縮が行われます。

これはかなり標準的な生産者/消費者モデルです。

これは、データの読み取り、処理、および書き込みを行う単純なプロセスでマルチコア CPU を有効に活用する簡単な方法のように思われ、CPU とディスク リソースの両方をより効率的に使用できます。ZipInputStreamおそらく「効率的」という言葉は最適な言葉ではありませんが、 a から直接読み取り、 aに直接書き込む場合と比較して、使用率が高くなり、私にとってより興味深いのは実行時間を短縮することZipOutputStreamです。

コードを投稿できてうれしいですが、私の質問は、既存の (そしてより頻繁に実行されている) ライブラリですぐに利用できるものを再発明するかどうかです。

編集 - コードを投稿...

のコードをBackgroundInputStream以下に示します (BackgroundOutputStream非常によく似ています) が、改善したい点があります。

  1. バッファを前後に渡すのに一生懸命働いているようです。
  2. 呼び出し元のコードが への参照を破棄するとBackgroundInputStream、 はbackgroundReaderThread永久に残ります。
  3. シグナリングeofの改善が必要です。
  4. 例外はフォアグラウンド スレッドに伝播する必要があります。
  5. 提供された からのスレッドの使用を許可したいと思いますExecutor
  6. メソッドはバックグラウンド スレッドに通知するclose()必要があり、ラップされたストリームはそれから読み取るバックグラウンド スレッドによって所有されている必要があるため、ラップされたストリームを閉じるべきではありません。
  7. 閉店後に読書などのばかげたことをすることは、適切に対応する必要があります。

package nz.co.datacute.io;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

public class BackgroundInputStream extends InputStream {
    private static final int DEFAULT_QUEUE_SIZE = 1;
    private static final int DEFAULT_BUFFER_SIZE = 64*1024;
    private final int queueSize;
    private final int bufferSize;
    private volatile boolean eof = false;
    private LinkedBlockingQueue<byte[]> bufferQueue;
    private final InputStream wrappedInputStream;
    private byte[] currentBuffer;
    private volatile byte[] freeBuffer;
    private int pos;

    public BackgroundInputStream(InputStream wrappedInputStream) {
        this(wrappedInputStream, DEFAULT_QUEUE_SIZE, DEFAULT_BUFFER_SIZE);
    }

    public BackgroundInputStream(InputStream wrappedInputStream,int queueSize,int bufferSize) {
        this.wrappedInputStream = wrappedInputStream;
        this.queueSize = queueSize;
        this.bufferSize = bufferSize;
    }

    @Override
    public int read() throws IOException {
        if (bufferQueue == null) {
            bufferQueue = new LinkedBlockingQueue<byte[]>(queueSize);
            BackgroundReader backgroundReader = new BackgroundReader();
            Thread backgroundReaderThread = new Thread(backgroundReader, "Background InputStream");
            backgroundReaderThread.start();
        }
        if (currentBuffer == null) {
            try {
                if ((!eof) || (bufferQueue.size() > 0)) {
                    currentBuffer = bufferQueue.take();
                    pos = 0;
                } else {
                    return -1;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        int b = currentBuffer[pos++];
        if (pos == currentBuffer.length) {
            freeBuffer = currentBuffer;
            currentBuffer = null;
        }
        return b;
    }

    @Override
    public int available() throws IOException {
        if (currentBuffer == null) return 0;
        return currentBuffer.length;
    }

    @Override
    public void close() throws IOException {
        wrappedInputStream.close();
        currentBuffer = null;
        freeBuffer = null;
    }

    class BackgroundReader implements Runnable {

        @Override
        public void run() {
            try {
                while (!eof) {
                    byte[] newBuffer;
                    if (freeBuffer != null) {
                        newBuffer = freeBuffer;
                        freeBuffer = null;
                    } else {
                        newBuffer = new byte[bufferSize];
                    }
                    int bytesRead = 0;
                    int writtenToBuffer = 0;
                    while (((bytesRead = wrappedInputStream.read(newBuffer, writtenToBuffer, bufferSize - writtenToBuffer)) != -1) && (writtenToBuffer < bufferSize)) {
                        writtenToBuffer += bytesRead;
                    }
                    if (writtenToBuffer > 0) {
                        if (writtenToBuffer < bufferSize) {
                            newBuffer = Arrays.copyOf(newBuffer, writtenToBuffer);
                        }
                        bufferQueue.put(newBuffer);
                    }
                    if (bytesRead == -1) {
                        eof = true;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}
4

2 に答える 2

3

興味深いですね。箱から出してすぐにこれを行うものに出くわしたことはありませんが、利用可能な場合は、アイドルコアを圧縮に使用することは完全に理にかなっています。

おそらく、Commons I/Oを利用できます。これは、よくテストされたライブラリであり、より退屈なものを処理するのに役立ち、クールな並列部分の拡張に集中することができます。あなたのコードを Commons プロジェクトに貢献することもできるかもしれません ;-)

于 2010-01-28T16:50:10.013 に答える
0

興味があります。同様のプロジェクトを考えてみましたが、圧縮が正常に終了しない部分を処理する方法がわかりませんでした。

于 2010-02-02T04:13:16.933 に答える