37

オブジェクトのブロックキューがあります。

キューにオブジェクトができるまでブロックするスレッドを作成したいと思います。BlockingQueue.take()によって提供される機能に似ています。

ただし、オブジェクトを正常に処理できるかどうかわからないため、オブジェクトを削除せずに、peek()だけを実行したいと思います。オブジェクトを正常に処理できる場合にのみ、オブジェクトを削除したいと思います。

そこで、ブロッキングpeek()関数が必要です。現在、peek()は、javadocsに従って、キューが空の場合にのみ戻ります。

私は何かが足りないのですか?この機能を実現する別の方法はありますか?

編集:

スレッドセーフキューを使用して、代わりにピークしてスリープしたかどうかについて何か考えはありますか?

public void run() {
    while (!exit) {
        while (queue.size() != 0) {
            Object o =  queue.peek();
            if (o != null) {
                if (consume(o) == true) {
                    queue.remove();
                } else {
                    Thread.sleep(10000); //need to backoff (60s) and try again
                }
            }
        }
        Thread.sleep(1000); //wait 1s for object on queue
    }
}

コンシューマースレッドと(個別の)プロデューサースレッドが1つしかないことに注意してください。これはBlockingQueueを使用するほど効率的ではないと思います...コメントをいただければ幸いです。

4

8 に答える 8

16

LinkedBlockingDequeを使用して、(を使用して)キューからアイテムを物理的に削除できますが、を使用して処理が失敗した場合は、キューの最後でtakeLast()再度置換します。一方、「プロデューサー」は、を使用してキューの先頭に要素を追加します。putLast(E e)putFirst(E e)

いつでもこの動作を独自のQueue実装内にカプセル化し、基礎となるの舞台裏で実行するblockingPeek()メソッドを提供できます。したがって、呼び出し元のクライアントの観点からは、要素がキューから削除されることはありません。takeLast()putLast()LinkedBlockingDeque

于 2009-11-18T23:26:36.450 に答える
6

ただし、オブジェクトを正常に処理できるかどうかわからないため、オブジェクトを削除せずに、peek()だけを実行したいと思います。オブジェクトを正常に処理できる場合にのみ、オブジェクトを削除したいと思います。

一般に、スレッドセーフではありません。peek()オブジェクトを正常に処理できると判断した後、take()削除して処理する前に、別のスレッドがそのオブジェクトを取得した場合はどうなりますか?

于 2009-11-18T23:39:48.623 に答える
2

また、イベントリスナーキューをブロッキングキューに追加し、(ブロッキング)キューに何かが追加されたら、リスナーにイベントを送信することもできますか?actionPerformedメソッドが呼び出されるまで、スレッドブロックを設定できます。

于 2009-11-18T23:17:36.910 に答える
2

私が知っているのは、これがApacheCommonsCollectionsのBlockingBufferだけです。

空のバッファでgetまたはremoveが呼び出された場合、呼び出し元のスレッドは、addまたはaddAll操作が完了したという通知を待ちます。

get()はと同等peek()であり、 UnboundedFifoBufferをで装飾することにより、のBufferように動作させることができます。BlockingQueueBlockingBuffer

于 2009-11-18T23:18:56.283 に答える
1

簡単な答えは、ブロッキングピークを自分で実装する方法が実際にはないということではなく、ブロッキングpeek()を使用してブロッキングキューを実装することです。

私は何かが足りないのですか?

peek()は同時実行性で面倒な場合があります-

  • peek()されたメッセージを処理できない場合、複数のコンシューマーがない限り、メッセージはキューに残されます。
  • あなたがそれを処理できない場合、誰がそのオブジェクトをキューから外すつもりですか?
  • 複数のコンシューマーがある場合、peek()を実行しているときに、アイテムを処理している別のスレッドとの間で競合状態が発生し、処理が重複したり、さらに悪化したりします。

実際にアイテムを削除し、 Chain-of-responsibilityパターンを使用して処理する方がよいようです。

編集:re:最後の例:コンシューマーが1つしかない場合、その間に更新されない限り、キュー上のオブジェクトを削除することはありません。その場合、スレッドの安全性ととにかくアイテムをキューに入れるべきではなかったでしょう。

于 2009-11-18T23:36:40.387 に答える
1

それ自体は答えではありませんが、JDK-6653412は、これは有効なユースケースではないと主張しています。

于 2018-09-14T16:38:49.417 に答える
0

BlockingQueue自体に指定した機能がないようです。

ただし、問題を少し再構成しようとするかもしれません。「正しく処理」できないオブジェクトをどのように処理しますか?それらをキューに残しているだけの場合は、ある時点でそれらを引き出して処理する必要があります。それらを処理する方法を理解するか(通常、queue.get()が何らかの無効または不正な値を与える場合は、それをフロアにドロップするだけで大​​丈夫です)、または別のデータ構造を選択することをお勧めしますFIFO。

于 2009-11-18T23:25:21.753 に答える
0

「最も単純な」ソリューション

の要素が正常に処理されるまで、次の要素を処理しないでください。

public void run() {

Object lastSuccessfullyProcessedElement = null;

    while (!exit) {
        Object obj =  lastSuccessfullyProcessedElement == null ? queue.take() : lastSuccessfullyProcessedElement; // blocking
        
        boolean successful = process(obj);
        
        if(!successful) {
            lastSuccessfullyProcessedElement = obj;
        } else {
            lastSuccessfullyProcessedElement = null;
        }
    }
}
  1. peek()値がnullかどうかを呼び出して確認することは、CPU効率が良くありません。

次のプログラムのキューが空の場合、システムのCPU使用率が10%になるのを確認しました。

while (true) {
   Object o = queue.peek();
   if(o == null) continue;
   // omitted for the sake of brevity
}
  1. 追加sleep()すると速度が低下します。

  2. を使用してキューに戻すとputLast、順序が乱れます。また、ロックが必要なブロッキング操作です。

于 2019-09-11T13:19:49.397 に答える