2

以下のリンクで述べたように: -

キューに入れる前に ThreadPoolExecutor でスレッドを最大まで増やすにはどうすればよいですか?

要素に入った後に false を返すようにキューの実装を変更しました。その結果、新しいタスクがキューに挿入されるたびに、新しいスレッドが作成されます。

しかし、ロガーを配置して大規模な (Bi システム テスト) 以下の実装を実行すると、新しい問題が生成されます。

タスクが実行されると、そのタスクはキューに挿入され、キューが false を返すと、その実行のために新しいスレッドが作成されます。現在プールにあるアイドル スレッドは取得されません。getTask()理由は、タスクがキューからタスクを選択するメソッドからアイドル状態のスレッドに割り当てられるためです。したがって、私の質問は、スレッドがアイドル状態の場合、新しいスレッドを作成するのではなく、アイドル状態のスレッドに実行用のタスクが割り当てられるように、この動作を変更する方法です??

以下の出力により、より明確になります。

Task 46 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 47 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 48 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Active Count: 1 Pool Size : 4 Idle Count: 3 Queue Size: 0
Task 49 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0
Task 50 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0

コード ファイルは次のとおりです。

サーバー マシンで 1.5 を使用しており、アップグレードできないため、ThreadPoolExecutor のバージョンは Java 1.5 です。

ThreadPoolExecutor:-

 public void execute(Runnable command) {
    System.out.println("Active Count: " + getActiveCount()
            + " Pool Size : " + getPoolSize() + " Idle Count: "
            + (getPoolSize() - getActiveCount())+" Queue Size: "+getQueue().size());
          if (command == null)
              throw new NullPointerException();
          for (;;) {
              if (runState != RUNNING) {
                  reject(command);
                  return;
              }
              if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
                  return;
              if (workQueue.offer(command))
                  return;
              int status = addIfUnderMaximumPoolSize(command);
              if (status > 0)      // created new thread
                  return;
              if (status == 0) {   // failed to create thread
                  reject(command);
                  return;
              }
              // Retry if created a new thread but it is busy with another task
          }
      }

LinkedBlockingQueue:-

public class CustomBlockingQueue<E> extends LinkedBlockingQueue<E> 
{

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    public CustomBlockingQueue() {
    super(Integer.MAX_VALUE);
    }

    public boolean offer(E e) {
       return false;
    }

}

拒否ハンドラでは、オーバーライドしていないキューの put メソッドを呼び出しています

Callingexecutor

   final CustomThreadPoolExecutor tpe = new CustomThreadPoolExecutor(3, 8, 0L, TimeUnit.MILLISECONDS, new MediationBlockingQueue<Runnable>(), new MediationRejectionHandler());

private static final int TASK_COUNT = 100;

 for (int i = 0; i < TASK_COUNT; i++) {

......
tpe.execute(new Task(i));
.....
}

コア プール サイズを 3、最大プール サイズを 8 としてエグゼキュータを呼び出し、タスクに無制限のリンク ブロッキング キューを使用しています。

4

4 に答える 4

1

あなたの問題は次のとおりだと思います。

public boolean offer(E e) {
   return false;
}

falseこれは、現在アイドル状態のスレッドの数に関係なく、常に TPE に戻り、別のスレッドを開始させます。これは、この回答のコード サンプルが推奨するものではありません。フィードバックの後、初期の問題を修正する必要がありました。

私の答えは、あなたのoffer(...)メソッドを次のようにすることです:

public boolean offer(Runnable e) {
    /*
     * Offer it to the queue if there is 1 or 0 items already queued, else
     * return false so the TPE will add another thread.
     */
    if (size() <= 1) {
        return super.offer(e);
    } else {
        return false;
    }
}

そのため、キューにすでに 2 つ以上のものが存在する場合、別のスレッドが fork されます。それ以外の場合は、アイドル状態のスレッドによって取得されるタスクがキューにキューに入れられます。値で遊ぶこともでき1ます。0またはそれ以上で試してみると1、アプリケーションに適している場合があります。CustomBlockingQueueその価値をあなたの可能性に注入することが適切です。

于 2013-10-30T22:04:19.653 に答える
1

SynchronousQueue. _ すでに待機中の受信者がいる場合にのみ、提供されたアイテムを受け入れます。したがって、アイドル状態のスレッドはアイテムを取得し、アイドル状態のスレッドがなくなると、ThreadPoolExecutor新しいスレッドが開始されます。

唯一の欠点は、すべてのスレッドが開始されると、保留中のアイテムに容量がないため、単純にキューに入れることができないことです。したがって、サブミッターがブロックされることを受け入れるか、保留中のタスクをサブミッターに配置するための別のキューと、これらの保留中のアイテムを同期キューに配置しようとする別のバックグラウンド スレッドが必要です。ほとんどの場合、この追加のスレッドはこれら 2 つのキューのいずれかでブロックされるため、パフォーマンスが低下することはありません。

class QueuingRejectionHandler implements RejectedExecutionHandler {

  final ExecutorService processPending=Executors.newSingleThreadExecutor();

  public void rejectedExecution(
      final Runnable r, final ThreadPoolExecutor executor) {
    processPending.execute(new Runnable() {
      public void run() {
        executor.execute(r);
      }
    });
  }
}

…</p>

ThreadPoolExecutor e=new ThreadPoolExecutor(
  corePoolSize, maximumPoolSize, keepAliveTime, unit,
  new SynchronousQueue<Runnable>(), new QueuingRejectionHandler());
于 2013-10-30T12:09:56.340 に答える