0

私は、さまざまなバンドルを持つプロジェクトに取り組んでいます。例を見てみましょう。5 つのバンドルがあり、それらのバンドルのそれぞれにメソッド名があるとしprocessます。

以下は、私がやるべきことです-

  1. processマルチスレッド コードを使用してこれら 5 つの Bundles メソッドをすべて並列に呼び出してから、データベースに書き込む必要があります。それを行う正しい方法は何ですか?スレッドを 5 つ持つ必要がありますか? バンドルごとに 1 つのスレッドですか? しかし、そのシナリオではどうなるでしょうか。バンドルが 50 ある場合、スレッドは 50 になるのでしょうか?
  2. また、タイムアウト機能も欲しいです。いずれかのバンドルが、弊社が設定したしきい値よりも多くの時間を費やしている場合、タイムアウトになり、このバンドルに多くの時間がかかったというエラーとしてログに記録されます。

私が行った次の試みにはおそらく欠陥があり、エラー処理は決して完全ではありません。しかし、どういうわけか、この行で常にエラーが発生します-

pool.invokeAll

そして、エラーは -

The method invokeAll(Collection<? extends Callable<T>>, long, TimeUnit) in the type ExecutorService is not applicable for the arguments (List<ModelFramework.ProcessBundleHolderEntry>, int, TimeUnit)

process method以下は、マルチスレッドの方法ですべてのバンドルを呼び出す私のメソッドです。

public void processEvents(final Map<String, Object> eventData) {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    List<ProcessBundleHolderEntry> entries = new ArrayList<ProcessBundleHolderEntry>();

    Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER);

    for (BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
        ProcessBundleHolderEntry processBundleHolderEntry = new ProcessBundleHolderEntry(entry, outputs);
        entries.add(processBundleHolderEntry);
    }

    try {

        // somehow I always get an error at invokeAll method. Is there anything wrong?
        List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS);
        for (int i = 0; i < futures.size(); i++) {
            // This works since the list of future objects are in the
            // same sequential order as the list of entries
            Future<Object> future = futures.get(i);
            ProcessBundleHolderEntry entry = entries.get(i);
            if (!future.isDone()) {
                // log error for this entry
            }
        }
    } catch (InterruptedException e) {
        // handle this exception!
    }
}

次に、ModelFramework クラスに追加したスレッド用の Callable の実装

public class ProcessBundleHolderEntry implements Callable {
    private BundleRegistration.BundlesHolderEntry entry;
    private Map<String, String> outputs;

    public ProcessBundleHolderEntry(BundleRegistration.BundlesHolderEntry entry, Map<String, String> outputs) {
        this.entry = entry;
        this.outputs = outputs;
    }

    public Object call() throws Exception {
        final Map<String, String> response = entry.getPlugin().process(outputs);
        // write to the database.
        System.out.println(response);
        return response;
    }
}

発生しているエラーについて誰か助けてもらえますか? また、上記のアプローチに問題があるかどうか、または同じことを行うためのより良い効率的な方法があるかどうかを誰か教えてもらえますか?

これについての助けをいただければ幸いです。

4

2 に答える 2