1

ですから、私はマルチスレッドにかなり慣れておらず、最近すべてのプログラムでこのアイデアを使用しています。さらに使用を開始する前に、Executor、CompletionService、BlockingQueue、およびObserverを使用してマルチスレッドを実装するための正しい効率的な方法であることを確認したいと思います。以下にサンプルコードを提供しますが、最初にそれがどのように機能すると思うかを簡単に説明します。おそらくそれが役立つでしょう。

私が最初に持っているのはBlockingQueueで、すべてのタスクがadd(Task task)メソッドを介してこのキューに追加されます。クラスの作成時に、runメソッドがwhile(true)呼び出しで呼び出され、タスクキューに何かが追加されるまで、キューのブロックが行われます。

run()内のキューに何かが追加されると、queue.take()はキューのアイテムを返します。次に、そのアイテムを取得して、それを処理するWorkerThreadクラスに渡します。そのworkerThreadは、スレッドの終了の待機を処理するCompletionServiceプールに追加されます。

さて、正しいかどうかわからない部分が来ました。runnableを実装し、クラスが初期化されたときに開始される内部クラスもあります。その仕事は、pool.take()を呼び出して永久ループすることです。したがって、これは基本的に、WorkerThreadsの1つが完了するのを待ちます。完了サービスにこれを処理させます。take()が値を取得すると、内部クラスはその値をnotifyobserverメソッドに渡します。

これは大丈夫ですか?タスクキューでwhile(true)ループで実行されるメインクラスと、WorkerThreadから結果を受信するためにプールで待機している内部クラスもループしているのは少し心配です。

これが実装例です。あなたが思うこと?

     public class HttpSchedulerThreaded extends Observable implements Runnable  {

private ArrayList<Object> list;//holds [0]=VULNINFO, [1]=REQUESTBUILDER OBJECT
protected static Logger logger = Logger.getLogger(HttpScheduler.class.getName());
private CompletionService<VulnInfo> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private CountDownLatch doneSignal;
private String[] vulnClassesIgnoreRedirect;
private boolean followRedirects;
private boolean runJavascriptInResponse;
private boolean isSSL;
private int numThreadsInPool;
private BlockingQueue<VulnInfo> queue;
private boolean isRunning ;
public HttpSchedulerThreaded(int numThreads)
{
    numThreadsInPool = numThreads;
    executor = Executors.newFixedThreadPool(numThreads);
    doneSignal = new CountDownLatch(numThreads);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    schedulerWorker = new HttpSchedulerWorker();
    responseWorkerThread = new Thread(schedulerWorker);
    queue = new LinkedBlockingQueue<VulnInfo>();
}

public HttpSchedulerThreaded()
{
    numThreadsInPool = 1;
    executor = Executors.newFixedThreadPool(1);
    doneSignal = new CountDownLatch(1);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    schedulerWorker = new HttpSchedulerWorker();
    responseWorkerThread = new Thread(schedulerWorker);
    queue = new LinkedBlockingQueue<VulnInfo>();
}

public void setThreadCount(int numThreads)
{
    if(!isRunning){
    executor = Executors.newFixedThreadPool(numThreads);
    doneSignal = new CountDownLatch(numThreads);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    numThreadsInPool = numThreads;
    }
}


public void start()
{
    if(!isRunning){
        responseWorkerThread.start();
        new Thread(this).start();
        isRunning = true;
    }

}


public void add(VulnInfo info) {
    queue.add(info);
}

@Override
public void run() {
    // TODO Auto-generated method stub
    while(shouldRun)
    {   
        try {
            VulnInfo info = queue.take();
            Callable<VulnInfo> worker = new HttpSchedulerRequestSender(info,followRedirects,runJavascriptInResponse,vulnClassesIgnoreRedirect,doneSignal);
            //System.out.println("submitting to pooler: " + info.getID());
            pool.submit(worker);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }   
    }
}

/**
 * Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
 * are complete it will send them to server for completion.
 * @author Steve
 *
 */
class HttpSchedulerWorker  implements Runnable{

    public void run() {
        // TODO Auto-generated method stub
        while(true)
        {
            VulnInfo vulnInfo = null;
            try {
                //System.out.println("taking finished request");
                Future<VulnInfo>    tmp = pool.take();
            //  Future<VulnInfo> tmp = pool.poll();
                if(tmp != null)
                    vulnInfo = tmp.get();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            if(vulnInfo != null)
            {
                //System.out.println("updating all observers: "  + vulnInfo.getID());
                updateObservers(vulnInfo);
            }



        }
    }

}
4

1 に答える 1

2

私の経験から、あなたの解決策は大丈夫のようです。私は3つのコメント/提案があります:

  1. 新しい実行スレッドとを作成するresponseWorkerThread = new Thread(schedulerWorker)responseWorkerThread.start()、基本的にこれら2つのループが分割されます。この部分は大丈夫に見えます。Executors APIを正しく使用しているように見えますが、スレッドを停止し、クラスの一部としてHttpScheduledWorkerをシャットダウンするために、さらにコードが必要になる可能性があります。ExecutionCompletionServiceHttpSchedulerThreaded
  2. あなたの使用queueが本当に必要かどうかはわかりません。ExecutionCompletionServiceすでにBlockingQueueに送信されたタスクを管理するためにを使用しています。
  3. あなたの「質問」は、ベータコードレビューサイトでよりよく当てはまるかもしれません。
于 2011-09-02T19:09:14.493 に答える