スレッドの使い方が間違っていると思うので、これが良いデザインかどうか聞いてみました。基本的に、キューからデータをプルして処理するプログラムがあります(処理は純粋数学であるため、100%CPUを集中的に使用します)。データが適切な場合は、「適切な」キューに送信します。それ以外の場合は、完全に破棄されるか、その一部です。さらに処理するために、最初の「作業」キューに送り返されます。これが高レベルのロジックであり、キューがメモリ内にあるとき、プログラムはすべてのコアを使用し、非常に高速でした。データが大きくなるにつれて、キューサーバーを使用してキューを保存し、処理を複数のマシンに分散することにしました。現在は低速です(各コアの40%〜60%のみが使用されています)。
(yourkitと組み込みのnetbeansを使用して)コードのプロファイルを作成しようとしましたが、ほとんどの時間(80%)がキュープログラムに費やされていると表示されます。すべての外部プログラムを別のスレッドにプッシュすることで、プログラム内で常に数の計算を続けることができると思いましたが、パフォーマンスには役立たず、間違っているのではないかと思います。よくわかりませんが、既存のスレッド(親スレッド)からスレッド(子スレッド)を起動するかどうか疑問に思っています。親スレッドを終了する前に、子スレッドを完了する必要がありますか?
私のコードは非常に大きく、99%は必要ないので、高レベルのバージョンを作成します(コンパイルされない場合がありますが、私が何をしているのかがわかるはずです)。
public class worker {
private static ExecutorService executor;
static {
final int numberOfThreads = 4;
executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 1000, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
}
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
// TODO Auto-generated method stub
System.out.println("starting worker..");
//Connection information goes here
channel.basicQos(50); //this is part of the connection, the queue server only gives 50 messages without acknowledgment
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //gets data from queue
String message = new String(delivery.getBody());
executor.submit(new DoWork(channel, message, delivery));
}
class DoWork implements Runnable{ //where the main work happens
//setup variables, basically passing queue connection information as well as data here, so I only need to rely on one connection
public void run() {
new Thread(new Awk_to_Queue(channel, delivery)).start(); //this sends an Awk_to_Queue to the queue, I launch a thread for this so my program can keep working.
if (data is good) {
new Thread(new Send_to_Queue("success_queue", message1, channel)).start();
continue;
} else if (Data is not good but not bad either ) {
new Thread(new Send_to_Queue("task_queue", message2, channel)).start();
}
class Send_to_Queue implements Runnable{
public void run() {
//takes data in and sends to queue in the way I used to previous do it, but just does it in a thread. queue connection is passed over so I only need to have one connection
}
}
class Awk_to_Queue implements Runnable{
public void run() {
//awk's queue so queue server can send one more piece of data to queue up
}
}
そこにそれがある。少し読みづらいのでごめんなさい(私がやっていることの構造を示すためだけにたくさんのものを削除しました)。スレッドをフォークしても速度に影響がなかったのは何が間違っているのでしょうか(速度が速くなったり、プロファイラーの結果が変更されたりすることはありません)。スレッドをフォークする方法に問題がありますか(new Thread(new Awk_to_Queue(channel, delivery)).start();
)、それとも私のスレッドのデザインのようなものですか?