176

エグゼキュータサービスに送信する必要のあるタスクでいっぱいのキューがあるとします。一度に1つずつ処理してほしい。私が考えることができる最も簡単な方法は次のとおりです。

  1. キューからタスクを取得します
  2. エグゼキュータに送信します
  3. 返されたFutureで.getを呼び出し、結果が利用可能になるまでブロックします
  4. キューから別のタスクを取得します...

ただし、完全にブロックしないようにしています。タスクを一度に1つずつ処理する必要がある、そのようなキューが10,000ある場合、それらのほとんどがブロックされたスレッドを保持するため、スタックスペースが不足します。

私が欲しいのは、タスクを送信し、タスクが完了したときに呼び出されるコールバックを提供することです。そのコールバック通知をフラグとして使用して、次のタスクを送信します。(functionaljavaとjetlangは明らかにそのような非ブロッキングアルゴリズムを使用していますが、私はそれらのコードを理解できません)

独自のエグゼキュータサービスを作成する以外に、JDKのjava.util.concurrentを使用してこれを行うにはどうすればよいですか?

(これらのタスクを提供するキュー自体がブロックされる可能性がありますが、それは後で取り組むべき問題です)

4

11 に答える 11

163

完了通知で渡したいパラメータを受け取るためのコールバックインターフェイスを定義します。次に、タスクの最後にそれを呼び出します。

Runnableタスクの一般的なラッパーを作成し、これらをに送信することもできますExecutorService。または、Java 8に組み込まれているメカニズムについては、以下を参照してください。

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

を使用CompletableFutureすると、Java 8には、プロセスを非同期かつ条件付きで完了できるパイプラインを構成するためのより複雑な手段が含まれていました。これは、不自然ですが完全な通知の例です。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}
于 2009-05-05T18:28:33.783 に答える
59

Java 8では、CompletableFutureを使用できます。これは、ユーザーサービスからユーザーをフェッチし、それらをビューオブジェクトにマップしてから、ビューを更新するか、エラーダイアログを表示するために使用しているコードの例です(これはGUIアプリケーションです)。

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

非同期で実行されます。私は2つのプライベートメソッドを使用しています:mapUsersToUserViewsupdateView

于 2014-03-12T20:36:52.133 に答える
52

Guavaのリッスン可能なfutureAPIを使用して、コールバックを追加します。Cf. ウェブサイトから:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});
于 2012-11-13T20:31:49.260 に答える
25

FutureTaskクラスを拡張し、メソッドをオーバーライドしてから、オブジェクトをにdone()追加すると、メソッドがすぐに完了すると呼び出されます。FutureTaskExecutorServicedone()FutureTask

于 2012-02-17T16:12:24.547 に答える
17

ThreadPoolExecutorまた、オーバーライドして利用できるフックメソッドもbeforeExecuteあります。これがのJavadocafterExecuteからの説明です。ThreadPoolExecutor

フックメソッド

このクラスは、各タスクの実行の前後に呼び出される保護されたオーバーライド可能メソッドbeforeExecute(java.lang.Thread, java.lang.Runnable)とメソッドを提供します。afterExecute(java.lang.Runnable, java.lang.Throwable)これらは、実行環境を操作するために使用できます。たとえば、再初期ThreadLocals化、統計の収集、ログエントリの追加などです。さらに、メソッドをオーバーライドして、が完全に終了terminated()した後に実行する必要のある特別な処理を実行できます。Executorフックメソッドまたはコールバックメソッドが例外をスローすると、内部ワーカースレッドが失敗し、突然終了する可能性があります。

于 2009-05-05T19:52:50.960 に答える
6

を使用しCountDownLatchます。

それはからjava.util.concurrentであり、続行する前にいくつかのスレッドが実行を完了するのを待つ方法です。

あなたが世話をしているコールバック効果を達成するために、それは少し追加の仕事を必要とします。つまり、を使用しCountDownLatchて待機する別のスレッドでこれを自分で処理してから、通知する必要があるものは何でも通知します。コールバック、またはその効果に類似したもののネイティブサポートはありません。


編集:私はあなたの質問をさらに理解したので、あなたは不必要に行き過ぎていると思います。定期的SingleThreadExecutorに行う場合は、すべてのタスクを実行すると、キューイングがネイティブに実行されます。

于 2009-05-05T18:21:15.313 に答える
5

同時に実行されるタスクがないことを確認したい場合は、SingleThreadedExecutorを使用してください。タスクは、送信された順序で処理されます。タスクを保持する必要はなく、幹部に提出するだけです。

于 2009-05-05T18:29:00.630 に答える
2

これは、Guavaを使用したPacheの回答の拡張ListenableFutureです。

特に、Futures.transform()returns ListenableFuturesoは、非同期呼び出しを連鎖させるために使用できます。Futures.addCallback()を返すvoidため、連鎖には使用できませんが、非同期完了での成功/失敗の処理には適しています。

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

注:非同期タスクを連鎖させることに加えて、Futures.transform()各タスクを個別のエグゼキューターでスケジュールすることもできます(この例には示されていません)。

于 2016-03-19T17:30:12.717 に答える
2

Callbackを使用してメカニズムを実装するための単純なコードExecutorService

import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
    public CallBackDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(5);

        try{
            for ( int i=0; i<5; i++){
                Callback callback = new Callback(i+1);
                MyCallable myCallable = new MyCallable((long)i+1,callback);
                Future<Long> future = service.submit(myCallable);
                //System.out.println("future status:"+future.get()+":"+future.isDone());
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        service.shutdown();
    }
    public static void main(String args[]){
        CallBackDemo demo = new CallBackDemo();
    }
}
class MyCallable implements Callable<Long>{
    Long id = 0L;
    Callback callback;
    public MyCallable(Long val,Callback obj){
        this.id = val;
        this.callback = obj;
    }
    public Long call(){
        //Add your business logic
        System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
        callback.callbackMethod();
        return id;
    }
}
class Callback {
    private int i;
    public Callback(int i){
        this.i = i;
    }
    public void callbackMethod(){
        System.out.println("Call back:"+i);
        // Add your business logic
    }
}

出力:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

キーノート:

  1. タスクをFIFO順に順番に処理する場合は、次のように置き換えますnewFixedThreadPool(5)newFixedThreadPool(1)
  2. callback前のタスクの結果を分析した後で次のタスクを処理する場合は、以下の行のコメントを外してください

    //System.out.println("future status:"+future.get()+":"+future.isDone());
    
  3. newFixedThreadPool()次のいずれかに置き換えることができます

    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor
    

    ユースケースによって異なります。

  4. コールバックメソッドを非同期で処理する場合

    a。共有ExecutorService or ThreadPoolExecutorタスクを呼び出し可能タスクに渡す

    b。CallableメソッドをCallable/Runnableタスクに変換する

    c。コールバックタスクを ExecutorService or ThreadPoolExecutor

于 2016-04-06T15:44:29.340 に答える
1

助けになったマットの答えに追加するために、コールバックの使用法を示すためのより具体的な例を次に示します。

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

出力は次のとおりです。

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito
于 2015-08-29T12:39:58.513 に答える
1

次のようなCallableの実装を使用できます

public class MyAsyncCallable<V> implements Callable<V> {

    CallbackInterface ci;

    public MyAsyncCallable(CallbackInterface ci) {
        this.ci = ci;
    }

    public V call() throws Exception {

        System.out.println("Call of MyCallable invoked");
        System.out.println("Result = " + this.ci.doSomething(10, 20));
        return (V) "Good job";
    }
}

ここで、CallbackInterfaceは次のような非常に基本的なものです。

public interface CallbackInterface {
    public int doSomething(int a, int b);
}

これでメインクラスは次のようになります

ExecutorService ex = Executors.newFixedThreadPool(2);

MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);
于 2016-01-09T18:00:59.370 に答える