1

次のマルチスレッドプログラムを作成しました。スレッドの1つがfalseを返す場合、すべてのスレッドをキャンセルしたい。ただし、個々のタスクをキャンセルしてスレッドをキャンセルしていますが。動いていない。スレッドをキャンセルするには、どのような変更を加える必要がありますか?

次のマルチスレッドプログラムを作成しました。スレッドの1つがfalseを返す場合、すべてのスレッドをキャンセルしたい。ただし、個々のタスクをキャンセルしてスレッドをキャンセルしていますが。動いていない。スレッドをキャンセルするには、どのような変更を加える必要がありますか?

import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;

public class BeamWorkerThread implements Callable<Boolean> {


  private List<BeamData> beamData;
  private String threadId;



 public BeamScallopingWorkerThread(
    List<BeamData> beamData, String threadId) {
   super();
   this.beamData = beamData;
   this.threadId = threadId;
  }





 @Override
  public Boolean call() throws Exception {

  Boolean result = true;

  DataValidator validator = new DataValidator();
   Iterator<BeamScallopingData> it = beamData.iterator();

  BeamData data = null;
  while(it.hasNext()){

   data = it.next();

   if(!validator.validateDensity(data.getBin_ll_lat(), data.getBin_ll_lon(), data.getBin_ur_lat(), data.getBin_ur_lon())){
     result = false;
     break;
    }
   }
   return result;
  }

}









   ExecutorService threadPool = Executors.newFixedThreadPool(100);
        List<Future<Boolean>> results = new ArrayList<Future<Boolean>>();

        long count = 0;
         final long RowLimt = 10000;
         long threadCount = 1;

        while ((beamData = csvReader.read(
           BeamData.class, headers1, processors)) != null) {

         if (count == 0) {
           beamDataList = new ArrayList<BeamData>();
          }

         beamDataList.add(beamData);
          count++;
          if (count == RowLimt) {
           results.add(threadPool
             .submit(new BeamWorkerThread(
               beamDataList, "thread:"
                 + (threadCount++))));
           count = 0;
          }
         }

        results.add(threadPool.submit(new BeamWorkerThread(
           beamDataList, "thread:" + (threadCount++))));
         System.out.println("Number of threads" + threadCount);
         for (Future<Boolean> fs : results)
          try {


           if(fs.get() == false){
            System.out.println("Thread is false");
            for(Future<Boolean> fs1 : results){
             fs1.cancel(true);
            }
           }
          } catch(CancellationException e){

         } catch (InterruptedException e) {

         } catch (ExecutionException e) {

         } finally {
           threadPool.shutdownNow();
          }

       }

私のコメント

ご意見をお寄せいただきありがとうございます。反応に圧倒されています。私は、よく実装されたスレッドがアプリを最高に引き上げ、悪い実装がアプリをひざまずかせることを知っています. 私は空想的なアイデアを持っていることに同意しますが、他に選択肢はありません。私は1000万以上の記録を持っているので、メモリの制約と時間の制約があります。両方に取り組む必要があります。したがって、データ全体を飲み込むのではなく、チャンクに分割しています。また、1 つのデータが無効な場合、残りの 100 万個のデータの処理に時間を無駄にしたくありません。@Mark Petersの提案はオプションだと思います。それに応じて変更を加えたということは、タスクを中断するフラグを追加したことを意味し、将来のリストがどのように機能するかについてかなり混乱しています。私が理解しているのは、すべてのスレッドがその値を返すと、将来のリストの各フィールドのループが開始されるということです。その場合、メイン リストからすべてのタスクを途中でキャンセルする方法はありません。オブジェクトの参照を各スレッドに渡す必要があります。1 つのスレッドがスレッド参照を使用して無効なデータを見つけた場合、各スレッドのキャンセル メソッドを呼び出して、割り込みフラグを設定します。

while(it.hasNext() && !cancelled) {
         if(!validate){
               // loop through each thread reference and call Cancel method
         }
      }
4

3 に答える 3

7

残りのすべてのタスクをキャンセルしようとしても、コードが中断可能になるように慎重に記述されていないと失敗します。それが正確に必要とすることは、1 つの StackOverflow の回答だけではありません。いくつかのガイドライン:

  • 飲み込まないでくださいInterruptedException。その発生をタスクを中断させます。
  • コードが割り込み可能なメソッド内で多くの時間を費やさない場合は、明示的なThread.interrupted()チェックを挿入して適切に対応する必要があります。

中断可能なコードを書くことは、一般的に初心者向けのものではないので、気をつけてください。

于 2013-08-13T18:58:52.337 に答える
2

をキャンセルしても、Future実行中のコードは中断されません。これは主に、タスクが最初から実行されるのを防ぐのに役立ちます。

trueタスクを実行しているスレッドを中断するパラメーターとして を指定できますが、これは、 をスローするコードでスレッドがブロックされている場合にのみ効果がありInterruptedExceptionます。それ以外interruptedに、スレッドのステータスを暗黙的にチェックするものはありません。

あなたの場合、ブロッキングはありません。時間のかかる忙しい仕事です。1 つのオプションは、ループの各段階でチェックする volatile ブール値を持つことです。

public class BeamWorkerThread implements Callable<Boolean> {

   private volatile boolean cancelled = false;
   @Override
   public Boolean call() throws Exception {
      //...
      while(it.hasNext() && !cancelled) {
         //...
      }
   }

   public void cancel() {
       cancelled = true;
   }
}

次に、オブジェクトへの参照を保持し、それをBeamWorkerThread呼び出しcancel()てその実行を先取りします。

なぜ私は割り込みが好きではないのですか?

Marko は、cancelled上記のフラグは本質的に再発明であると述べThread.interrupted()ました。正当な批判です。これが、このシナリオで割り込みを使用しないことを好む理由です。

1. 特定のスレッド構成に依存します。

タスクがエグゼキュータに送信できる、または直接呼び出すことができるキャンセル可能なコードを表している場合、一般的なケースThread.interrupt()で実行をキャンセルするために使用すると、割り込みを受け取るコードは、タスクを完全にキャンセルする方法を知っているコードであると 想定されます。

この場合はそれが当てはまるかもしれませんが、キャンセルとタスクの両方が内部でどのように機能するかを知っているからこそ、それがわかります。しかし、次のようなものがあると想像してください。

  • タスクは仕事をします
  • リスナーは、その最初の作業についてスレッド上で通知されます
    • 最初のリスナーは、次を使用してタスクをキャンセルすることを決定しますThread.interrupt()
    • 2 番目のリスナーは、中断可能な作業を行い、中断されます。ログに記録しますが、それ以外の場合は割り込みを無視します。
  • タスクは割り込みを受けず、タスクはキャンセルされません。

というか、グローバルinterrupt()すぎる仕組みだと思います。他の共有グローバル状態と同様に、すべてのアクターについて仮定します。それが、実行コンテキストに関する詳細を公開/結合するということです。そのタスク インスタンスにのみ適用可能なメソッドにカプセル化することで、そのグローバルな状態を排除します。interrupt()cancel()

2. 常に選択できるわけではありません。

ここでの典型的な例はInputStream. からの読み取りをブロックするタスクがある場合、 はブロックInputStreaminterrupt()解除するために何もしません。ブロックを解除する唯一の方法は、ストリームを手動で閉じることですcancel()。これは、タスク自体のメソッドで行うのが最適です。実装に関係なく、タスクをキャンセルする方法が 1 つ (例: Cancellable) あることは、私には理想的です。

于 2013-08-13T18:59:20.760 に答える