31

非常に単純な生産者/消費者シナリオでjava.util.concurrent.BlockingQueueを使用しています。たとえば、この擬似コードは消費者の部分を表しています。

class QueueConsumer implements Runnable {

    @Override
    public void run() {
        while(true)
        {
            try {
                ComplexObject complexObject = myBlockingQueue.take();
                //do something with the complex object
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

ここまでは順調ですね。ブロッキングキューのjavadocで、次のように読みました。

BlockingQueueは、アイテムが追加されないことを示すための「閉じる」または「シャットダウン」操作を本質的にサポートしていません。このような機能のニーズと使用法は、実装に依存する傾向があります。たとえば、一般的な戦術は、プロデューサーが特別なエンドオブストリームまたはポイズンオブジェクトを挿入することです。これらは、コンシューマーによって取得されたときにそれに応じて解釈されます。

残念ながら、使用されているジェネリックとComplexObjectの性質により、「ポイズンオブジェクト」をキューにプッシュするのは簡単ではありません。したがって、この「一般的な戦術」は、私のシナリオではあまり便利ではありません。

私の質問は、キューを「閉じる」ために他にどのような優れた戦術/パターンを使用できるかということです。

ありがとうございました!

4

10 に答える 10

21

コンシューマースレッドへのハンドルがある場合は、それを中断できます。あなたが与えたコードで、それは消費者を殺します。私はプロデューサーがこれを持っているとは思いません。おそらく、プログラムコントローラにコールバックして、完了したことを通知する必要があります。次に、コントローラーはコンシューマースレッドを中断します。

割り込みに従う前に、いつでも作業を終了できます。例えば:

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().isInterrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                this.process(complexObject);

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }

        // Thread is getting ready to die, but first,
        // drain remaining elements on the queue and process them.
        final LinkedList<ComplexObject> remainingObjects;
        myBlockingQueue.drainTo(remainingObjects);
        for(ComplexObject complexObject : remainingObjects) {
            this.process(complexObject);
        }
    }

    private void process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

とにかくキューをどうにかして毒殺するよりも、私は実際にそれを好むでしょう。スレッドを強制終了する場合は、スレッドに自分自身を強制終了するように依頼してください。

InterruptedException(誰かが適切に処理しているのを見るのはいいことです。)


ここでの中断の処理については、いくつかの論争があるようです。まず、皆さんにこの記事を読んでいただきたいと思います。

さて、誰も実際にそれを読んでいないことを理解した上で、これが取引です。InterruptedExceptionスレッドは、割り込み時に現在ブロックされていた場合にのみ受信します。この場合、Thread.interrupted()はを返しfalseます。ブロックしていなかった場合は、この例外を受け取らず、代わりにThread.interrupted()を返しtrueます。したがって、ループガードは、何があっても絶対にチェックする必要Thread.interrupted()があります。そうしないと、スレッドの中断を見逃すリスクがあります。

つまり、何があってもチェックしているので、Thread.interrupted()キャッチする必要がありますInterruptedException(そして、強制されていなくても処理する必要があります)。これで、同じイベントであるスレッドの中断を処理する2つのコード領域ができました。これを処理する1つの方法は、それらを1つの条件に正規化することです。つまり、ブール状態チェックで例外をスローするか、例外でブール状態を設定できます。後で選びます。


編集:静的Thread#interruptedメソッドは、現在のスレッドの中断されたステータスをクリアすることに注意してください。

于 2011-03-21T13:44:26.260 に答える
12

これを簡単にするための別のアイデア:

class ComplexObject implements QueueableComplexObject
{
    /* the meat of your complex object is here as before, just need to
     * add the following line and the "implements" clause above
     */
    @Override public ComplexObject asComplexObject() { return this; }
}

enum NullComplexObject implements QueueableComplexObject
{
    INSTANCE;

    @Override public ComplexObject asComplexObject() { return null; }
}

interface QueueableComplexObject
{
    public ComplexObject asComplexObject();
}

次にBlockingQueue<QueueableComplexObject>、キューとして使用します。キューの処理を終了したい場合は、を実行してくださいqueue.offer(NullComplexObject.INSTANCE)。消費者側では、

boolean ok = true;
while (ok)
{
    ComplexObject obj = queue.take().asComplexObject();
    if (obj == null)
        ok = false;
    else
        process(obj);
}

/* interrupt handling elided: implement this as you see fit,
 * depending on whether you watch to swallow interrupts or propagate them
 * as in your original post
 */

必須ではありません。また、実装によっては高価/困難なinstanceof偽物を作成する必要もありません。ComplexObject

于 2011-03-21T15:07:50.133 に答える
9

別の方法は、実行している処理をでラップし、ExecutorServiceジョブExecutorServiceがキューに追加されるかどうかを自分自身で制御できるようにすることです。

基本的に、を利用しますExecutorService.shutdown()。これを呼び出すと、エグゼキュータによるタスクの処理が禁止されます。

例ので、現在どのようにタスクを送信しているかわかりませんQueueConsumer。私はあなたが何らかのsubmit()方法を持っていると仮定し、例で同様の方法を使用しました。

import java.util.concurrent.*;

class QueueConsumer {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public void shutdown() {
        executor.shutdown(); // gracefully shuts down the executor
    }

    // 'Result' is a class you'll have to write yourself, if you want.
    // If you don't need to provide a result, you can just Runnable
    // instead of Callable.
    public Future<Result> submit(final ComplexObject complexObject) {
        if(executor.isShutdown()) {
            // handle submitted tasks after the executor has been told to shutdown
        }

        return executor.submit(new Callable<Result>() {
            @Override
            public Result call() {
                return process(complexObject);
            }
        });
    }

    private Result process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

java.util.concurrentこの例は、パッケージが提供するものの単なる説明です。おそらく、いくつかの最適化を行うことができます(たとえば、QueueConsumer独自のクラスはおそらく必要ないためExecutorService、タスクを送信しているプロデューサーに提供するだけで済みます)。

java.util.concurrentパッケージを掘り下げます(上記のリンクのいくつかから始めます)。あなたはそれがあなたがやろうとしていることのためにあなたにたくさんの素晴らしいオプションを与えることに気付くかもしれません、そしてあなたはワークキューを調整することについて心配する必要さえありません。

于 2011-03-21T14:41:33.120 に答える
7

毒オブジェクトを作成する別の可能性:それをクラスの特定のインスタンスにします。このように、サブタイプをいじったり、ジェネリックを台無しにする必要はありません。

欠点:プロデューサーとコンシューマーの間に何らかのシリアル化の障壁がある場合、これは機能しません。

public class ComplexObject
{
    public static final POISON_INSTANCE = new ComplexObject();

    public ComplexObject(whatever arguments) {
    }

    // Empty constructor for creating poison instance.
    private ComplexObject() {
    }
}

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().interrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                if (complexObject == ComplexObject.POISON_INSTANCE)
                    return;

                // Process complex object.

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }
    }
}
于 2011-03-21T14:30:40.130 に答える
4

汎用オブジェクトをデータオブジェクトにラップできます。このデータオブジェクトには、毒オブジェクトのステータスなどのデータを追加できます。dataobjectは、2つのフィールドを持つクラスです。T complexObject;およびboolean poison;

コンシューマーは、キューからデータオブジェクトを取得します。ポイズンオブジェクトが返された場合は、コンシューマーを閉じます。それ以外の場合は、ジェネリックをアンラップして「process(complexObject)」を呼び出します。

java.util.concurrent.LinkedBlockingDeque<E>キューの最後にオブジェクトを追加して、先頭からオブジェクトを取得できるように、を使用しています。そうすれば、オブジェクトは順番に処理されますが、より重要なのは、毒オブジェクトに遭遇した後にキューを閉じた方が安全です。

複数のコンシューマーをサポートするために、poisonオブジェクトに遭遇したときにキューに追加し直します。

public final class Data<T> {
    private boolean poison = false;
    private T complexObject;

    public Data() {
        this.poison = true;
    }

    public Data(T complexObject) {
        this.complexObject = complexObject;
    }

    public boolean isPoison() {
        return poison;
    }

    public T getComplexObject() {
        return complexObject;
    }
}
public class Consumer <T> implements Runnable {

    @Override
    public final void run() {
        Data<T> data;
        try {
            while (!(data = queue.takeFirst()).isPoison()) {
                process(data.getComplexObject());
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // add the poison object back so other consumers can stop too.
        queue.addLast(line);
    }
}
于 2011-05-25T09:55:00.730 に答える
3

ComplexObjectを拡張して、重要な作成機能をモックアウトすることは可能ですか?基本的にはシェルオブジェクトになりますがinstance of、キューオブジェクトの終わりかどうかを確認するために行うことができます。

于 2011-03-21T13:47:40.150 に答える
1

クローズ可能なものを実装することは私には合理的だと思われますBlockingQueue

import java.util.concurrent.BlockingQueue;

public interface CloseableBlockingQueue<E> extends BlockingQueue<E> {
    /** Returns <tt>true</tt> if this queue is closed, <tt>false</tt> otherwise. */
    public boolean isClosed();

    /** Closes this queue; elements cannot be added to a closed queue. **/
    public void close();
}

次の動作でこれを実装するのは非常に簡単です(メソッドの要約表を参照)。

  • 挿入

    • 例外をスローします、特別な値

      テストする完全なQueue発信者の責任のように動作しますisClosed()

    • ブロック

      閉じIllegalStateExceptionている場合はスローします。

    • タイムアウト

      false閉じている場合は、呼び出し元がテストする責任を返しますisClosed()

  • 削除

    • 例外をスローします、特別な値

      Queueの、呼び出し元のテストの責任のように動作しますisClosed()

    • ブロック

      閉じNoSuchElementExceptionている場合はスローします。

    • タイムアウト

      null閉じている場合は、呼び出し元がテストする責任を返しますisClosed()

  • 診る

    変化なし。

ソースを編集してこれを行いました。github.comで見つけてください。

于 2012-07-22T06:51:11.060 に答える
1

今日、私はラッパーオブジェクトを使用してこの問題を解決しました。ComplexObjectは複雑すぎてサブクラス化できないため、ComplexObjectをComplexObjectWrapperオブジェクトにラップしました。次に、ジェネリック型としてComplexObjectWrapperを使用しました。

public class ComplexObjectWrapper {
ComplexObject obj;
}

public class EndOfQueue extends ComplexObjectWrapper{}

BlockingQueue<ComplexObject>今私がした 代わりにBlockingQueue<ComplexObjectWrapper>

私はコンシューマーとプロデューサーの両方を制御していたので、このソリューションは私のために機能しました。

于 2013-09-23T20:39:18.140 に答える
0

この状況では、通常、ジェネリックスを捨てて、キューホールドタイプをオブジェクトにする必要があります。次に、実際のタイプにキャストする前に、「毒」オブジェクトをチェックする必要があります。

于 2011-03-21T13:54:22.773 に答える
0

私はこのシステムを使用しました:

ConsumerClass
private boolean queueIsNotEmpty = true;//with setter
...
do {
    ...
    sharedQueue.drainTo(docs);
    ...
} while (queueIsNotEmpty || sharedQueue.isEmpty());

プロデューサーが終了したら、consumerObject、queueIsNotEmptyフィールドをfalseに設定しました

于 2013-05-28T10:40:07.990 に答える