目的を達成するのはそれほど難しくありませんが、同時実行性とタイムアウトの両方を使用すると、特にエラー処理に関して複雑さが増すことに注意する必要があります。
たとえば、タイムアウトが発生したときに実行されていたスレッドは、タイムアウト後も実行し続ける場合があります。割り込みシグナルを処理することによって協調する、正常に動作するスレッドのみが、処理の途中で正常に停止できます。
また、個々のバンドル エントリが並行して処理される可能性があること、つまりスレッド セーフであることも確認する必要があります。処理中に共有リソースを変更すると、結果として奇妙なエラーが発生する可能性があります。
また、これらの各スレッドへのデータベース書き込みを含めたいかどうかも疑問に思っていました. その場合、データベースへの書き込み中に中断を処理する必要があります。たとえば、トランザクションをロールバックします。
とにかく、すべてのスレッドのスレッド プーリングと合計タイムアウトを取得するにはExecutorService
、(たとえば) 固定プール サイズを使用し、invokeAll
メソッドを使用してすべてのスレッドを呼び出すことができます。
次の試行にはおそらく欠陥があり、エラー処理は決して完全ではありませんが、出発点となるはずです。
まず、スレッドの Callable の実装:
public class ProcessBundleHolderEntry implements Callable {
private BundleRegistration.BundlesHolderEntry entry;
private Map<String, String> outputs;
public ProcessBundleHolderEntry(BundleRegistration.BundlesHolderEntry entry, Map<String, String> outputs) {
this.entry = entry;
this.outputs = outputs;
}
public Object call() throws Exception {
final Map<String, String> response = entry.getPlugin().process(outputs);
// write to the database.
System.out.println(response);
return response;
}
}
そして今、修正されたprocessEvents
方法:
public void processEvents(final Map<String, Object> eventData) {
ExecutorService pool = Executors.newFixedThreadPool(5);
List<ProcessBundleHolderEntry> entries = new ArrayList<ProcessBundleHolderEntry>();
Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER);
for (BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
ProcessBundleHolderEntry processBundleHolderEntry = new ProcessBundleHolderEntry(entry, outputs);
entries.add(processBundleHolderEntry);
}
try {
List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS);
for (int i = 0; i < futures.size(); i++) {
// This works since the list of future objects are in the
// same sequential order as the list of entries
Future<Object> future = futures.get(i);
ProcessBundleHolderEntry entry = entries.get(i);
if (!future.isDone()) {
// log error for this entry
}
}
} catch (InterruptedException e) {
// handle this exception!
}
}