6

私のプロジェクトでは、Java Futures を使用して並行タスクを頻繁に処理します。あるアプリケーションでは、各並行タスクが完了時にかなりの量のメモリを必要とします。他のいくつかの設計上の選択により、そのメモリはスレッドの外部で作成されたオブジェクトで作成および参照されます (以下の詳細な例を参照してください)。

驚いたことに、future タスク (つまり、その計算スレッド) が完了した後でも、future はこのオブジェクトへの参照を保持しています。つまり、このオブジェクトへの他の参照が他の場所に保持されていない場合、フューチャが解放されない限り、タスクが完了したとしても、オブジェクトは解放されません。

私の素朴な考えは、同時スレッドの数を制限すると、タスクが保持するリソース (メモリ) の数が自動的に制限されるというものでした。本当じゃない!

以下のコードを検討してください。この例では、いくつかのタスクを作成します。計算中に ArrayList (外部変数) のサイズが大きくなります。メソッドは を返しますVector<Future>。タスクが完了し、ArrayList のスコープが終了した場合でも、Future は引き続き ArrayList への参照を ( 経由でFutureTask.sync.callable) 保持します。

要約する:

  • Callable が完了した場合でも、FutureTask は Callable への参照を保持します。
  • Callable は、計算が完了した場合でも、計算中に使用される最終的な外部変数への参照を保持します。

質問: フューチャー経由で保持されているリソースを解放する最良の方法は何ですか? (もちろん、callable のローカル変数がスレッドの完了時に解放されることは知っています。これは私が求めているものではありません)。

/*
 * (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: email@christianfries.com.
 *
 * Created on 17.08.2013
 */

package net.finmath.experiments.concurrency;

import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * @author Christian Fries
 *
 */
public class ConcurrencyTest {

    private ExecutorService executor = Executors.newFixedThreadPool(10);
    private int numberOfDoubles = 1024*1024/8;      // 1 MB
    private int numberOfModels  = 100;              // 100 * 1 MB

    /**
     * @param args
     * @throws ExecutionException 
     * @throws InterruptedException 
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ConcurrencyTest ct = new ConcurrencyTest();
        ct.concurrencyTest();
    }

    /**
     * @throws ExecutionException 
     * @throws InterruptedException 
     */
    public void concurrencyTest() throws InterruptedException, ExecutionException {
        Vector<Double> results = getResults();

        Runtime.getRuntime().gc();
        System.out.println("Allocated memory (only results): " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));
    }


    private Vector<Double> getResults() throws InterruptedException, ExecutionException {
        Vector<Future<Double>> resultsFutures = getResultsConcurrently();


        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.HOURS);

        /*
         * At this point, we expect that no reference to the models is held
         * and the memory is freed.
         * However, the Future still reference each "model". 
         */

        Runtime.getRuntime().gc();
        System.out.println("Allocated memory (only futures): " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));

        Vector<Double> results = new Vector<Double>(resultsFutures.size());
        for(int i=0; i<resultsFutures.size(); i++) {
            results.add(i, resultsFutures.get(i).get());
        }       

        return results;
    }

    private Vector<Future<Double>> getResultsConcurrently() {

        /*
         * At this point we allocate some model, which represents
         * something our workers work on.
         */
        Vector<ArrayList<Double>> models = new Vector<ArrayList<Double>>(numberOfModels);
        for(int i=0; i<numberOfModels; i++) {
            models.add(i, new ArrayList<Double>());
        }

        /*
         * Work on the models concurrently
         */
        Vector<Future<Double>> results = calculateResults(models);


        /*
         * Return the futures.
         * Note: We expect that no more reference is held to a model
         * once we are running out scope of this function AND the respective worker
         * has completed.
         */
        return results;
    }

    private Vector<Future<Double>> calculateResults(Vector<ArrayList<Double>> models) {
        Vector<Future<Double>> results = new Vector<Future<Double>>(models.size());
        for(int i=0; i<models.size(); i++) {
            final ArrayList<Double> model       = models.get(i);
            final int               modelNumber = i;

            Callable<Double> worker = new  Callable<Double>() {
                public Double call() throws InterruptedException {

                    /*
                     * The models will perform some thread safe lazy init,
                     * which we simulate here, via the following line
                     */
                    for(int j=0; j<numberOfDoubles; j++) model.add(Math.random());

                    /*
                     * Now the worker starts working on the model
                     */
                    double sum = 0.0;
                    for(Double value : model) sum += value.doubleValue();

                    Thread.sleep(1000);

                    Runtime.getRuntime().gc();
                    System.out.println("Model " + modelNumber + " completed. Allocated memory: " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));

                    return sum;
                }
            };

            // The following line will add the future result of the calculation to the vector results               
            results.add(i, executor.submit(worker));
        }

        return results;
    }
}

これはデバッガー/プロファイラーのスクリーンショットです (これは別の例で行われました)。FutureTask が完了しました (結果から明らかなように)。ただし、FutureTask は Callable への参照を保持します。この場合、 Callable は、いくつかの「大きな」オブジェクトを含む外側の最終変数引数への参照を保持します。

ここに画像の説明を入力

(この例はより現実的なものです。ここで、 Obba サーバーは、データの同時作成と処理を使用してスプレッドシートで動作します - 私のプロジェクトから取得しました)。

アップデート:

allprog と sbat の回答を踏まえて、いくつかコメントを追加したいと思います。

  • allprog の回答を受け入れました。これは、将来リソースを解放する方法についての元の質問に対する回答であるためです。私が気に入らないのは、このソリューションの外部ライブラリへの依存ですが、この場合は良いヒントです。

  • そうは言っても、私の好ましい解決策は sbat の解決策であり、以下の私自身の回答では、call() が完了した後に呼び出し可能オブジェクトで「より大きな」オブジェクトを参照しないようにすることです。実際、私の好みの解決策は、Callable を実装する匿名クラスを避けることです。代わりに、コンストラクターを持ち、コンストラクターを介して他のオブジェクトへのすべての参照を受け取り、call() 実装の最後にそれらを解放する、Callable を実装する内部クラスを定義します。

4

3 に答える 3

1

デフォルトの実装を使用する場合、Futureそれ自体はです。FutureTask は、送信した Runnable または Callable への参照を保持します。その結果、これらが割り当てるリソースは、Future が解放されなくなるまで保持されます。FutureTaskExecutionService

この種の問題への最善のアプローチはtransform、Future を別の Future に変換することです。これにより、結果のみが保持され、これが呼び出し元に返されます。この方法では、完了サービスや追加のハッキングは必要ありません。

// Decorate the executor service with listening
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

// Submit the task
ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
    public Integer call() throws Exception {
        return new Integer(1234);
    }
});

// Transform the future to get rid of the memory consumption
// The transformation must happen asynchronously, thus the 
// Executor parameter is vital
Futures.transform(future, new AsyncFunction<Integer, Integer>() {
    public ListenableFuture<Integer> apply(Integer input) {
        return Futures.immediateFuture(input);
    }
}, service);

これにより、説明したスキームが効果的に実装されますが、ハッキングが肩から解放され、後ではるかに高い柔軟性が可能になります。これは Guava を依存関係としてもたらしますが、コストに見合うだけの価値があると思います。私はいつもこのスキームを使用しています。

于 2013-08-17T21:07:26.093 に答える
0

私の頭に浮かんだ解決策は

  • 完了サービスで先物を消費します ( を介してtake()、これらの先物を解放し、追加のスレッドでこれを実行して、新しい先物を返します。外側のコードは、完了サービスからの先物のみを参照します。

他の回答に加えて: 別の可能性は、sbat の回答のように、呼び出し可能なモデルへの参照を保持しないようにすることです。以下は、別の方法で同じことを行います ( Accessing constructor of an anonymous class への回答に触発されました):

  • セッターを介してオブジェクト「モデル」を匿名クラスのフィールドに渡し、完了時にそのフィールドをゼロに設定します。

        for(int i=0; i<models.size(); i++) {
    // REMOVED: final ArrayList<Double> model       = models.get(i);
            final int               modelNumber = i;
    
            Callable<Double> worker = new  Callable<Double>() {
    /*ADDED*/   private ArrayList<Double> model;
    /*ADDED*/   public Callable<Double> setModel(ArrayList<Double> model) { this.model = model; return this; };
    
                public Double call() throws InterruptedException {
    
                    /* ... */
    
                    /*
                     * Now the worker starts working on the model
                     */
                    double sum = 0.0;
    
                    /* ... */
    
    /*ADDED*/       model = null;
                    return sum;
                }
    /*ADDED*/   }.setModel(models.get(i));
    
    
            // The following line will add the future result of the calculation to the vector results
            results.add(i, executor.submit(worker));
        }
    
于 2013-08-17T20:31:45.147 に答える