0

私は長年、最初のマルチスレッドアプリに取り組んでいます。私が抱えている問題は、2つのメソッドを同時に実行する必要があることです。これが私のエンジンクラスです:

public class ThreadPoolEngine {

    // create BlockingQueue to put fund transfer objects
    private BlockingQueue<GlobalSearchRequest> searchQueue;

    public ThreadPoolExecutor executor;

    private HashMap<String, GlobalSearchProcessorCallable> callableMap;

    private ArrayList<Future<Integer>> futurList;

    Logger logger = Logger.getLogger(ThreadPoolEngine.class);

    private Integer gthreadCount;
    private Integer gjobPerThread;

    public ThreadPoolEngine(Integer threadCount, Integer jobPerThread) {
        gthreadCount = threadCount;
        gjobPerThread = jobPerThread;
        // create a thread pool with the entered no of threads
        executor = new HammerThreadPoolExecutor(threadCount, threadCount, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        searchQueue = new ArrayBlockingQueue<GlobalSearchRequest>(jobPerThread);

        callableMap = new HashMap<String, GlobalSearchProcessorCallable>();

        // create list to store reference to Future objects
        futurList = new ArrayList<Future<Integer>>();
    }

    public void createAndSubmitTasks() {
        // create Callables
        for (int i = 0; i < gthreadCount; i++) {

            GlobalSearchProcessorCallable callable1 = new GlobalSearchProcessorCallable(
                    "SearchProcessor_" + i, searchQueue);
            callableMap.put(callable1.getThreadName(), callable1);

            // submit callable tasks
            Future<Integer> future;
            future = executor.submit(callable1);
            futurList.add(future);
        }
    }

    public void populateSearchQueue() throws InterruptedException {
        // put orderVO objects in BlockingQueue
        KeywordFactory key = KeywordFactory.getInstance();

        for (int i = 0; i < gjobPerThread*gthreadCount; i++) {
            // this method will put SearchRequest object in the order queue
            try {
                searchQueue.put(new GlobalSearchRequest(key.getRandomPhrase(3)));
            } catch (KeywordNoDataFileException e) {
                e.printStackTrace();
            }
        }
    }

    public void printProcessorStatus() throws InterruptedException {
        // print processor status until all orders are processed
        while (!searchQueue.isEmpty()) {
            for (Map.Entry<String, GlobalSearchProcessorCallable> e : callableMap
                    .entrySet()) {
                logger.debug(e.getKey() + " processed order count: "
                        + e.getValue().getProcessedCount());
            }
            Thread.sleep(1000);
        }
    }

    public void shutDown(boolean forceShutdown) {
        if (!forceShutdown) {
            // shutdown() method will mark the thread pool shutdown to true
            executor.shutdown();
            logger.debug("Executor shutdown status " + executor.isShutdown());
            logger.debug("Executor terninated status "
                    + executor.isTerminated());

            // Mark threads to return threads gracefully.
            for (Map.Entry<String, GlobalSearchProcessorCallable> orderProcessor : callableMap
                    .entrySet()) {
                orderProcessor.getValue().setRunning(false);
            }
        } else {

            for (Future<Integer> f : futurList) {
                f.cancel(true);
            }

            // shutdown() method will mark the thread pool shutdown to true
            executor.shutdownNow();
        }
    }

    public void printWorkersResult() {
        for (Future<Integer> f : futurList) {
            try {
                Integer result = f.get(1000, TimeUnit.MILLISECONDS);
                logger.debug(f + " result. Processed orders " + result);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            } catch (ExecutionException e) {
                logger.error(e.getCause().getMessage(), e);
            } catch (TimeoutException e) {
                logger.error(e.getMessage(), e);
            } catch (CancellationException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
}

わかりました。これをインスタンス化するメインクラスがあり、このクラスで2つのメソッドpopulateSearchQueueとcreateAndSubmitTasksを呼び出して、ワーカークラスを実行し、検索キュー内のアイテムを処理します。

PopulateSearchQueueメソッドの構築には非常に長い時間がかかる可能性があり(一度に10億のクエリでシステムを攻撃することになります)、大量のメモリが必要になる可能性があります。私のメインクラスがpopulateSearchQueueとcreateAndSubmitTasksを同時に呼び出すことができるJavaの方法はありますか?それにより、ワーカースレッドはpopulateSearchQueueメソッドによって構築されている間にキューで作業を開始できますか?

4

1 に答える 1

1

私は実際にそれを解決しました。コードをもう一度読んで、スレッドプールの作成に少し時間がかかることに気づきました。したがってcreateAndSubmitTasks、スレッドプールを作成し、それぞれが何かをするのを待っているワーカークラスを割り当てた呼び出し。その方法が完了すると、1000スレッドのプールがそこに座って何もしなくなります。次に、呼び出した瞬間にpopulateSearchQueue、次のメソッドに移動するのに数ミリ秒かかったワーカースレッドがキューからジョブを取得し始め、目的の結果が得られます。キューにデータを入れているメソッドは、ワーカースレッドがそのキューからジョブを取得して実行しているのと同時に処理しています。

したがって、メソッドを呼び出す順序を逆にします。それは美しいものです。

于 2012-12-14T21:28:00.277 に答える