7

次のようなサービスがあります。

class DemoService {
    Result process(Input in) {
        filter1(in);
        if (filter2(in)) return...
        filter3(in);
        filter4(in);
        filter5(in);
        return ...

    }
}

今はもっと速くしたいのですが、同時に開始できるフィルターもあれば、他のフィルターが終了するのを待たなければならないフィルターもあることがわかりました。例えば:

filter1--
         |---filter3--
filter2--             |---filter5
          ---filter4--

つまり:

1.filter1 と filter2 は同時に開始でき、filter3 と filter4 も同時に開始できます。

2.filter3 と filter4 は、filter2 が終了するまで待機する必要があります

もう1つ

filter2 が true を返す場合、'process' メソッドはすぐに戻り、次のフィルターを無視します。

今私のソリューションは FutureTask を使用しています:

            // do filter's work at FutureTask
        for (Filter filter : filters) {
            FutureTask<RiskResult> futureTask = new FutureTask<RiskResult>(new CallableFilter(filter, context));
            executorService.execute(futureTask);
        }

        //when all FutureTask are submitted, wait for result
        for(Filter filter : filters) {
            if (filter.isReturnNeeded()) {
                FutureTask<RiskResult> futureTask = context.getTask(filter.getId());
                riskResult = futureTask.get();
                if (canReturn(filter, riskResult)) {
                    returnOk = true;
                    return riskResult;
                }
            }
        }

私の CallableFilter:

public class CallableFilter implements Callable<RiskResult> {

    private Filter filter;
    private Context context;

    @Override
    public RiskResult call() throws Exception {
        List<Filter> dependencies = filter.getDependentFilters();
        if (dependencies != null && dependencies.size() > 0) {

            //wait for its dependency filters to finish
            for (Filter d : dependencies) {
                FutureTask<RiskResult> futureTask = context.getTask(d.getId());
                futureTask.get();

            }
        }

        //do its own work
        return filter.execute(context);
    }
}

私は知りたいです:

1.この場合、FutureTask を使用することをお勧めしますか? より良い解決策はありますか?

2.スレッドコンテキストスイッチのオーバーヘッド。

ありがとう!

4

2 に答える 2

1

並列化に a を使用できますForkJoinPool。これは、その種の並列計算に対して明示的に考えられています。

(...) メソッド join() とそのバリアントは、完了依存関係が非循環の場合にのみ使用するのに適しています。つまり、並列計算は有向非巡回グラフ (DAG) として記述できます (...)

( を参照ForkJoinTask)

a の利点はForkJoinPool、すべてのタスクが新しいタスクを生成し、実行中のスレッドを実際にブロックすることなくそれらのタスクが完了するのを待機できることです (使用可能なスレッドよりも多くのタスクが他のタスクの完了を待機している場合、デッドロックが発生する可能性があります)。

これは、まだいくつかの制限がありますが、これまでのところ機能するはずの例です。

  1. フィルター結果を無視します
  2. フィルター 2 が返された場合、実行が途中で終了することはありません。true
  3. 例外処理は実装されていません

このコードの背後にある主なアイデア: すべてのフィルターはNode、他のノードに依存する可能性があるものとして表されます (= このフィルターを実行する前に完了しなければならないフィルター)。依存ノードは並列タスクとして生成されます。

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class Node<V> extends RecursiveTask<V> {
    private static final short VISITED = 1;

    private final Callable<V> callable;
    private final Set<Node<V>> dependencies = new HashSet<>();

    @SafeVarargs
    public Node(Callable<V> callable, Node<V>... dependencies) {
        this.callable = callable;
        this.dependencies.addAll(Arrays.asList(dependencies));
    }

    public Set<Node<V>> getDependencies() {
        return this.dependencies;
    }

    @Override
    protected V compute() {
        try {
            // resolve dependencies first
            for (Node<V> node : dependencies) {
                if (node.tryMarkVisited()) {
                    node.fork(); // start node
                }
            }

            // wait for ALL nodes to complete
            for (Node<V> node : dependencies) {
                node.join();
            }

            return callable.call();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return null;
    }

    public boolean tryMarkVisited() {
        return compareAndSetForkJoinTaskTag((short) 0, VISITED);
    }
}

使用例:

public static void main(String[] args) {
    Node<Void> filter1 = new Node<>(filter("filter1"));
    Node<Void> filter2 = new Node<>(filter("filter2"));
    Node<Void> filter3 = new Node<>(filter("filter3"), filter1, filter2);
    Node<Void> filter4 = new Node<>(filter("filter4"), filter1, filter2);
    Node<Void> filter5 = new Node<>(filter("filter5"), filter3, filter4);
    Node<Void> root = new Node<>(() -> null, filter5);

    ForkJoinPool.commonPool().invoke(root);
}

public static Callable<Void> filter(String name) {
    return () -> {
        System.out.println(Thread.currentThread().getName() + ": start " + name);
        Thread.sleep(1000);
        System.out.println(Thread.currentThread().getName() + ": end   " + name);
        return null;
    };
}
于 2015-03-03T10:00:26.150 に答える