22

Javaのエグゼキュータを正しく使用する方法を理解しようとしています。にタスクを送信することにExecutorServiceは、独自のオーバーヘッドがあることに気付きました。しかし、それがそれと同じくらい高いのを見て私は驚いています。

私のプログラムは、可能な限り低いレイテンシで大量のデータ(株式市場データ)を処理する必要があります。ほとんどの計算は、かなり単純な算術演算です。

私は非常に単純なものをテストしようとしました: " Math.random() * Math.random()"

最も単純なテストでは、この計算を単純なループで実行します。2番目のテストは、匿名のRunnable内で同じ計算を実行します(これは、新しいオブジェクトを作成するコストを測定することになっています)。3番目のテストはに合格RunnableしますExecutorService(これは、エグゼキュータを導入するコストを測定します)。

私は私のちっぽけなラップトップ(2 cpus、1.5ギグラムRAM)でテストを実行しました:

(in milliseconds)
simpleCompuation:47
computationWithObjCreation:62
computationWithObjCreationAndExecutors:422

(4回の実行のうち約1回、最初の2つの数値は等しくなります)

エグゼキュータは、単一のスレッドで実行するよりもはるかに長い時間がかかることに注意してください。スレッドプールのサイズが1〜8の場合、数値はほぼ同じでした。

質問:私は明らかな何かを見逃していますか、それともこれらの結果は期待されていますか?これらの結果は、エグゼキュータに渡すタスクはすべて、重要な計算を実行する必要があることを示しています。何百万ものメッセージを処理していて、各メッセージに対して非常に単純な(そして安価な)変換を実行する必要がある場合でも、エグゼキュータを使用できない可能性があります...複数のCPUに計算を分散しようとすると、単にコストがかかる可能性があります単一のスレッドでそれらを実行します。設計上の決定は、私が当初考えていたよりもはるかに複雑になります。何かご意見は?


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

 private static int count = 100000;

 public static void main(String[] args) throws InterruptedException {

  //warmup
  simpleCompuation();
  computationWithObjCreation();
  computationWithObjCreationAndExecutors();

  long start = System.currentTimeMillis();
  simpleCompuation();
  long stop = System.currentTimeMillis();
  System.out.println("simpleCompuation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreation();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreationAndExecutors();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreationAndExecutors:"+(stop-start));


 }

 private static void computationWithObjCreation() {
  for(int i=0;i<count;i++){
   new Runnable(){

    @Override
    public void run() {
     double x = Math.random()*Math.random();
    }

   }.run();
  }

 }

 private static void simpleCompuation() {
  for(int i=0;i<count;i++){
   double x = Math.random()*Math.random();
  }

 }

 private static void computationWithObjCreationAndExecutors()
   throws InterruptedException {

  ExecutorService es = Executors.newFixedThreadPool(1);
  for(int i=0;i<count;i++){
   es.submit(new Runnable() {
    @Override
    public void run() {
     double x = Math.random()*Math.random();     
    }
   });
  }
  es.shutdown();
  es.awaitTermination(10, TimeUnit.SECONDS);
 }
}
4

10 に答える 10

20
  1. エグゼキュータの使用とは、CPUやCPUコアを利用することです。したがって、CPUの数を最大限に活用するスレッドプールを作成する場合は、CPU/コアと同じ数のスレッドが必要です。
  2. そうです、新しいオブジェクトの作成にはコストがかかりすぎます。したがって、費用を削減する1つの方法は、バッチを使用することです。実行する計算の種類と量がわかっている場合は、バッチを作成します。したがって、1つの実行されたタスクで実行される数千の計算について考えてみてください。スレッドごとにバッチを作成します。計算が完了するとすぐに(java.util.concurrent.Future)、次のバッチを作成します。新しいバッチの作成も並列で実行できます(4つのCPU->計算用に3つのスレッド、バッチプロビジョニング用に1つのスレッド)。最終的には、スループットは向上しますが、メモリ需要(バッチ、プロビジョニング)は高くなります。

編集:私はあなたの例を変更し、それを私の小さなデュアルコアx200ラップトップで実行させました。

provisioned 2 batches to be executed
simpleCompuation:14
computationWithObjCreation:17
computationWithObjCreationAndExecutors:9

ソースコードにあるように、バッチプロビジョニングとエグゼキュータのライフサイクルも測定から除外しました。これは、他の2つの方法と比較してより公平です。

自分で結果を見てください...

import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

    private static int count = 100000;

    public static void main( String[] args ) throws InterruptedException {

        final int cpus = Runtime.getRuntime().availableProcessors();

        final ExecutorService es = Executors.newFixedThreadPool( cpus );

        final Vector< Batch > batches = new Vector< Batch >( cpus );

        final int batchComputations = count / cpus;

        for ( int i = 0; i < cpus; i++ ) {
            batches.add( new Batch( batchComputations ) );
        }

        System.out.println( "provisioned " + cpus + " batches to be executed" );

        // warmup
        simpleCompuation();
        computationWithObjCreation();
        computationWithObjCreationAndExecutors( es, batches );

        long start = System.currentTimeMillis();
        simpleCompuation();
        long stop = System.currentTimeMillis();
        System.out.println( "simpleCompuation:" + ( stop - start ) );

        start = System.currentTimeMillis();
        computationWithObjCreation();
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreation:" + ( stop - start ) );

        // Executor

        start = System.currentTimeMillis();
        computationWithObjCreationAndExecutors( es, batches );    
        es.shutdown();
        es.awaitTermination( 10, TimeUnit.SECONDS );
        // Note: Executor#shutdown() and Executor#awaitTermination() requires
        // some extra time. But the result should still be clear.
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreationAndExecutors:"
                + ( stop - start ) );
    }

    private static void computationWithObjCreation() {

        for ( int i = 0; i < count; i++ ) {
            new Runnable() {

                @Override
                public void run() {

                    double x = Math.random() * Math.random();
                }

            }.run();
        }

    }

    private static void simpleCompuation() {

        for ( int i = 0; i < count; i++ ) {
            double x = Math.random() * Math.random();
        }

    }

    private static void computationWithObjCreationAndExecutors(
            ExecutorService es, List< Batch > batches )
            throws InterruptedException {

        for ( Batch batch : batches ) {
            es.submit( batch );
        }

    }

    private static class Batch implements Runnable {

        private final int computations;

        public Batch( final int computations ) {

            this.computations = computations;
        }

        @Override
        public void run() {

            int countdown = computations;
            while ( countdown-- > -1 ) {
                double x = Math.random() * Math.random();
            }
        }
    }
}
于 2009-10-30T10:43:57.207 に答える
8

これは、次の理由により、スレッドプールの公正なテストではありません。

  1. スレッドが1つしかないため、プーリングをまったく利用していません。
  2. 作業が単純すぎるため、プーリングのオーバーヘッドを正当化できません。FPPを使用したCPUでの乗算は、数サイクルしかかかりません。

オブジェクトの作成とジョブの実行に加えて、スレッドプールが実行する必要のある追加の手順に従うことを検討すると、

  1. ジョブをキューに入れます
  2. キューからジョブを削除します
  3. プールからスレッドを取得し、ジョブを実行します
  4. スレッドをプールに戻します

実際の仕事と複数のスレッドがある場合、スレッドプールの利点は明らかです。

于 2009-10-30T04:49:25.177 に答える
5

あなたが言及する「オーバーヘッド」はExecutorServiceとは何の関係もありません。これは、Math.randomで同期する複数のスレッドが原因で、ロックの競合が発生します。

そうです、あなたは何かが欠けています(そして以下の「正しい」答えは実際には正しくありません)。

これは、ロックの競合がない単純な関数を実行する8つのスレッドを示すJava8コードです。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleFunction;

import com.google.common.base.Stopwatch;

public class ExecServicePerformance {

    private static final int repetitions = 120;
    private static int totalOperations = 250000;
    private static final int cpus = 8;
    private static final List<Batch> batches = batches(cpus);

    private static DoubleFunction<Double> performanceFunc = (double i) -> {return Math.sin(i * 100000 / Math.PI); };

    public static void main( String[] args ) throws InterruptedException {

        printExecutionTime("Synchronous", ExecServicePerformance::synchronous);
        printExecutionTime("Synchronous batches", ExecServicePerformance::synchronousBatches);
        printExecutionTime("Thread per batch", ExecServicePerformance::asynchronousBatches);
        printExecutionTime("Executor pool", ExecServicePerformance::executorPool);

    }

    private static void printExecutionTime(String msg, Runnable f) throws InterruptedException {
        long time = 0;
        for (int i = 0; i < repetitions; i++) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            f.run(); //remember, this is a single-threaded synchronous execution since there is no explicit new thread
            time += stopwatch.elapsed(TimeUnit.MILLISECONDS);
        }
        System.out.println(msg + " exec time: " + time);
    }    

    private static void synchronous() {
        for ( int i = 0; i < totalOperations; i++ ) {
            performanceFunc.apply(i);
        }
    }

    private static void synchronousBatches() {      
        for ( Batch batch : batches) {
            batch.synchronously();
        }
    }

    private static void asynchronousBatches() {

        CountDownLatch cb = new CountDownLatch(cpus);

        for ( Batch batch : batches) {
            Runnable r = () ->  { batch.synchronously(); cb.countDown(); };
            Thread t = new Thread(r);
            t.start();
        }

        try {
            cb.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }        
    }

    private static void executorPool() {

        final ExecutorService es = Executors.newFixedThreadPool(cpus);

        for ( Batch batch : batches ) {
            Runnable r = () ->  { batch.synchronously(); };
            es.submit(r);
        }

        es.shutdown();

        try {
            es.awaitTermination( 10, TimeUnit.SECONDS );
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } 

    }

    private static List<Batch> batches(final int cpus) {
        List<Batch> list = new ArrayList<Batch>();
        for ( int i = 0; i < cpus; i++ ) {
            list.add( new Batch( totalOperations / cpus ) );
        }
        System.out.println("Batches: " + list.size());
        return list;
    }

    private static class Batch {

        private final int operationsInBatch;

        public Batch( final int ops ) {
            this.operationsInBatch = ops;
        }

        public void synchronously() {
            for ( int i = 0; i < operationsInBatch; i++ ) {
                performanceFunc.apply(i);
            }
        }
    }


}

25k操作(ms)の120テストの結果のタイミング:

  • 同期実行時間:9956
  • 同期バッチ実行時間:9900
  • バッチ実行時間あたりのスレッド数:2176
  • エグゼキュータプールの実行時間:1922

勝者:エグゼキュータサービス。

于 2014-11-19T19:41:57.030 に答える
4

メソッドを呼び出すたびに新しいエグゼキュータサービスを作成しているので、これはまったく現実的ではないと思います。非現実的と思われる非常に奇妙な要件がない限り、通常はアプリの起動時にサービスを作成し、それにジョブを送信します。

ベンチマークを再試行したが、サービスをフィールドとして初期化した場合、一度、タイミングループの外側にあります。次に、Runnablesをサービスに送信する場合と、自分で実行する場合の実際のオーバーヘッドが表示されます。

しかし、あなたがその要点を完全に理解しているとは思いません-エグゼキュータは効率のためにそこにいることを意図していません。彼らは調整とスレッドプールへの作業の引き渡しをより簡単にするためにそこにいます。それらは常に自分自身を呼び出すよりも効率が悪くなりRunnable.run()ます(1日の終わりに、事前に追加のハウスキーピングを行った後、エグゼキュータサービスがこれを行う必要があるため)。非同期処理を必要とする複数のスレッドからそれらを使用しているとき、それらは本当に輝いています。

また、基本的に固定コスト(エグゼキュータのオーバーヘッドは、タスクの実行に1msまたは1hrかかる場合でも同じ)と非常に小さい可変量(些細な実行可能)の相対的な時間差を見ていることを考慮してください。エグゼキュータサービスが1msのタスクを実行するのに5ms余分にかかる場合、それはあまり好ましい数字ではありません。5秒のタスク(たとえば、重要なSQLクエリ)を実行するのに5ミリ秒余分にかかる場合、それは完全に無視でき、完全に価値があります。

したがって、ある程度状況によって異なります。非常にタイムクリティカルなセクションがあり、並列または非同期で実行する必要のない小さなタスクを多数実行している場合、エグゼキュータから何も取得されません。より重いタスクを並行して処理していて、非同期で応答したい場合(Webアプリケーションなど)、エグゼキューターは優れています。

それらがあなたにとって最良の選択であるかどうかはあなたの状況に依存しますが、実際には現実的な代表的なデータでテストを試す必要があります。タスクが本当に些細なものでない限り(そしてエグゼキュータインスタンスを再利用したくない場合を除いて)、実行したテストから結論を引き出すことは適切ではないと思います。

于 2009-10-30T12:04:28.403 に答える
4

Math.random()は、実際には単一の乱数ジェネレーターで同期します。Math.random()を呼び出すと、数値ジェネレーターで重大な競合が発生します。実際、スレッドが多ければ多いほど、スレッドは遅くなります。

Math.random()javadocから:

このメソッドは適切に同期されているため、複数のスレッドで正しく使用できます。ただし、多くのスレッドが高速で疑似乱数を生成する必要がある場合は、各スレッドが独自の疑似乱数ジェネレーターを持つようにするための競合を減らすことができます。

于 2012-07-25T00:11:13.253 に答える
1

まず、マイクロベンチマークにはいくつかの問題があります。あなたはウォームアップをします、それは良いことです。ただし、テストを複数回実行することをお勧めします。これにより、テストが実際にウォームアップしたかどうか、および結果の変動を感じることができます。また、各アルゴリズムのテストを別々に実行する方がよい傾向があります。そうしないと、アルゴリズムが変更されたときに最適化が解除される可能性があります。

タスクは非常に小さいですが、どれだけ小さいかは完全にはわかりません。したがって、何倍も速くすることはまったく意味がありません。マルチスレッドの状況では、同じ揮発性の場所にアクセスするため、スレッドによってパフォーマンスが大幅に低下する可能性があります(Randomスレッドごとにインスタンスを使用します)。また、47ミリ秒の実行は少し短いです。

確かに、小さな操作のために別のスレッドに移動するのは速くはありません。可能であれば、タスクをより大きなサイズに分割します。JDK7は、分割統治アルゴリズムからの細かいタスクをサポートしようとするフォークジョインフレームワークを備えているように見えます。このフレームワークは、アイドル状態のスレッドによって大きなタスクが引き出され、同じスレッドでタスクを順番に実行することを優先します。

于 2009-10-30T04:59:15.477 に答える
1

これが私のマシンでの結果です(64ビットUbuntu14.0上のOpenJDK8、Thinkpad W530)

simpleCompuation:6
computationWithObjCreation:5
computationWithObjCreationAndExecutors:33

確かにオーバーヘッドがあります。ただし、これらの数値が何であるかを覚えておいてください 。100k回の反復のミリ秒。あなたの場合、オーバーヘッドは反復ごとに約4マイクロ秒でした。私にとって、オーバーヘッドは約1/4マイクロ秒でした。

オーバーヘッドは、同期、内部データ構造、および複雑なコードパス(forループよりも確かに複雑)によるJIT最適化の欠如です。

1/4マイクロ秒のオーバーヘッドにもかかわらず、実際に並列化したいタスクはそれだけの価値があります。


参考までに、これは並列化するのに非常に悪い計算になります。スレッドを8(コアの数)に増やしました:

simpleCompuation:5
computationWithObjCreation:6
computationWithObjCreationAndExecutors:38

それはそれを速くしませんでした。これはMath.random()同期されているためです。

于 2016-08-21T20:45:19.613 に答える
0

Fixed ThreadPoolの最終的な目的は、作成済みのスレッドを再利用することです。したがって、タスクが送信されるたびに新しいスレッドを再作成する必要がないため、パフォーマンスが向上します。したがって、停止時間は、送信されたタスク内で取得する必要があります。runメソッドの最後のステートメントと同じです。

于 2011-02-11T15:47:57.757 に答える
0

計算の大部分を各スレッドに送信するには、何らかの方法で実行をグループ化する必要があります(たとえば、銘柄記号に基づいてグループを構築する)。Disruptorを使用すると、同様のシナリオで最良の結果が得られました。ジョブごとのオーバーヘッドは非常に低くなっています。それでもジョブをグループ化することは重要ですが、ナイーブなラウンドロビンは通常、多くのキャッシュミスを引き起こします。

http://java-is-the-new-c.blogspot.de/2014/01/comparision-of-different-concurrency.htmlを参照してください

于 2014-06-08T11:01:38.517 に答える
0

他の人に役立つ場合は、SamsungAndroidデバイスでの現実的なシナリオ(すべてのタスクが終了するまでExecutorServiceを繰り返し使用する)を使用したテスト結果を次に示します。

 Simple computation (MS): 102
 Use threads (MS): 31049
 Use ExecutorService (MS): 257

コード:

   ExecutorService executorService = Executors.newFixedThreadPool(1);
        int count = 100000;

        //Simple computation
        Instant instant = Instant.now();
        for (int i = 0; i < count; i++) {
            double x = Math.random() * Math.random();
        }
        Duration duration = Duration.between(instant, Instant.now());
        Log.d("ExecutorPerformanceTest", "Simple computation (MS): " + duration.toMillis());


        //Use threads
        instant = Instant.now();
        for (int i = 0; i < count; i++) {
            new Thread(() -> {
                double x = Math.random() * Math.random();
            }
            ).start();
        }
        duration = Duration.between(instant, Instant.now());
        Log.d("ExecutorPerformanceTest", "Use threads (MS): " + duration.toMillis());


        //Use ExecutorService
        instant = Instant.now();
        for (int i = 0; i < count; i++) {
            executorService.execute(() -> {
                        double x = Math.random() * Math.random();
                    }
            );
        }
        duration = Duration.between(instant, Instant.now());
        Log.d("ExecutorPerformanceTest", "Use ExecutorService (MS): " + duration.toMillis());
于 2022-01-22T19:49:17.497 に答える