1

バイト配列の固定長配列を使用して、ブロッキング キューのカスタム実装を実行しようとしています。ポーリングされた要素を削除していないため、put メソッドを調整してバイト配列を返し、直接書き込みできるようにしました (プロデューサー スレッドは MappedByteBuffer を使用してこのバイト配列に直接書き込みます)。「commitPut()」メソッドを追加して、単純にカウンターを増やし、「長さ」配列を設定しました。(複数のスレッドが書き込みを行う場合、これは並行性の問題である可能性がありますが、書き込みを行っているのは 1 つのスレッドだけであることはわかっています)。

以下は私が現在持っているものです。段階的にデバッグすると機能しますが、「実行」すると、ロックの問題が発生したように見えます。ArrayBlockingQueue コードをコピーし、削除して調整しました。より良い知識を持つ誰かがクラスを見て、私が間違っていること、またはそれを改善する方法を教えてもらえますか (バッファに直接書き込み、同じステップで長さ配列とカウンターを設定するなど)?

public class ByteArrayBlockingQueue {

    private final int[] lens; // array to valid lengths
    private final byte[][] items; // array of byte arrays

    private int takeIndex = 0;
    private int putIndex = 0;
    private int count = 0;

    public volatile int polledLen = 0; // lenght of last polled byte array

    private final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;

    final int inc(int i) {
        return (++i == items.length)? 0 : i;
    }

    public ByteArrayBlockingQueue(int capacity, int size, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new byte[capacity][size];
        this.lens = new int[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull  = lock.newCondition();
    }

    public byte[] put() throws InterruptedException {
        final byte[][] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();

            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            //insert(e, len);
            return items[putIndex];
        } finally {
            lock.unlock();
        }
    }

    public void commitPut(int lenBuf) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            lens[putIndex] = lenBuf;
            putIndex = inc(putIndex);
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public byte[] poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0)
                return null;
            final byte[][] items = this.items;
            final int[] lens = this.lens;
            byte[] e = items[takeIndex];
            this.polledLen = lens[takeIndex];
            //items[takeIndex] = null;
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();
            return e;

        } finally {
            lock.unlock();
        }
    }
}
4

1 に答える 1

0

キューが循環する場合、コンシューマによって読み取られる前に、バイト配列が再利用されて上書きされる可能性があります。commitGetつまり、配列を新しいデータで上書きする前に、プロデューサーがコンシューマーを待機するようにするメソッドが必要です。

ただし、私の提案は、 java.util.concurrent.BlockingQueueが 2 番目のキューを持ってコンシューマーからプロデューサーにそれらを返すことに依存し、 java.nio.ByteByfferに依存して長さを追跡することです。プロデューサーは次のようにします。

ByteBuffer buffer = bufferQueue.poll(); // instead of your put()
buffer.put(source);                     // fill buffer from source MappedByteBuffer
buffer.flip();                          // set length to the amount written
dataQueue.offer(buffer);                // instead of commitPut()

消費者は次のことを行います。

ByteBuffer buffer = dataQueue.poll();   // instead of your get()
buffer.get(...);                        // use data
buffer.clear();                         // reset length               
bufferQueue.offer(buffer);              // this is the missing commitGet()

最初capacityに要素を に挿入する必要がありfreeQueueます。ただしsource、元のコードで既に行っていたように、これはバッファからキュー内の一時バッファにデータを 1 回コピーすることに注意してください。

本当にデータをコピーしたくない場合 (そして、すべてのコンシューマーが読み取るまでソースが変更されないことが確実な場合)、より良いオプションは、単一のブロッキング キューを使用し、ByteBuffer.slice()で取得したバッファーを挿入することです。コンシューマーに渡されるデータの各チャンクのソース バッファー。これらはガベージ コレクションされますが、バイト配列自体よりもはるかに少ないメモリしか必要としません。

于 2013-05-20T08:19:03.637 に答える