0

私はさまざまなバンドルを持つプロジェクトに取り組んでいます。例を見てみましょう。5 つのバンドルがあり、それらのバンドルのそれぞれにメソッド名があるとしprocessます。

現在、process methodこれら 5 つのバンドルすべてを 1 つずつ順番に呼び出してから、データベースに書き込んでいます。しかし、それは私が望んでいないことです。

  1. マルチスレッドを使用してこれら 5 つのバンドルprocess methodをすべて並列に呼び出してから、データベースに書き込む必要があります。
  2. また、これらのスレッドのタイムアウト機能も必要です。バンドルのすべてのスレッドに対してデフォルトのタイムアウト設定を行います。バンドルにタイムアウト設定よりも長い時間がかかっている場合は、それらのスレッドをタイムアウトにしてから、このバンドルがタイムアウトしたことをログに記録し、多くの時間がかかっていたことを示します。

質問が十分に明確であることを願っています...

以下は、これまでにプロセスメソッドを1つずつ順番に呼び出しているコードです。

public void processEvents(final Map<String, Object> eventData) {

    final Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER);

    for (final BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {

        final Map<String, String> response = entry.getPlugin().process(outputs);

        // write to the database.
        System.out.println(response);
    }
}

これを行うための最良かつ効率的な方法が何であるかわかりませんか?将来的には、5 つ以上のバンドルを持つ可能性があるからです。

どうすればこれを達成できるかの例を誰か教えてもらえますか? これについての助けをいただければ幸いです。ありがとう。

4

2 に答える 2

1

目的を達成するのはそれほど難しくありませんが、同時実行性とタイムアウトの両方を使用すると、特にエラー処理に関して複雑さが増すことに注意する必要があります。

たとえば、タイムアウトが発生したときに実行されていたスレッドは、タイムアウト後も実行し続ける場合があります。割り込みシグナルを処理することによって協調する、正常に動作するスレッドのみが、処理の途中で正常に停止できます。

また、個々のバンドル エントリが並行して処理される可能性があること、つまりスレッド セーフであることも確認する必要があります。処理中に共有リソースを変更すると、結果として奇妙なエラーが発生する可能性があります。

また、これらの各スレッドへのデータベース書き込みを含めたいかどうかも疑問に思っていました. その場合、データベースへの書き込み中に中断を処理する必要があります。たとえば、トランザクションをロールバックします。

とにかく、すべてのスレッドのスレッド プーリングと合計タイムアウトを取得するにはExecutorService、(たとえば) 固定プール サイズを使用し、invokeAllメソッドを使用してすべてのスレッドを呼び出すことができます。

次の試行にはおそらく欠陥があり、エラー処理は決して完全ではありませんが、出発点となるはずです。

まず、スレッドの Callable の実装:

public class ProcessBundleHolderEntry implements Callable {
    private BundleRegistration.BundlesHolderEntry entry;
    private Map<String, String> outputs;

    public ProcessBundleHolderEntry(BundleRegistration.BundlesHolderEntry entry, Map<String, String> outputs) {
        this.entry = entry;
        this.outputs = outputs;
    }

    public Object call() throws Exception {
        final Map<String, String> response = entry.getPlugin().process(outputs);
        // write to the database.
        System.out.println(response);
        return response;
    }
}

そして今、修正されたprocessEvents方法:

public void processEvents(final Map<String, Object> eventData) {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    List<ProcessBundleHolderEntry> entries = new ArrayList<ProcessBundleHolderEntry>();

    Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER);
    for (BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
        ProcessBundleHolderEntry processBundleHolderEntry = new ProcessBundleHolderEntry(entry, outputs);
        entries.add(processBundleHolderEntry);
    }

    try {
        List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS);
        for (int i = 0; i < futures.size(); i++) {
            // This works since the list of future objects are in the
            // same sequential order as the list of entries
            Future<Object> future = futures.get(i);
            ProcessBundleHolderEntry entry = entries.get(i);
            if (!future.isDone()) {
                // log error for this entry
            }
        }
    } catch (InterruptedException e) {
        // handle this exception!
    }
}
于 2013-08-31T04:45:41.107 に答える
0

Steinar からの回答は正しいですが、「将来的には、5 つ以上のバンドルを持つ可能性があるかもしれない」と述べたように、このソリューションはスケーラブルではありません。そして、いくつかのタスクが完了している場合、実行時またはその後にバンドルを追加している可能性があり、最大 'n' 個のバンドルを並行して実行できるという制限もある可能性があると確信しています。その場合、executorService.InvokeAll は保留中を終了します指定されたタイマーに達した場合に開始されなかったタスク。
便利な簡単なサンプルを作成しました。この例では、並列に実行するスレッドの数に柔軟性があり、必要に応じてタスクやバンドルを追加できます。

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import testproject.Bundles;
import testproject.ExecuteTimedOperation;

public class ParallelExecutor
{
    public static int NUMBER_OF_PARALLEL_POLL = 4;

    public static void main(String[] args)
    {       
         ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PARALLEL_POLL );      
         // Create bundle of objects you want
         List<Bundles> lstBun = new ArrayList<Bundles>();
         for (Bundles bundles : lstBun) 
         {
            final ExecuteTimedOperation ope =new ExecuteTimedOperation(bundles, new HashMap<String, Object>());
            executorService.submit(new Runnable() 
            {
                public void run() 
                {
                    ope.ExecuteTask();
                }
            });
        }   
    }   
}
package testproject;
import java.util.Map;
import java.util.Random;

public class ExecuteTimedOperation 
{
    Bundles _bun;
    Map<String, Object> _eventData;
    public static long TimeInMilleToWait = 60 * 1000; //Time which each thread should wait to complete task     

    public ExecuteTimedOperation(Bundles bun, Map<String, Object> eventData)
    {
        _bun = bun;
        _eventData = eventData;  
    }


    public void ExecuteTask()
    {
        try 
        {       
            Thread t = new Thread(new Runnable() 
            {
                public void run() 
                {   
                    _bun.processEvents(_eventData);                 
                }
            });

            t.start();
            t.join(TimeInMilleToWait);
        } 
        catch (InterruptedException e) {
                //log back saying this bundle got timeout bcoz it was taking lot of time.
        }
        catch (Exception e) {
            //All other type of exception will be handled here    
        }
    }
}

package testproject;

import java.util.Map;

public class Bundles 
{

    public void processEvents(final Map<String, Object> eventData) 
    {
        //THE code you want to execute

    }
}
于 2013-08-31T10:01:13.990 に答える