11

私は自分の処理パイプラインを実装するための最良の方法に取り組んでいます。

私のプロデューサーは、BlockingQueueに作業をフィードします。コンシューマー側では、キューをポーリングし、Runnableタスクで取得したものをラップして、ExecutorServiceに送信します。

while (!isStopping())
{
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
    if (work == null)
    {
        break;
    }
    executorService.execute(new Worker(work));   // needs to block if no threads!
}

これは理想的ではありません。もちろん、ExecutorServiceには独自のキューがあるので、実際に起こっていることは、常に作業キューを完全に使い果たし、タスクキューをいっぱいにしていることです。タスクキューは、タスクが完了するとゆっくりと空になります。

プロデューサー側でタスクをキューに入れることができることはわかっていますが、実際にはそうはしません。作業キューの間接参照/分離がダム文字列であることが好きです。彼らに何が起こるかは、実際にはプロデューサーの仕事ではありません。プロデューサーにRunnableまたはCallableをキューに入れるように強制すると、抽象化、IMHOが壊れます。

ただし、共有ワークキューで現在の処理状態を表す必要があります。消費者が追いついていない場合は、生産者をブロックできるようにしたいと思います。

エグゼキュータを使いたいのですが、エグゼキュータのデザインと戦っているような気がします。Kool-adeを部分的に飲むことはできますか、それとも飲み込む必要がありますか?私はキューイングタスクに抵抗することに頭を悩ませていますか?(1タスクキューを使用するようにThreadPoolExecutorを設定し、そのexecuteメソッドをオーバーライドして、reject-on-queue-fullではなくブロックすることができると思いますが、それはひどい感じです。)

提案?

4

3 に答える 3

22

共有作業キューが現在の処理状態を表すようにしたい。

共有BlockingQueueを使用してみてください 。Worker スレッドのプールがキューから作業項目を取り出します。

消費者が追いついていない場合、生産者をブロックできるようにしたい。

ArrayBlockingQueueLinkedBlockingQueueは両方とも、キューがいっぱいになると書き込み時にブロックされるように、制限付きキューをサポートします。ブロッキングput()メソッドを使用すると、キューがいっぱいになった場合にプロデューサーが確実にブロックされます。

ここから大まかなスタートです。ワーカーの数とキューのサイズを調整できます。

public class WorkerTest<T> {

    private final BlockingQueue<T> workQueue;
    private final ExecutorService service;

    public WorkerTest(int numWorkers, int workQueueSize) {
        workQueue = new LinkedBlockingQueue<T>(workQueueSize);
        service = Executors.newFixedThreadPool(numWorkers);

        for (int i=0; i < numWorkers; i++) {
            service.submit(new Worker<T>(workQueue));
        }
    }

    public void produce(T item) {
        try {
            workQueue.put(item);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }


    private static class Worker<T> implements Runnable {
        private final BlockingQueue<T> workQueue;

        public Worker(BlockingQueue<T> workQueue) {
            this.workQueue = workQueue;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    T item = workQueue.take();
                    // Process item
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}
于 2010-02-10T01:42:07.890 に答える
1

「利用可能な既存のワーカー スレッドが存在する場合はそれを見つけ、必要に応じて作成し、アイドル状態になった場合はそれらを強制終了します。」

これらすべてのワーカー状態を管理することは、危険であると同時に不必要です。バックグラウンドで常に実行される監視スレッドを 1 つ作成します。そのタスクは、キューをいっぱいにしてコンシューマを生成することだけです...ワーカー スレッドをデーモンにして、完了するとすぐに終了するようにしてみませんか? それらすべてを 1 つの ThreadGroup にアタッチすると、プールのサイズを動的に変更できます...例:

  **for(int i=0; i<queue.size()&&ThreadGroup.activeCount()<UPPER_LIMIT;i++ { 
         spawnDaemonWorkers(queue.poll());
  }**
于 2010-10-25T17:53:42.917 に答える
0

Runnable::run新しいスレッドを開始する代わりに、消費者に直接実行させることができます。これを最大サイズのブロッキング キューと組み合わせると、必要なものが得られると思います。コンシューマーは、キューの作業項目に基づいてタスクをインラインで実行するワーカーになります。アイテムを処理するのと同じ速さでのみデキューするため、コンシューマーが消費を停止するとプロデューサーになります。

于 2010-02-10T01:56:49.397 に答える