ですから、私はマルチスレッドにかなり慣れておらず、最近すべてのプログラムでこのアイデアを使用しています。さらに使用を開始する前に、Executor、CompletionService、BlockingQueue、およびObserverを使用してマルチスレッドを実装するための正しい効率的な方法であることを確認したいと思います。以下にサンプルコードを提供しますが、最初にそれがどのように機能すると思うかを簡単に説明します。おそらくそれが役立つでしょう。
私が最初に持っているのはBlockingQueueで、すべてのタスクがadd(Task task)メソッドを介してこのキューに追加されます。クラスの作成時に、runメソッドがwhile(true)呼び出しで呼び出され、タスクキューに何かが追加されるまで、キューのブロックが行われます。
run()内のキューに何かが追加されると、queue.take()はキューのアイテムを返します。次に、そのアイテムを取得して、それを処理するWorkerThreadクラスに渡します。そのworkerThreadは、スレッドの終了の待機を処理するCompletionServiceプールに追加されます。
さて、正しいかどうかわからない部分が来ました。runnableを実装し、クラスが初期化されたときに開始される内部クラスもあります。その仕事は、pool.take()を呼び出して永久ループすることです。したがって、これは基本的に、WorkerThreadsの1つが完了するのを待ちます。完了サービスにこれを処理させます。take()が値を取得すると、内部クラスはその値をnotifyobserverメソッドに渡します。
これは大丈夫ですか?タスクキューでwhile(true)ループで実行されるメインクラスと、WorkerThreadから結果を受信するためにプールで待機している内部クラスもループしているのは少し心配です。
これが実装例です。あなたが思うこと?
public class HttpSchedulerThreaded extends Observable implements Runnable {
private ArrayList<Object> list;//holds [0]=VULNINFO, [1]=REQUESTBUILDER OBJECT
protected static Logger logger = Logger.getLogger(HttpScheduler.class.getName());
private CompletionService<VulnInfo> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private CountDownLatch doneSignal;
private String[] vulnClassesIgnoreRedirect;
private boolean followRedirects;
private boolean runJavascriptInResponse;
private boolean isSSL;
private int numThreadsInPool;
private BlockingQueue<VulnInfo> queue;
private boolean isRunning ;
public HttpSchedulerThreaded(int numThreads)
{
numThreadsInPool = numThreads;
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
}
public HttpSchedulerThreaded()
{
numThreadsInPool = 1;
executor = Executors.newFixedThreadPool(1);
doneSignal = new CountDownLatch(1);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
}
public void setThreadCount(int numThreads)
{
if(!isRunning){
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
numThreadsInPool = numThreads;
}
}
public void start()
{
if(!isRunning){
responseWorkerThread.start();
new Thread(this).start();
isRunning = true;
}
}
public void add(VulnInfo info) {
queue.add(info);
}
@Override
public void run() {
// TODO Auto-generated method stub
while(shouldRun)
{
try {
VulnInfo info = queue.take();
Callable<VulnInfo> worker = new HttpSchedulerRequestSender(info,followRedirects,runJavascriptInResponse,vulnClassesIgnoreRedirect,doneSignal);
//System.out.println("submitting to pooler: " + info.getID());
pool.submit(worker);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/**
* Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
* are complete it will send them to server for completion.
* @author Steve
*
*/
class HttpSchedulerWorker implements Runnable{
public void run() {
// TODO Auto-generated method stub
while(true)
{
VulnInfo vulnInfo = null;
try {
//System.out.println("taking finished request");
Future<VulnInfo> tmp = pool.take();
// Future<VulnInfo> tmp = pool.poll();
if(tmp != null)
vulnInfo = tmp.get();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(vulnInfo != null)
{
//System.out.println("updating all observers: " + vulnInfo.getID());
updateObservers(vulnInfo);
}
}
}
}