7

だから私はプロデューサーの消費者の問題をシミュレートしました。以下のコードがあります。私の質問はこれです: 彼が一定の while(true) にいる場合、消費者はどのように停止しますか。

以下のコードでは、追加しました

                    if (queue.peek()==null)
                         Thread.currentThread().interrupt();

この例ではうまく機能します。しかし、私の現実世界の設計では、これは機能しません (プロデューサーがデータを「プット」するのに時間がかかることがあるため、コンシューマーでスローされた例外は正しくありません。一般に、「毒」データを入れることができることはわかっています)。 Object is XYZ and I can check it in consumer. しかし、この毒はコードの見栄えを悪くします. 誰かが別のアプローチを持っているのだろうか.

public class ConsumerThread implements Runnable
{
 private BlockingQueue<Integer> queue;
 private String name;
 private boolean isFirstTimeConsuming = true;
 public ConsumerThread(String name, BlockingQueue<Integer> queue)
 {
    this.queue=queue;
    this.name=name;
 }

@Override
public void run()
{
    try
    {       
        while (true)
        {   
            if (isFirstTimeConsuming)
            {
                System.out.println(name+" is initilizing...");
                Thread.sleep(4000);
                isFirstTimeConsuming=false;
            }
            try{

                if (queue.peek()==null)
                    Thread.currentThread().interrupt();

                Integer data = queue.take();

                System.out.println(name+" consumed ------->"+data);
                Thread.sleep(70);    

            }catch(InterruptedException ie)
            {
                System.out.println("InterruptedException!!!!");
                break;
            }
        }

        System.out.println("Comsumer " + this.name + " finished its job; terminating.");

    }catch (InterruptedException e)
    {
        e.printStackTrace();
    } 
}

}

4

5 に答える 5

10

A:peek返品したからnullといって、生産者が生産を停止したという保証はありません。プロデューサーが単に減速した場合はどうなりますか?今、消費者はやめ、生産者は生産を続けます。したがって、「ピーク」->「ブレーク」のアイデアは基本的に失敗します。

B:次の場合、コンシューマーからの「完了/実行」フラグの設定とプロデューサーでの読み取りも失敗します。

  1. コンシューマーはフラグをチェックし、実行を継続する必要があることを確認してから、「テイク」を実行します
  2. その間、プロデューサーはフラグを「実行しない」に設定していました
  3. 今、消費者はゴーストパケットを待って永遠にブロックします

逆の場合も発生する可能性があり、1つのパケットが消費されずに除外されます。

次に、これを回避するために、「BlockingQueue」に加えてミューテックスとの追加の同期を行う必要があります。

C: 次のような状況では、「ロゼッタコード」が良い習慣を決定するための優れた情報源であることがわかります。

http://rosettacode.org/wiki/Synchronous_concurrency#Java

プロデューサーとコンシューマーは、入力の終わりを表すオブジェクト(またはオブジェクト内の属性)について合意する必要があります。次に、プロデューサーは最後のパケットにその属性を設定し、コンシューマーはそれを消費しなくなります。つまり、あなたがあなたの質問で「毒」と呼んだもの。

上記のRosettaコードの例では、この「オブジェクト」は単にString「EOF」と呼ばれる空です。

final String EOF = new String();

// Producer
while ((line = br.readLine()) != null)
  queue.put(line);
br.close();
// signal end of input
queue.put(EOF);

// Consumer
while (true)
  {
    try
      {
        String line = queue.take();
        // Reference equality
        if (line == EOF)
          break;
        System.out.println(line);
        linesWrote++;
      }
    catch (InterruptedException ie)
      {
      }
  }
于 2012-04-27T15:21:12.383 に答える
3

スレッドで割り込みを使用しないでください。不要になったときにループを中断します。

if (queue.peek()==null)
         break;

または、変数を使用してクローズ操作の保留をマークし、ループを中断してループを閉じることもできます。

if (queue.peek()==null)
         closing = true;

//Do further operations ...
if(closing)
  break;
于 2012-04-27T14:51:46.003 に答える
0

現実の世界では、ほとんどのメッセージングには、メッセージタイプ/サブタイプまたはおそらく異なるオブジェクトを定義するある種のヘッダーが付属しています。

メッセージを受け取ったときにスレッドに何かを実行するように指示するコマンドアンドコントロールオブジェクトまたはメッセージタイプを作成できます(シャットダウン、テーブルのリロード、新しいリスナーの追加など)。

このようにして、コマンドアンドコントロールスレッドが通常のメッセージフローにメッセージを送信するだけであると言うことができます。大規模システムなどでCNCスレッドを操作端末と通信させることができます。

于 2012-04-27T15:04:15.513 に答える
0

コンシューマーを終了させる前にキューを空にすることができる場合は、いつ停止するかをスレッドに伝えるフラグが必要になります。セッター メソッドを追加して、プロデューサーがコンシューマーにシャットダウンを指示できるようにします。次に、代わりに次のようにコードを変更します。

if (queue.isEmpty())
   break;

あなたのコードをチェックしてください

if (!run)
{
   break;
}
else if (queue.isEmpty())
{
   Thread.sleep(200);
   continue;
}
于 2012-04-27T15:08:56.737 に答える
0

このタイプセーフ パターンをポイズン ピルで使用できます。

public sealed interface BaseMessage {

    final class ValidMessage<T> implements BaseMessage {

        @Nonnull
        private final T value;


        public ValidMessage(@Nonnull T value) {
            this.value = value;
        }

        @Nonnull
        public T getValue() {
            return value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            ValidMessage<?> that = (ValidMessage<?>) o;
            return value.equals(that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }

        @Override
        public String toString() {
            return "ValidMessage{value=%s}".formatted(value);
        }
    }

    final class PoisonedMessage implements BaseMessage {

        public static final PoisonedMessage INSTANCE = new PoisonedMessage();


        private PoisonedMessage() {
        }

        @Override
        public String toString() {
            return "PoisonedMessage{}";
        }
    }
}

public class Producer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    Producer(@Nonnull BlockingQueue<BaseMessage> messages) {
        this.messages = messages;
    }

    @Override
    public Void call() throws Exception {
        messages.put(new BaseMessage.ValidMessage<>(1));
        messages.put(new BaseMessage.ValidMessage<>(2));
        messages.put(new BaseMessage.ValidMessage<>(3));
        messages.put(BaseMessage.PoisonedMessage.INSTANCE);
        return null;
    }
}

public class Consumer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    private final int maxPoisons;


    public Consumer(@Nonnull BlockingQueue<BaseMessage> messages, int maxPoisons) {
        this.messages = messages;
        this.maxPoisons = maxPoisons;
    }

    @Override
    public Void call() throws Exception {
        int poisonsReceived = 0;
        while (poisonsReceived < maxPoisons && !Thread.currentThread().isInterrupted()) {
            BaseMessage message = messages.take();
            if (message instanceof BaseMessage.ValidMessage<?> vm) {
                Integer value = (Integer) vm.getValue();
                System.out.println(value);
            } else if (message instanceof BaseMessage.PoisonedMessage) {
                ++poisonsReceived;
            } else {
                throw new IllegalArgumentException("Invalid BaseMessage type: " + message);
            }
        }
        return null;
    }
}
于 2020-10-03T05:35:11.297 に答える