0

この場合(任意のバージョン)、プロデューサーが「永久に」実行する必要があるExecutorServiceを使用する単一のプロデューサー複数のコンシューマーを実装するためのJavaでの最良のアプローチを見つけようとしています(必要ではありません)。すべてが完全に処理されるまで待機する必要があります。たとえば、すべてのコンシューマー スレッドが終了し、キューが空になり、生成されるアイテムが残っていません。プロデューサは、データ ソースも一定の間隔でのみポーリングする必要があります。

例として、30 分ごとにプロデューサーがレコードのデータ ソースをポーリングし、それらをキューに送信するようにします。コンシューマーが処理に 30 分以上かかっている場合は、データ ソースを再度ポーリングする前に、すべてのアイテムが処理されるまでプロデューサーに待機してもらいます (30 分が経過するとすぐにポーリングします)。

私のコードを書いてくれる人を探しているわけではありません。いくつかの基本的なヒント/ガイダンスをいただければ幸いです。

これは、私が作業しようとしている短縮された実装例です。私は問題を解決するための恐ろしい試みをすべてやめました。ThreadPoolExecutor を構築するためにハードコードされたパラメーターは、最終的に動的になることに注意してください。

import java.util.concurrent.*;
public class ItemProcessorService {
    public static void main(String args[]) throws InterruptedException {
        RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        int corePoolSize = 5,
                maxPoolSize = 10,
                keepAlive = 10,
                queueCapacity = 1;
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAlive, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity), threadFactory, rejectionHandler);

        for (int i = 0; i < 10; i++) {
            executor.execute(new ItemConsumer());
        }
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Executor finished");
    }
}

class ItemConsumer implements Runnable {
    @Override
    public void run() {
        processItem();
    }

    private void processItem() {
        try {
            Thread.sleep(3000);
            System.out.println(Thread.currentThread().getName() + " - processed item");
        } catch (InterruptedException e) {
            //e.printStackTrace();
        }
    }
}

class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(r.toString() + " - rejected");
    }
}
4

3 に答える 3

1

将来のタスクでスレッド プール エグゼキュータに移動します。すべてのスレッドが完了する前に、Poducer を待機させます。以下に例を示します。

 for (int job = 0; job < yourList.size(); job++) {
        Future<String[]> future = Service.getCompletionService().take();

        if (future.isDone()) {
        }
    }
于 2014-08-22T21:08:12.200 に答える
0

CountDownLatch と組み合わせたセルフスケジューリング プロデューサー (こちらも参照) を使用します。コードが必要ないことはわかっていますが、この場合、実験するコードを用意しても問題はないと思います。

import java.util.*;
import java.util.concurrent.*;

public class WaitForConsumers {

public static void main(String[] args) {

    try {
        new WaitForConsumers().demo();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public static final int NUMBER_OF_SLEEP_TASKS = 100;

public void demo() throws Exception {

    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    Producer p = new Producer(scheduler, 1000L);
    p.start();
    try {
        Thread.sleep(3000);
        p.stop();
        System.out.println("Stopped.");
    } finally {
        scheduler.shutdownNow();
    }
}

public static List<Long> produce() {
    List<Long> items = new ArrayList<Long>();
    for (int i = 0; i < NUMBER_OF_SLEEP_TASKS; i++) items.add(i * 1L);
    return items;
}

public static void consume(List<Runnable> tasks, CountDownLatch consumersRunning) throws Exception {

    ExecutorService executor = Executors.newCachedThreadPool();
    for (Runnable r : tasks) executor.execute(r);
    try {
        consumersRunning.await();
    } finally {
        executor.shutdownNow();
    }
}

class Producer implements Runnable {

    ScheduledExecutorService scheduler;
    long frequencyMs;
    volatile boolean stop;
    ScheduledFuture<?> producerTask;

    Producer(ScheduledExecutorService scheduler, long frequencyMs) {
        this.scheduler = scheduler;
        this.frequencyMs = frequencyMs;
    }

    void start() {
        scheduler.execute(this);
    }

    void stop() {

        System.out.println("Stopping producer.");
        stop = true;
        if (producerTask != null) {
            producerTask.cancel(true);
        }
    }

    @Override public void run() {

        long startTime = System.currentTimeMillis();
        List<Long> items = produce();
        CountDownLatch consumersRunning = new CountDownLatch(items.size());
        List<Runnable> tasks = wrap(items, consumersRunning);
        try {
            System.out.println("Consuming " + tasks.size() + " tasks.");
            consume(tasks, consumersRunning);
            System.out.println("Consumed tasks.");
        } catch (Exception e) {
            e.printStackTrace();
            stop = true;
        } finally {
            if (stop) {
                System.out.println("Producer stopping.");
            } else {
                long waitTime = frequencyMs - (System.currentTimeMillis() - startTime);
                if (waitTime < 1L) {
                    scheduler.execute(this);
                } else {
                    System.out.println("Next run in " + waitTime + " ms.");
                    producerTask = scheduler.schedule(this, waitTime, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

    List<Runnable> wrap(List<Long> items, final CountDownLatch consumersRunning) {

        List<Runnable> tasks = new ArrayList<Runnable>();
        for (Long l : items) {
            tasks.add(new SleepTask(l, consumersRunning));
        }
        return tasks;
    }
} // class Producer

class SleepTask implements Runnable {

    long sleepTime;
    CountDownLatch consumersRunning;

    public SleepTask(long sleepTime, CountDownLatch consumersRunning) {
        this.sleepTime = sleepTime;
        this.consumersRunning = consumersRunning;
    }

    @Override public void run() {

        try {
            Thread.sleep(sleepTime);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumersRunning.countDown();
        }
    }
} // class SleepTask
}
于 2014-08-22T21:56:39.733 に答える
0

ExecutorServiceプロデューサーの for every 実行を作成することで、この問題をうまく解決できます。プロデューサはそれを作成し、シャットダウンし、その終了も待ちます。

プロデューサーをスケジュールするには、必要に応じてScheduledExecutorService.scheduleWithFixedDelay(...)またはを使用します。ScheduledExecutorService.scheduleAtFixedRate(...)

  • scheduleWithFixedDelay(...)は、2 つの実行の間に指定された時間を維持するため、実行がどれだけ長く続くかに関係なく、指定された時間の後に次の実行が続きます。

    ...|XXXXXX|.............|XXXXX|.............|XXXXXXXXX|.............|X <--- fix ---> <--- fix ---> <--- fix --->

  • scheduleAtFixedRate(...)スケジューリング レートを維持しようとするため、プロデューサーがより長い時間を必要とする場合、2 つの実行間の時間は短縮されますが、2 つの実行が重複することはありません。

    ...|XXXXXXXX|...|XXXXX|......|XXXXXXXXXXXXXXX||XXX|....|XX <--- fix ---><--- fix ---><--- fix ---><--- fix ---><-

プロデューサーがどのように見えるかの簡単な例:

public class Producer implements Runnable {
    public void run() {
        ExecutorService executor = ... // create new executor

        // queue items
        for (Object item : itemSource) {
            executor.schedule(new Consumer(item));
        }

        // shutdown executor
        executor.shutdown();
        executor.awaitTermination(2, TimeUnit.HOURS);
    }
}

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

scheduler.scheduleWithFixedDelay(new Producer(), 30, 30, TimeUnit.MINUTES);
// or
scheduler.scheduleAtFixedRate(new Producer(), 30, 30, TimeUnit.MINUTES);

もちろん、キャッチされないプロデューサ実行のすべての例外は、スケジューラによる再スケジュールを停止するため、適切な例外処理を行う必要があります。

エグゼキュータの作成にはコストがかかる可能性があるため、このアプローチは、アイテムの消費がエグゼキュータの作成よりもはるかにコストがかかる場合にのみ適切であることに注意してください (これは、あなたの状況ではそうです)。

于 2014-08-22T23:32:11.740 に答える