以下に示すように、2つのプロセスがあります。私のプロセスにはそれぞれrun
とshutdown
方法があります
Process processA = new ProcessA("processA", getProcessAProperties());
Process processB = new ProcessB("processB", getProcessBProperties());
- ProcessA が独自のスレッド プールで実行され、ProcessB が互いに独立した独自のスレッド プールで実行されるように、Process ごとに異なるスレッド プール構成が必要です。
- また、独自のスレッド プールの各スレッド間で Process オブジェクトを共有することはできません。
以下は、私の Process クラスがどのように見えるかです。私のProcessA
,ProcessB
クラスは単に Process クラスを拡張します。そして、すべての重要なことを run メソッドで行います。
public abstract class Process implements Runnable {
private Properties props;
private String processName;
public Process(String processName, Properties props) {
this.processName = processName;
this.props = props;
}
protected abstract void shutdown();
protected abstract void run(String processName, Properties props);
@Override
public final void run() {
run(processName, props);
}
public Properties getProps() {
return props;
}
public void setProps(Properties props) {
this.props = props;
}
public String getProcessName() {
return processName;
}
public void setProcessName(String processName) {
this.processName = processName;
}
}
ProcessA
以下は、独自のスレッド プールを使用して実行する方法の簡単な例です。3 つのスレッドがあり、各スレッドは処理する独自の ProcessA オブジェクトを取得します。これをより一般的な方法で拡張して、私のプロセスProcessA
とProcessB
.
public static void main(String[] args) {
int numberOfThreads = 3;
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
final List<Process> processes = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) {
// each thread works on different Process object
Process processA = new ProcessA("processA", getProcessAProperties());
processes.add(processA);
executor.submit(processA);
}
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
for (Process process : processes) {
process.shutdown();
}
executor.shutdown();
try {
executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace;
}
}
});
}
したがって、この問題をより一般的な方法で解決するために、以下に示すように Process ハンドラーを作成しました。
public final class ProcessHandler {
private final ExecutorService executorServiceProcess;
private final List<Process> processes = new ArrayList<>();
private final Thread shutdownHook = new Thread() {
@Override
public void run() {
for (Process process : processes)
process.shutdown();
executorServiceProcess.shutdown();
}
};
public ProcessHandler(Process process, int poolSize) {
this.executorServiceProcess = Executors.newFixedThreadPool(poolSize);
Runtime.getRuntime().addShutdownHook(shutdownHook);
for (int i = 0; i < poolSize; i++) {
try {
// this line throws exception
Process p = process.getClass().newInstance();
p.setProcessName(process.getProcessName());
p.setProps(process.getProps());
processes.add(p);
executorServiceProcess.submit(p);
} catch (InstantiationException | IllegalAccessException e) {
e.printStackTrace();
}
}
}
public void shutdown() {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
shutdownHook.start();
try {
shutdownHook.join();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
そして、これは私のメインメソッドが今見える方法です:
public static void main(String[] args) {
Process processA = new ProcessA("processA", getProcessAProperties());
Process processB = new ProcessB("processB", getProcessBProperties());
// processA will run with three threads in its own thread pool
ProcessHandler processHandlerA = new ProcessHandler (processA, 3);
// processB will run with two threads in its own thread pool
ProcessHandler processHandlerB = new ProcessHandler (processB, 2);
// now I can call shutdown on them
processHandlerA.shutdown();
processHandlerB.shutdown();
}
ProcessHandler
私のクラスのこの行は、次のようにProcess p = process.getClass().newInstance();
例外をスローします。
java.lang.InstantiationException: com.david.test.ProcessA
なぜInstantiationException
投げられるのかわかりませんか?
注意: これらの各プロセスはカフカ コンシューマーであり、一般にカフカ コンシューマーはスレッド セーフではないため、毎回新しいオブジェクトを作成してエグゼキューターに送信する必要があります。
アップデート:
これは私の ProcessA クラスは次のようになります。
public class ProcessA extends Process {
private KafkaConsumer<byte[], byte[]> consumer;
public ProcessA(String processName, Properties props) {
super(processName, props);
}
@Override
public void shutdown() {
consumer.wakeup();
}
@Override
protected void run(String processName, Properties props) {
consumer = new KafkaConsumer<>(props);
System.out.println("Hello World");
// do all kind of important stuff here
}
}