19

バックグラウンド

学校にお金がないので、私は料金所で夜勤をしていて、インターネットを使ってコーディングスキルを学び、明日の仕事や自分が作ったアプリのオンライン販売を望んでいます。長い夜、少数の顧客。

マルチスレッドを使用する文献(Android SDKなど)で多くのコードに遭遇したため、トピックとしてマルチスレッドに取り組んでいますが、それでもあいまいです。

精神

この時点での私のアプローチは次のとおりです。考えられる最も基本的なマルチスレッドの例をコーディングし、壁に頭をぶつけて、脳を伸ばして新しい考え方に対応できるかどうかを確認します。私は自分の限界にさらされて、うまくいけば限界を超えようとしています。自由に批判し、私がやろうとしていることを行うためのより良い方法を指摘してください。

目的

  • Get some advice on how to do the above, based on my efforts so far (code provided)

エクササイズ

私が定義するスコープは次のとおりです。

意味

データオブジェクトの生成とその消費に連携して機能する2つのクラスを作成します。一方のスレッドはオブジェクトを作成し、もう一方のスレッドが取得して消費できるようにそれらを共有スペースに配信します。Producer生成スレッド、消費スレッドConsumer、共有スペースと呼びましょうSharedSpace。他の人が消費するためにオブジェクトを作成するという行為は、このシナリオとの類似性によって同化することができます。

`Producer`    (a busy mum making chocolate-covered cakes for his child, up to a limit)
`Consumer`    (a hungry child waiting to eat all cakes the mum makes, until told to stop)
`SharedSpace` (a kitchen table on which the cakes are put as soon as they become ready)
`dataValue`   (a chocolate-dripping cake which MUST be eaten immediately or else...)

運動を簡単にするために、私は子供がケーキを食べるときに母親が料理をすることを許可しないことにしました。彼女は子供がケーキを完成させるのを待つだけで、良い子育てのために、ある限界まで、即座に別のケーキを作ります。演習の本質は、並行性を実現することよりもスレッドのシグナリングを練習することです。それどころか、私は完全なシリアル化に焦点を合わせており、ポーリングや「まだ行けますか?」はありません。チェックします。次に、母と子が並行して「働く」という後続の演習をコーディングする必要があると思います。

アプローチ

  • クラスにRunnableインターフェイスを実装してもらい、独自のコードエントリポイントを設定します。

  • プログラムのエントリポイント からインスタンス化および開始されるThreadオブジェクトへのコンストラクター引数としてクラスを使用しますmain

  • Thread.join()を使用して、スレッドmainが終了する前にプログラムが終了しないことを確認します。

  • Producerのデータを作成する回数に制限を設定しますConsumer

  • データ生成の終了を通知するために使用する番兵の値について合意しますProduce

  • ワーカースレッドの最終的なサインオフを含む、共有リソースおよびデータの生成/消費イベントのロックのログ取得

  • プログラムから単一のSharedSpaceオブジェクトを作成し、main開始する前にそれを各ワーカーに渡します

  • オブジェクトへのprivate参照SharedSpaceを各ワーカーの内部に保存します

  • Consumerデータが生成される前に消費する準備ができている状態を説明するためのガードとメッセージを提供します

  • Producer指定された回数の反復後に停止します

  • Consumer番兵の値を読み取った後、停止します

コード


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class Consumer extends Threaded {
  public Consumer(SharedSpace sharedSpace) {
    super(sharedSpace);
  }
  @Override
  public void run() {
    super.run();
    int consumedData = 0;
    while (consumedData != -1) {
      synchronized (sharedSpace) {
        logger.info("Acquired lock on sharedSpace.");
        consumedData = sharedSpace.dataValue;
        if (consumedData == 0) {
          try {
            logger.info("Data production has not started yet. "
                + "Releasing lock on sharedSpace, "
                + "until notification that it has begun.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error(interruptedException.getStackTrace().toString());
          }
        } else if (consumedData == -1) {
          logger.info("Consumed: END (end of data production token).");
        } else {
          logger.info("Consumed: {}.", consumedData);
          logger.info("Waking up producer to continue data production.");
          sharedSpace.notify();
          try {
            logger.info("Releasing lock on sharedSpace "
                + "until notified of new data availability.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error(interruptedException.getStackTrace().toString());
          }
        }
      }
    }
    logger.info("Signing off.");
  }
}
class Producer extends Threaded {
  private static final int N_ITERATIONS = 10;
  public Producer(SharedSpace sharedSpace) {
    super(sharedSpace);
  }
  @Override
  public void run() {
    super.run();
    int nIterations = 0;
    while (nIterations <= N_ITERATIONS) {
      synchronized (sharedSpace) {
        logger.info("Acquired lock on sharedSpace.");
        nIterations++;
        if (nIterations <= N_ITERATIONS) {
          sharedSpace.dataValue = nIterations;
          logger.info("Produced: {}", nIterations);
        } else {
          sharedSpace.dataValue = -1;
          logger.info("Produced: END (end of data production token).");
        }
        logger.info("Waking up consumer for data consumption.");
        sharedSpace.notify();
        if (nIterations <= N_ITERATIONS) {
          try {
            logger.info("Releasing lock on sharedSpace until notified.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error(interruptedException.getStackTrace().toString());
          }
        }
      }
    }
    logger.info("Signing off.");
  }
}
class SharedSpace {
  volatile int dataValue = 0;
}
abstract class Threaded implements Runnable {
  protected Logger logger;
  protected SharedSpace sharedSpace;
  public Threaded(SharedSpace sharedSpace) {
    this.sharedSpace = sharedSpace;
    logger = LoggerFactory.getLogger(this.getClass());
  }
  @Override
  public void run() {
    logger.info("Started.");
    String workerName = getClass().getName();
    Thread.currentThread().setName(workerName);
  }
}
public class ProducerConsumer {
  public static void main(String[] args) {
    SharedSpace sharedSpace = new SharedSpace();
    Thread producer = new Thread(new Producer(sharedSpace), "Producer");
    Thread consumer = new Thread(new Consumer(sharedSpace), "Consumer");
    producer.start();
    consumer.start();
    try {
      producer.join();
      consumer.join();
    } catch (InterruptedException interruptedException) {
      interruptedException.printStackTrace();
    }
  }
}

実行ログ


Consumer - Started.
Consumer - Acquired lock on sharedSpace.
Consumer - Data production has not started yet. Releasing lock on sharedSpace, until notification that it has begun.
Producer - Started.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 1
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 1.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 2
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 2.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 3
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 3.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 4
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 4.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 5
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 5.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 6
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 6.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 7
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 7.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 8
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 8.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 9
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 9.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 10
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 10.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: END (end of data production token).
Producer - Waking up consumer for data consumption.
Producer - Signing off.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: END (end of data production token).
Consumer - Signing off.

質問

  • 上記は正しいですか?(たとえば、正しい言語ツール、正しいアプローチを使用しているか、愚かなコードが含まれているかなど)

しかし、それは「正しく見える」のでしょうか?

「もう一方」ではなく「一度」のテストで何回問題が発生したか想像できないため、出力が「見栄えが良い」場合でも正確性について質問します(たとえば、コンシューマーが最初に開始したとき、プロデューサーが終了しなかったときなど)。歩哨などを作成した後)。私は「成功した実行」から正しさを主張しないことを学びました。それどころか、私は疑似並列コードに非常に疑いを持っています!(これは定義上並列ではありません!0

拡張された回答

良い質問は(上記のもの)だけに焦点を当てていone requested piece of adviceますが、必要に応じて、回答の中で次の他のトピックへの洞察を自由に述べてください。

  • 次の試行をコーディングするときに、並列コードをテストするにはどうすればよいですか?

  • 開発とデバッグの両方で役立つツールはどれですか?Eclipseを使用することを検討してください

  • 生産を継続することを許可した場合、アプローチは変更されますか?Producer各生産にはさまざまな時間がかかりますが、Consumer利用可能になるものはすべて消費されますか?ロックは他の場所に移動する必要がありますか?シグナリングは、この待機/通知パラダイムから変更する必要がありますか?

  • 物事を行うこの方法は時代遅れですか、そして私はむしろ何か他のものを学ぶべきですか?この料金所からは、「Javaの現実の世界」で何が起こっているのかわかりません。

次のステップ

  • ここからどこへ行けばいいの?「未来」の概念がどこかで言及されているのを見たことがありますが、トピックの番号付きリストを使用して、関連する学習リソースへのリンクを使用して、順番に、教育的に順序付けて作業することができます

ティノシノ

4

4 に答える 4

6

上記は正しいですか?

私が見る唯一の問題は、@Tudorと@Bhaskarによって言及されたものです。条件を待っているときに条件をテストするときは常に、ループを使用する必要があります。whileただし、これは複数のプロデューサーとコンシューマーとの競合状態に関するものです。誤ったウェイクアップが発生する可能性がありますが、競合状態が発生する可能性がはるかに高くなります。主題に関する私のページを参照してください。

はい、プロデューサーとコンシューマーは1つしかありませんが、コードを複数のコンシューマーに拡張したり、コードを別のシナリオにコピーしたりすることができます。

私は「成功した実行」から正しさを主張しないことを学びました。それどころか、私は疑似並列コードに非常に疑いを持っています!

良い本能。

次の試行をコーディングするときに、並列コードをテストするにはどうすればよいですか?

これはとても難しいです。スケールアップするのは1つの方法です。複数のプロデューサーとコンシューマーを追加して、問題があるかどうかを確認します。プロセッサの数/タイプが異なる複数のアーキテクチャで実行されます。あなたの最善の防御はコードの正確さです。緊密な同期、、、などのクラスの適切な使用によりBlockingQueueExecutorServiceクローズをよりシンプル/クリーンにします。

簡単な答えはありません。マルチスレッドコードのテストは非常に困難です。

開発とデバッグの両方で役立つツールはどれですか?

一般的なことに関しては、 Emmaのようなカバレッジツールを調べて、単体テストがすべてのコードをカバーしていることを確認できるようにします。

マルチスレッドコードのテストに関しては、スレッドダンプの読み取り方法を理解しkill -QUIT、Jconsole内で実行されているスレッドを確認してください。YourKitのようなJavaプロファイラーも役立つかもしれません。

プロデューサーに制作を継続させた場合、アプローチは変わりますか?各制作にはさまざまな時間がかかります...

私はそうは思わない。消費者はプロデューサーを永遠に待ちます。多分私はこの質問を理解していませんか?

物事を行うこの方法は時代遅れですか、そして私はむしろ何か他のものを学ぶべきですか?この料金所からは、「Javaの現実の世界」で何が起こっているのかわかりません。

ExecutorService次はクラスについて学びます。これらは、スタイルコードの大部分を処理しnew Thread()ます。特に、スレッドで実行されている多数の非同期タスクを処理している場合はそうです。これがチュートリアルです。

ここからどこへ行けばいいの?

繰り返しますが、ExecutorServiceこの開始ドキュメントを読んだことを前提としています。@Bhaskarが述べたように、Java ConcurrencyinPracticeは良い聖書です。


コードに関する一般的なコメントは次のとおりです。

  • SharedSpaceandクラスは、これThreadedを行うための工夫された方法のようです。基本クラスなどで遊んでいる場合は問題ありません。しかし、一般的に、私はこのようなパターンを使用することはありません。プロデューサーとコンシューマーは通常、BlockingQueuelikeLinkedBlockingQueueを使用して作業します。この場合、同期とvolatileペイロードが自動的に処理されます。また、基本クラスから取得するのではなく、共有情報をオブジェクトコンストラクターに挿入する傾向があります。

  • 通常、私が使用している場合synchronized、それはprivate finalフィールド上にあります。private final Object lockObject = new Object();すでにオブジェクトを操作していない限り、ロック用にを作成することがよくあります。

  • 巨大なsynchronizedブロックに注意し、セクション内にログメッセージを配置しsynchronizedます。ログは通常synchronized、ファイルシステムに対してIOを実行しますが、これは非常にコストがかかる可能性があります。synchronized可能であれば、小さくて非常にタイトなブロックを用意する必要があります。

  • consumedDataループの外側で定義します。割り当ての時点でそれを定義し、それがでbreakある場合はループから救済するためにaを使用します== -1。可能であれば、ローカル変数のスコープを制限してください。

  • ロギングメッセージがコードのパフォーマンスを支配します。これは、それらを削除すると、コードのパフォーマンスが完全に異なることを意味します。これは、問題をデバッグするときに理解することが非常に重要です。CPU /コアが異なる別のアーキテクチャに移行すると、パフォーマンスも(ほとんどの場合)変化します。

  • あなたはおそらくこれを知っていますが、あなたが呼び出すとき、それはそれが現在にある場合sharedSpace.notify();に別のスレッドが通知されることを意味するだけです。それが他のものでない場合、それは通知を見逃します。参考までに。sharedSpace.wait();

  • を実行するのは少し奇妙ですがif (nIterations <= N_ITERATIONS)、その下の3行elseでもう一度実行します。を複製するnotify()と、分岐が単純化されます。

  • int nIterations = 0;次にwhile、++内にthenがあります。これがforループのレシピです。

    for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
    

これがあなたのコードのはるかにタイトなバージョンです。これは私がそれを書く方法のほんの一例です。繰り返しになりますが、欠落していることを除けばwhile、バージョンに問題はないようです。

public class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;
    public Consumer(BlockingQueue<Integer> queue) {
       this.queue = queue;
    }
    @Override
    public void run() {
       while (true) {
          int consumedData = queue.take();
          if (consumedData ==  Producer.FINAL_VALUE) {
              logger.info("Consumed: END (end of data production token).");
              break;
          }
          logger.info("Consumed: {}.", consumedData);
       }
       logger.info("Signing off.");
    }
}

public class Producer implements Runnable {
    public static final int FINAL_VALUE = -1;
    private final BlockingQueue<Integer> queue;
    public Producer(BlockingQueue<Integer> queue) {
       this.queue = queue;
    }
    @Override
    public void run() {
       for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
          logger.info("Produced: {}", nIterations);
          queue.put(nIterations);
       }
       queue.put(FINAL_VALUE);
       logger.info("Produced: END (end of data production token).");
       logger.info("Signing off.");
    }
}

public class ProducerConsumer {
    public static void main(String[] args) {
       // you can add an int argument to the LinkedBlockingQueue constructor
       // to only allow a certain number of items in the queue at one time
       BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
       Thread producer = new Thread(new Producer(queue), "Producer");
       Thread consumer = new Thread(new Consumer(queue), "Consumer");
       // start and join go here
    }
}
于 2012-09-25T20:51:45.603 に答える
4

あなたはここでかなり良い仕事をしたようです。実際につまむことはあまりありません。私がお勧めしたいのは、バッファオブジェクト自体の同期を避けるべきだということです。この場合は問題ありませんが、代わりにデータ構造バッファに切り替えると、クラスによっては内部で同期される可能性があります(たとえばVector、現在は廃止されています)。そのため、外部からロックを取得すると、混乱する可能性があります。

編集:Bhaskarは、へのwhile呼び出しをラップするためにを使用することについて良い点を述べていwaitます。これは、悪名高いスプリアスウェイクアップが発生し、スレッドがwait時期尚早に抜けることを余儀なくされるためです。そのため、スレッドが元に戻ることを確認する必要があります。

次にできることは、有限バッファープロデューサーコンシューマーを実装することです。リンクリストなどの共有データ構造を持ち、最大サイズ(10アイテムなど)を設定します。次に、プロデューサーに生産を継続させ、キューに10個のアイテムがある場合にのみ一時停止します。バッファが空になると、コンシューマは一時停止されます。

次のステップは、手動で実装したプロセスを自動化する方法を学ぶことです。ブロッキング動作を備えたバッファーを提供するものを見てくださいBlockingQueue(つまり、バッファーが空の場合はコンシューマーが自動的にブロックし、バッファーがいっぱいの場合はプロデューサーがブロックします)。

また、状況によっては、エグゼキュータ(を参照ExecutorService)は、タスクキューと1つ以上のワーカー(コンシューマー)をカプセル化するため、必要なのはプロデューサーだけであるため、価値のある代替となる可能性があります。

于 2012-09-23T17:34:00.183 に答える
0

ProducersとConsumersは、実装する単純なクラスにすることができますRunnable(no extends Threaded)。そうすれば、脆弱性が低くなります。クライアントは自分でテーマを作成Threadしてインスタンスをアタッチできるため、クラス階層のオーバーヘッドは必要ありません。

あなたの前のあなたの状態はではなくであるwait()必要があります。while()if

編集:JCIPページ301から:

void stateDependentMethod() throws InterruptedException {
      // condition predicate must be guarded by lock
      synchronized(lock) {
          while (!conditionPredicate())
            lock.wait();
          // object is now in desired state
       }
  }

静的に停止する条件が組み込まれています。通常、プロデューサーとコンシューマーはより柔軟である必要があります-停止するための外部信号に応答できる必要があります。

手始めに、外部停止信号を実装するために、フラグがあります:

class Producer implements Runnable { 
     private volatile boolean stopRequested ;

     public void run() {
        while(true){
           if(stopRequested )
                // get out of the loop
         }
     }

     public void stop(){
        stopRequested  = true;
        // arrange to  interrupt the Producer thread here.
     }
 }

上記を実装しようとすると、他にも問題が発生することがわかります。たとえば、プロデューサーが最初に公開してからwait()ingを実行しますが、問題が発生する可能性があります。

さらに読むことに興味がある場合は、本を読むことをお勧めします-JavaConcurrencyInPractice。これには、ここで追加できるよりも多くの推奨事項があります。

于 2012-09-23T17:49:47.120 に答える
0

野心のための巨大な称賛!あなたはほぼ8年前にこの質問をしました。あなたの努力があなたにあなたが望む教育を提供した(そしてあなたに提供し続ける)ことを願っています。

最近wait()ではnotify()join()Javaでマルチスレッドを実装することは強くお勧めしません。この低レベルで並行性を制御しようとすると、足を踏み入れるのは簡単すぎます(実際、Java設計者は、の多くのメソッドとセマンティクスThreadが実際には設計ミスであったことを認めていますが、下位互換性のためにそれらを残しておく必要があります- -多くの人が新しい「仮想スレッド」(Project Loom)を廃止します-しかし、それは別のトピックです)。

今日、スレッドを手動で起動および制御するための推奨される方法は、を介してExecutorService.submit(Callable<V>)、を返すことFuture<V>です。次に、を呼び出してスレッドが終了するのを待つ(そして戻り値を取得する)ことができます。呼び出し可能オブジェクトによって返されたFuture<V>.get()型の値を返しますV(または、キャッチされない例外がスローされたExecutionException場合はスローします)。Callable

次のクラスは、このようなものを実装する方法の例です。これにより、単一の制限付きブロッキングキューを介して、任意の数のプロデューサーが任意の数のコンシューマーに接続されます。(スレッドからの戻り値は無視されるため、ExecutorService.submit(Runnable)と呼ばれ、Future<?>ではなく、を返しますExecutorService.submit(Callable<V>))。

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public abstract class ProducerConsumer<E> {

    private final BlockingQueue<Optional<E>> queue;

    public ProducerConsumer(
            int numProducerThreads, int numConsumerThreads, int queueCapacity) {
        if (numProducerThreads < 1 || numConsumerThreads < 1 || queueCapacity < 1) {
            throw new IllegalArgumentException();
        }
        queue = new ArrayBlockingQueue<Optional<E>>(queueCapacity);
        final ExecutorService executor = 
                Executors.newFixedThreadPool(numProducerThreads + numConsumerThreads);
        try {
            // Start producer threads
            final List<Future<?>> producerFutures = new ArrayList<>();
            final AtomicInteger numLiveProducers = new AtomicInteger();
            for (int i = 0; i < numProducerThreads; i++) {
                producerFutures.add(executor.submit(() -> {
                    numLiveProducers.incrementAndGet();
                    // Run producer
                    producer();
                    // When last producer finishes, deliver poison pills to consumers
                    if (numLiveProducers.decrementAndGet() == 0) {
                        for (int j = 0; j < numConsumerThreads; j++) {
                            queue.put(Optional.empty());
                        }
                    }
                    return null;
                }));
            }
            // Start consumer threads
            final List<Future<?>> consumerFutures = new ArrayList<>();
            for (int i = 0; i < numConsumerThreads; i++) {
                consumerFutures.add(executor.submit(() -> {
                    // Run Consumer
                    consumer();
                    return null;
                }));
            }
            // Wait for all producers to complete
            completionBarrier(producerFutures, false);
            // Shut down any consumers that are still running after producers complete
            completionBarrier(consumerFutures, false);
        } finally {
            executor.shutdownNow();
        }
    }

    private static void completionBarrier(List<Future<?>> futures, boolean cancel) {
        for (Future<?> future : futures) {
            try {
                if (cancel) {
                    future.cancel(true);
                }
                future.get();
            } catch (CancellationException | InterruptedException e) {
                // Ignore
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    protected void produce(E val) {
        try {
            queue.put(Optional.of(val));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    protected Optional<E> consume() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /** Producer loop. Call {@link #produce(E)} for each element. */
    public abstract void producer();

    /**
     * Consumer thread. Call {@link #consume()} to get each successive element,
     * until an empty {@link Optional} is returned.
     */
    public abstract void consumer();
}

次のように使用します。

new ProducerConsumer<Integer>(/* numProducerThreads = */ 1, /* numConsumerThreads = */ 4,
        /* queueCapacity = */ 10) {
    @Override
    public void producer() {
        for (int i = 0; i < 100; i++) {
            System.out.println("Producing " + i);
            produce(i);
        }
    }

    @Override
    public void consumer() {
        for (Optional<Integer> opt; (opt = consume()).isPresent; ) {
            int i = opt.get();
            System.out.println("Got " + i);
        }
    }
};
于 2020-07-09T10:12:51.090 に答える