-1

はい、これはJavaの生産者/消費者に関するもう1つの質問です。

私の変種は、N個のプロデューサーがproduceDataRowsメソッドによって開始され、M個のコンシューマーがconsumeDataRowsメソッドによって開始されることです。どちらのメソッドも、クラスの独自のインスタンスを開始しThreadPoolExecutor、それぞれの数のプロデューサー/コンシューマータスクを送信してから、エグゼキューターが完了するまで待機します。

だから、ここに私のコードがあります:

final BlockingQueue<Row> allRows = new LinkedBlockingQueue<Row>();
ExecutorService exec = Executors.newFixedThreadPool(2);
FutureTask<Object> producer = new FutureTask<Object>(new Callable<Object>() {
  @Override
  public Object call() throws Exception {
    produceDataRows(allRows);
    return null;
  }
});
FutureTask<Object> consumer = new FutureTask<Object>(new Callable<Object>() {
  @Override
  public Object call() throws Exception {
    consumeDataRows(allRows);
    return null;
  }
});
exec.execute(producer);
exec.execute(consumer);
producer.get();
consumer.get();

問題はそれconsumer.get()が戻ることですが、consumeDataRows決して呼び出されません。一方produceDataRows、はと呼ばれます。

私は何が欠けていますか?

ありがとう。

編集1

グレイの返事に続いて、私はコードを次のように書き直しました。

ExecutorService exec = Executors.newFixedThreadPool(2);
Callable<Object> producer = new Callable<Object>() {
  @Override
  public Object call() throws Exception {
    produceDataRows(allRows);
    return null;
  }
};
Callable<Object> consumer = new Callable<Object>() {
  @Override
  public Object call() throws Exception {
    consumeDataRows(allRows);
    return null;
  }
};
exec.submit(producer);
exec.submit(consumer);
exec.shutdown();
exec.awaitTermination(10, TimeUnit.DAYS);

同じ効果-コードは終了しますが、consumeDataRows呼び出されることはありません。ここには3つの異なるThreadPoolExecutorインスタンスがあります。1つはここにproduceDataRows、もう1consumeDataRowsつはここに、最後のインスタンスはここにあります。

ありがとう。

編集2

私のメソッドに何か問題があります。produceDataRowsコメントアウトすると、実行callによって両方の呼び出し可能オブジェクトのメソッドにアクセスするためです。今それを理解しようとしています。

4

2 に答える 2

3

編集:

ここでは競合状態になっていると思います。プロデューサーとコンシューマーの両方が同時に実行されているallRowsので、プロデューサーはまだ何もしていないので、コンシューマーはどの行が入っているかを確認し、何も取得しないに違いありません。

allRows.poll(long timeout, TimeUnit unit)事前にプロデューサーを実行するか、プロデューサーの結果を待つためにコンシューマーに使用させる必要があります。たぶん、プロデューサーは、volatile boolean producedAll = trueすべての行を生成して完了したら、を設定し、コンシューマーが真になるのを.poll(...)待ってループproducedAllすることができます。


クラスが少し混乱しています。をインスタンス化して実行しないように要求する必要がありexec.submit(...)ます。 次に、呼び出すことができるを返します。Callable FutureTasksubmit(...)Futureget()

あなたが今していることは、あなた自身をインスタンス化することです。これは、発信者によって決してFutureTask内部的に行われることはありません。ExecutorServiceそれからあなたがあなたを呼んget()でいるときFutureTask、それはただすぐに戻って、参加も何もしていません。FutureTaskであるため、(残念ながら)機能しRunnableます。

あなたのコードは次のようなことをしているはずです:

Future<Object> producer = exec.submit(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
       produceDataRows(allRows);
       return null;
    }
});
Future<Object> consumer = exec.submit(new Callable<Object>() {
    ...
});
producer.get();
consumer.get();

ところで、私はただ欲しいので、通常Callable<Void>、からのリターンを気にしない場合に使用します。私はまだ戻りますが、値のないオブジェクトが必要な場合に役立ちます。CallableExceptionnullVoid

于 2012-08-07T16:14:43.410 に答える
-1

これをmainメソッド内から実行しているようで、thread他の2つのスレッドが作業を終了する前にメインダイを実行します。

joiningメインスレッドが他のスレッドの完了を待つように、他のスレッドにスレッドを提案します。

于 2012-08-07T16:33:18.107 に答える