2

私は生産者と消費者のパターンを持っています。1 つのプロデューサー (何らかの作業を行う) と 7 つのコンシューマー (8 コア マシン上)。プロデューサーは、一度に 7 つのファイル (2000 のうち) をダウンロードし、7 つのスレッドがそれらを処理するのを待ってから、次の 7 つに進むことを意図しています。設定はおおよそ次のとおりです。

            ExecutorService threadPool = Executors.newFixedThreadPool(8);

        int pollWait = 4; 
        int queSize = 7;
        Broker broker = new Broker(queSize,pollWait);

Broker は、容量 7 の LinkedBlockingQueue を実装します。

        Producer producer = new Producer(); 
        producer.setBroker(broker);
        Future<Integer> producerStatus = threadPool.submit(producer);

        int numConsumerThreads = 7;

        ArrayList<Future<Integer>> consumers = new ArrayList<Future<Integer>>();

        for(int c=0; c < numConsumerThreads;c++)
        {
            String threadName = "consumer-"+(c+1);
            Consumer consumer = new Consumer(threadName);
            consumer.setBroker(broker);

            Future<Integer> consumerStatus = threadPool.submit(consumer);
            consumers.add(consumerStatus);                            
        }   

        if(producerStatus.isDone())
        {
            Integer numFilesRead = producerStatus.get();
            System.out.println("[INFO] Total number of files downloaded by producer: " + numFilesRead);
        }

        int k=0,numIndexedByThread=0;
        while(!consumers.isEmpty())
        {
            final Iterator<Future<Integer>> i = consumers.iterator();
            while (i.hasNext())
            {                   
               Future<Integer> f = i.next();
               if (f.isDone())
               {
                  i.remove();
                  numIndexedByThread = f.get();
                  k += numIndexedByThread;
               } 
             }
        }
        System.out.println("[INFO] Total number of files indexed: " + k);
        threadPool.shutdown();

私が見ている問題は、プログラムが実行されないことです。

4

1 に答える 1

1

結果の処理をシリアル化しました

次の行は、結果の処理全体をブロックしてシリアル化します。

Integer numConsumedByThread = consumerStatus.get();

正しいイディオムは、それぞれをチェックし、true を返す場合にのみ呼び出すことです。それ以外の場合はisDone()、結果が返されるまでループをブロックします。.get()isDone().get()

疑似コードっぽい

while(!listOfFutures.isEmpty())
{
    final Iterator<Future> i = listofFutures.iterator;
    while (i.hasNext())
    {
       Future f = i.next();
       if (f.isDone())
       {
          i.remove();
          // call .get() and process the completed result 
       } 
     }
}

JavaDocs を読むFuture.get()

于 2013-10-30T20:08:07.653 に答える