1

リモートの場所から項目をリストする Java イテレーターがあります。アイテムのリストは「ページ」で表示され、「次のページを取得する」操作はかなり遅くなります。(具体的には、イテレータが呼び出されS3Find、Amazon S3 からオブジェクトを一覧表示します)。

そこで、処理を高速化するために、1 つのリスト ページをプリフェッチしたいと考えました。これを行うために、ExecutorServiceおよびCallable/Futureパターンを使用してアイテムの「ページ」をプリフェッチしました。問題は、その反復子の呼び出し元が、クラスに通知せずにいつでも操作を放棄する可能性があることです。たとえば、次のループを考えてみましょう。

for (S3URL f : new S3Find(topdir).withRecurse(true)) {
    // do something with f
    if (some_condition) break;
}

その結果、リソース リークが発生しExecutorServiceます。サブミットに使用するCallableは、含まれている への参照がなくなってS3Findも (そして次のプリフェッチが完了しても) 実行されたままになるためです。

これを処理する適切な方法は何ですか?間違ったアプローチを使用していますか? プリフェッチごとに新しいベアスレッドを放棄ExecutorServiceして使用する必要がありますか (プリフェッチが完了したらスレッドを強制終了します)? ページの各フェッチには約 500 ミリ秒かかるため、毎回新しいスレッドを作成することはおそらく無視できることに注意してください。私が望んでいないことの 1 つは、反復処理が完了したことを呼び出し元に明示的に通知S3Findすることを要求することです (確実に忘れてしまう人もいるからです)。

現在のプリフェッチ コードは次のとおりです ( 内S3Find)。

/**
 * This class holds one ObjectListing (one "page"), and also pre-fetches
 * the next page using a {@link S3Find#NextPageGetter} Callable on a
 * separate thread.
 */
private static class Pager {
    private final AmazonS3 s3;
    private ObjectListing currentList;
    private Future<ObjectListing> future;
    private final ExecutorService exec;
    public Pager(AmazonS3 s3, ListObjectsRequest request) {
        this.s3 = s3;
        currentList = s3.listObjects(request);
        exec = Executors.newSingleThreadExecutor();
        future = submitPrefetch();
    }
    public ObjectListing getCurrentPage() {
        return currentList;
    }
    /**
     * Move currentList to the next page, and returns it.
     */
    public ObjectListing getNextPage() {
        if (future == null) return null;
        try {
            currentList = future.get();
            future = submitPrefetch();
        } catch (InterruptedException|ExecutionException e) {
            e.printStackTrace();
        }
        return currentList;
    }
    private Future<ObjectListing> submitPrefetch() {
        if (currentList == null || !currentList.isTruncated()) {
            exec.shutdown();
            return null;
        } else {
            NextPageGetter worker = new NextPageGetter(s3, currentList);
            return exec.submit(worker);
        }
    }
}

/**
 * This class retrieves the "next page" of a truncated ObjectListing.
 * It is meant to be called in a Callable/Future pattern.
 */
private static class NextPageGetter implements Callable<ObjectListing> {
    private final ObjectListing currentList;
    private final AmazonS3 s3;

    public NextPageGetter(AmazonS3 s3, ObjectListing currentList) {
        super();
        this.s3 = s3;
        this.currentList = currentList;
        if (currentList == null || !currentList.isTruncated()) {
            throw new IllegalArgumentException(currentList==null ?
                        "null List" : "List is not truncated");
        }
    }

    @Override
    public ObjectListing call() throws Exception {
        ObjectListing nextList = s3.listNextBatchOfObjects(currentList);
        return nextList;
    }
}
4

2 に答える 2

1

これは、私が数回遭遇した古典的な問題です。データベース接続で私に起こります。

ExecutorService を放棄して、すべてのプリフェッチに新しいベア スレッドを使用する (プリフェッチが完了したらスレッドを強制終了する) べきですか?

それがあなたの唯一の選択肢だと思います。私はわざわざスレッドを殺すつもりはありません。ジョブを終了させ、バックグラウンドで終了させるだけです。次のページの新しいスレッドをフォークします。スレッドに参加し、ある種の共通 (または何か) を使用して、呼び出し元とスレッドのAtomicReference間で結果リストを共有する必要があります。S3Find

私が望んでいないことの 1 つは、反復処理が完了したことを S3Find に明示的に通知することを呼び出し元に要求することです (確実に忘れてしまう人もいるからです)。

呼び出し元が try/finally で何らかのメソッドを呼び出さない限り、これを「正しく」実行する簡単な方法はありません。close()どういうわけかJavadocsでそれについて明示できませんか? それが、ORMLite データベース iteratorsで行ったことです。

S3Find s3Find = new S3Find(topdir).withRecurse(true);
try {
    for (S3URL f : s3Find) {
        ...
    }
} finally {
    s3Find.close();
}

次にS3Find.close()

public void close() {
    exec.shutdown();
}

Java 7 では、言語がリソースを自動的に閉じるtry with resources コンストラクトCloseableが追加されました。それは大きな勝利です。

于 2012-10-18T22:51:36.443 に答える
0

上記のように裸のスレッドを使用しながら、非常にシンプルで初期バージョンに非常に近いソリューションが得られたと思います。引き続きナイスCallableパターンを活用しますが、 a のFutureTask代わりに a を使用しFuture、まったく使用しませんExecutorService

私が以前に見逃していた重要なことは、それがFutureTask拡張Runnableされ、実際に 経由で起動できることnew Thread(task)です。言い換えると:

NextPageGetter worker = new NextPageGetter(s3, currentList);
FutureTask<ObjectListing> future = new FutureTask<>(worker);
new Thread(future).start();

そして後で:

currentList = future.get();

これで、イテレータが使い果たされたかどうかに関係なく、すべてのリソースが問題なく破棄されます。実際、スレッドはFutureTaskが完了するとすぐに消えます。

完全を期すために、変更されたコードを次に示します (class Pager変更のみ)。

/**
 * This class holds one ObjectListing (one "page"), and also pre-fetches the next page
 * using a {@link S3Find#NextPageGetter} Callable on a separate thread.
 */
private static class Pager {
    private final AmazonS3 s3;
    private ObjectListing currentList;
    private FutureTask<ObjectListing> future;
    public Pager(AmazonS3 s3, ListObjectsRequest request) {
        this.s3 = s3;
        currentList = s3.listObjects(request);
        future = submitPrefetch();
    }
    public ObjectListing getCurrentPage() {
        return currentList;
    }
    /**
     * Move currentList to the next page, and returns it.
     */
    public ObjectListing getNextPage() {
        if (future == null) return null;
        try {
            currentList = future.get();
            future = submitPrefetch();
        } catch (InterruptedException|ExecutionException e) {
            e.printStackTrace();
        }
        return currentList;
    }
    private FutureTask<ObjectListing> submitPrefetch() {
        if (currentList == null || !currentList.isTruncated()) {
            return null;
        } else {
            NextPageGetter worker = new NextPageGetter(s3, currentList);
            FutureTask<ObjectListing> f = new FutureTask<>(worker);
            new Thread(f).start();
            return f;
        }
    }
}
于 2012-10-19T01:06:50.200 に答える