0

私は、さまざまなバンドルを持つプロジェクトに取り組んでいます。例を見てみましょう。5 つのバンドルがあり、それらのバンドルのそれぞれにメソッド名があるとしprocessます。

現在、processこれら 5 つのバンドルすべてのメソッドを 1 つずつ順番に呼び出してから、データベースに書き込んでいます。しかし、それは私が望んでいないことです。

以下は私が探しているものです -

  1. processマルチスレッド コードを使用してこれら 5 つの Bundles メソッドをすべて並列に呼び出してから、データベースに書き込む必要があります。それを行う正しい方法は何ですか?スレッドを 5 つ持つ必要がありますか? バンドルごとに 1 つのスレッドですか? しかし、そのシナリオではどうなるでしょうか。バンドルが 50 ある場合、スレッドは 50 になるのでしょうか?
  2. また、タイムアウト機能も欲しいです。いずれかのバンドルが、弊社が設定したしきい値よりも多くの時間を費やしている場合、タイムアウトになり、このバンドルに多くの時間がかかったというエラーとしてログに記録されます。

質問が十分に明確であることを願っています。

process以下は、これまでに各バンドルのメソッドを1つずつ順番に呼び出しているコードです。

public void callBundles(final Map<String, Object> eventData) {

    final Map<String, String> outputs = (Map<String, String>)eventData.get(Constants.HOLDER);

    for (final BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {

        // calling the process method of a bundle
        final Map<String, String> response = entry.getPlugin().process(outputs);

        // then write to the database.
        System.out.println(response);
    }
}

これを行うための最良かつ効率的な方法が何であるかわかりませんか?そして、私は順番に書きたくありません。将来的には、5 つ以上のバンドルを持つ可能性があるからです。

誰かがこれを行う方法の例を教えてもらえますか? 私はそれをやろうとしましたが、どういうわけかそれは私が探している方法ではありません.

これについての助けをいただければ幸いです。ありがとう。

アップデート:-

これは私が思いついたものです -

public void callBundles(final Map<String, Object> eventData) {

    // Three threads: one thread for the database writer, five threads for the plugin processors
    final ExecutorService executor = Executors.newFixedThreadPool(5);

    final BlockingQueue<Map<String, String>> queue = new LinkedBlockingQueue<Map<String, String>>();

    @SuppressWarnings("unchecked")
    final Map<String, String> outputs = (Map<String, String>)eventData.get(Constants.EVENT_HOLDER);

    for (final BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
        executor.submit(new Runnable () {
            public void run() {
                final Map<String, String> response = entry.getPlugin().process(outputs);
                // put the response map in the queue for the database to read
                queue.offer(response);
            }
        });
    }

    Future<?> future = executor.submit(new Runnable () {
        public void run() {
            Map<String, String> map;
            try {
                while(true) {
                    // blocks until a map is available in the queue, or until interrupted
                    map = queue.take();
                    // write map to database
                    System.out.println(map);
                }
            } catch (InterruptedException ex) {
                // IF we're catching InterruptedException then this means that future.cancel(true)
                // was called, which means that the plugin processors are finished;
                // process the rest of the queue and then exit
                while((map = queue.poll()) != null) {
                    // write map to database
                    System.out.println(map);
                }
            }
        }
    });

    // this interrupts the database thread, which sends it into its catch block
    // where it processes the rest of the queue and exits
    future.cancel(true); // interrupt database thread

    // wait for the threads to finish
    try {
        executor.awaitTermination(5, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        //log error here
    }
}

しかし、これにはまだタイムアウト機能を追加できませんでした..また、上記のコードをそのまま実行すると、実行されません..何か不足していますか?

誰でもこれで私を助けることができますか?

4

1 に答える 1