私は、既存の Java アプリケーションの拡張に取り組んでいます。このアプリケーションは、毎日数百万のメッセージを処理するメッセージ プロセッサです。基本的に Core Java を使用して記述され、スレッドとキューは Collection クラスを使用して実装されます。
このアプリケーションでは、ある種のメッセージが単一のスレッドで実行されています。デュアル プロセッサを使用しているため、アプリケーションのこの特定の部分をマルチスレッド化して、メッセージをより高速に処理するというタスクが与えられました。
Java 5 を使用しているため、ThreadPoolExcecutor を使用するアプローチを取りました。特定のスレッドのメッセージを独自のスレッドで処理できるように、クライアントごとにプロセッサ スレッドを作成しました。プロセッサ スレッドは Callable インターフェイスを実装しています。これにより、前のタスクが終了したかどうかにかかわらず、将来のオブジェクトを確認できるようになります。
初期化プロセス中に、すべてのクライアントを調べて、それぞれのプロセッサ スレッドを作成し、ID を一意のキーとして使用してマップに格納します。以前に送信されたジョブを追跡するために、同じ ID を一意のキーとして使用して、将来のオブジェクトを別のマップに再度保持します。
以下は、私が使用したコードの一部です: メインクラスで -
ThreadPoolExecutor threadPool = null;
int poolSize = 20;
int maxPoolSize = 50;
long keepAliveTime = 10;
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(1000);
threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize,keepAliveTime, TimeUnit.SECONDS, queue);
....
....
for (each client...) {
id = getId()..
future = futuremap.get(id);
if(!future.isDone())
continue;
if(future == null || future.isDone()) {
processor = processormap.get(id);
if(processor == null) {
processor = new Processor(.....);
//add to the map
processormap.put(id,processor);
}
//submit the processor
future = threadPool.submit(processor );
futuremap.put(id,future);
}
}
プロセッサ スレッド
public class MyProcessor implements Callable<String> {
.....
.....
public String call() {
....
....
}
}
問題
上記の実装は、私のテスト環境でうまく機能しています。ただし、本番環境 ( Edit#1 - Ubuntu、Linux Slackware、Java - 1.6.0_18) では、この新しい ThreadpoolExecutor で管理されていないアプリケーションの他のスレッドが影響を受けていることがわかりました。つまり、彼らのタスクは何時間も遅れています。ThreadPoolExecutors によって作成されたスレッドがすべてのリソースを使用しており、他のスレッドにチャンスを与えていないためでしょうか。
ThreadPoolExceutor を使用して作成された新しいスレッドは、独立したタスクを実行しており、リソースを求めて他のスレッドと競合していません。つまり、競合状態のシナリオはありません。
ログを見ると、新しいスレッドの場合、最大 20 のスレッドが実行されており (corepoolsize)、拒否の例外がないことがわかります。つまり、送信数がキューの境界内にあるということです。
なぜこれが起こっているのですか?
前もって感謝します。