5

との組み合わせScheduledThreadPoolExecutorを使用して、可変パラメーターを受け入れるコマンドを調整するにはどうすればよいですか?コマンドからの応答を受け取ったら、前述のコマンドの出力に基づいて新しいコマンドを作成する必要があります。また、1秒あたり100回の呼び出しのしきい値を順守する必要があります。ScheduledFutureExecutorCompletionServiceCallableCallableCallableCallable

4

4 に答える 4

5

リーキーバケットアルゴリズムを実装する必要があります。電話をかける前に、トークンが手に入るまでブロックします。このアルゴリズムは、数十行のJavaで実装できます。

于 2011-08-02T22:39:05.103 に答える
1

スロットリングにはセマフォを使用できます。A) 「瞬間ごとのスロットル」(同時にジョブの上限) または B) 「間隔ごとのスロットル」(1 つの間隔内のジョブの上限) を区別する必要があります。

A) 「瞬間ごとのスロットル」には、セマフォを上下にカウントするだけで十分です。例えば

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottlePerPerInstantSample {
    private static final int JOBS_COUNT = 100;
    private static final int JOBS_THROTTLE_PER_INSTANT = 10;
    private static final Semaphore THROTTLE_PER_INSTANT_SEMAPHORE = new Semaphore(
            JOBS_THROTTLE_PER_INSTANT);
    private static final ExecutorService executorService = Executors
            .newFixedThreadPool(JOBS_THROTTLE_PER_INSTANT);

    private final static AtomicInteger jobsAtTheSameTimeCounter = new AtomicInteger(
            0);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 1; i <= JOBS_COUNT; i++) {
            THROTTLE_PER_INSTANT_SEMAPHORE.acquire();
            final PrintJob printJob = new PrintJob(i, jobsAtTheSameTimeCounter);
            final ThrottledJob throttledJob = new ThrottledJob(printJob,
                    THROTTLE_PER_INSTANT_SEMAPHORE);
            executorService.execute(throttledJob);
        }
        executorService.shutdown();
    }

    static class ThrottledJob implements Runnable {
        private final Runnable delegate;
        private final Semaphore throttlePerInstantSemaphore;

        public ThrottledJob(Runnable delegate,
                Semaphore throttlePerInstantSemaphore) {
            super();
            this.delegate = delegate;
            this.throttlePerInstantSemaphore = throttlePerInstantSemaphore;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } finally {
                throttlePerInstantSemaphore.release();
            }
        }

    }

    static class PrintJob implements Runnable {
        final int jobNumber;
        final AtomicInteger jobsAtTheSameTimeCounter;

        public PrintJob(int jobNumber, AtomicInteger jobsAtTheSameTimeCounter) {
            super();
            this.jobNumber = jobNumber;
            this.jobsAtTheSameTimeCounter = jobsAtTheSameTimeCounter;
        }

        public void run() {
            jobsAtTheSameTimeCounter.incrementAndGet();

            try {
                Thread.sleep(50); // wait some time
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            synchronized (System.out) {
                System.out.println(jobsAtTheSameTimeCounter.getAndDecrement()
                        + " : Job " + jobNumber);
            }
        }
    }
}

出力は次のようになります。

10 : Job 1
9 : Job 2
8 : Job 3
7 : Job 5
7 : Job 4
...
9 : Job 87
10 : Job 90
9 : Job 89
10 : Job 91
9 : Job 92
8 : Job 93
7 : Job 94
6 : Job 95
5 : Job 98
4 : Job 97
3 : Job 96
2 : Job 100
1 : Job 99

B)セマフォをカウントダウンし、定期的にセマフォを初期値にリセットするだけで、「間隔ごとのスロットル」には十分です。

例えば

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class ThrottlePerIntervallSample {
    private static final int JOBS_COUNT = 100;
    private static final int JOBS_THROTTLE_PER_INTERVALL = 10;
    private static final long INTERVALL_IN_UNITS = 1;
    private static final TimeUnit UNIT_OF_INTERVALL = TimeUnit.SECONDS;

    private static final Semaphore THROTTLE_PER_INTERVALL_SEMAPHORE = new Semaphore(
            JOBS_THROTTLE_PER_INTERVALL);

    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors
            .newScheduledThreadPool(JOBS_THROTTLE_PER_INTERVALL + 1); 
    // plus one because the resetting of the semaphore must be possible! 

    public static void main(String[] args) throws InterruptedException {

        SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(()-> {
            THROTTLE_PER_INTERVALL_SEMAPHORE.drainPermits(); // remove permits from previous intervall
        THROTTLE_PER_INTERVALL_SEMAPHORE.release(JOBS_THROTTLE_PER_INTERVALL); // set permits for the next intervall
        }, INTERVALL_IN_UNITS, INTERVALL_IN_UNITS, UNIT_OF_INTERVALL);

        for (int i = 1; i <= JOBS_COUNT; i++) {
            THROTTLE_PER_INTERVALL_SEMAPHORE.acquire();
            final PrintJob printJob = new PrintJob(i);
            SCHEDULED_EXECUTOR_SERVICE.execute(printJob);
        }

        SCHEDULED_EXECUTOR_SERVICE.shutdown();
    }

    static class PrintJob implements Runnable {
        final int jobNumber;

        public PrintJob(int jobNumber) {
            super();
            this.jobNumber = jobNumber;
        }

        public void run() {

            try {
                Thread.sleep(50); // wait some time
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss:SSS");

            synchronized (System.out) {
                System.out.println(simpleDateFormat.format(new Date())
                        + " : Job " + jobNumber);
            }
        }
    }

}

出力は次のようになります。

00:42:29:253 : Job 9
00:42:29:255 : Job 2
00:42:29:255 : Job 6
00:42:29:255 : Job 5
00:42:29:255 : Job 10
00:42:29:256 : Job 7
00:42:29:256 : Job 3
00:42:29:256 : Job 1
00:42:29:256 : Job 8
00:42:29:257 : Job 4
00:42:30:140 : Job 11
...
00:42:37:142 : Job 90
00:42:38:140 : Job 91
00:42:38:140 : Job 92
00:42:38:141 : Job 99
00:42:38:141 : Job 93
00:42:38:141 : Job 94
00:42:38:142 : Job 98
00:42:38:142 : Job 96
00:42:38:142 : Job 95
00:42:38:143 : Job 100
00:42:38:143 : Job 97

いくつかのコメント:

1) 実際のデッドロックを回避するために、生産的なシステムで取得する代わりに、タイムアウトを指定して tryAcquire メソッドを使用することをお勧めします。

2) 多くのジョブを処理する場合は、(スケジュールされた) executor サービスにジョブを送信する前に aquire/tryAquire を呼び出します。そうしないと、その時点であまりにも多くのジョブがスレッド プールのキューを汚染する可能性があります。

于 2016-08-01T22:47:06.203 に答える