0

マルチスレッドのJavaプログラムを作成して、mongoデータを並行してフェッチして保存しようとしています。以下は CallBack のコードで、70 スレッドのスレッド プールを使用してワーカーを作成します。Callable を使用して CallBack をコールバックしています。

問題は、フェッチされたアイテムが CallBack リストに返される以上のものであるということです。何がうまくいかないのかわからない。誰でも助けることができますか?「FETCHED....」でさえ、「INDEXED ….」よりも大きな数を出力します。スレッドは互いにステップオーバーしていますか?

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

import javax.xml.parsers.ParserConfigurationException;

import org.apache.solr.client.solrj.SolrServerException;
import org.xml.sax.SAXException;

import com.chegg.migrator.question.entity.TbsProblem;

public class CallBack {
    List<TbsProblem> problemsToBeIndex = new ArrayList<TbsProblem>();
    final int NO_OF_THREAD = 70;

    public void returnResult(List<TbsProblem> result) throws IOException, SAXException, ParserConfigurationException, SolrServerException {
        problemsToBeIndex.addAll(result);
        System.out.println(" Data Indexed "+problemsToBeIndex.size());
    }
    public  List<TbsProblem> andAction() throws IOException, SAXException, ParserConfigurationException, SolrServerException {
        ThreadPoolExecutor es = (ThreadPoolExecutor) Executors.newFixedThreadPool(NO_OF_THREAD);
            int ctr=0;
            while(ctr <= 100000) {
                CallingBackWorker worker = new CallingBackWorker();
                worker.setCallBack(this);
                final Future future = es.submit( worker);
                ctr +=100;
            }

            while(!es.isTerminated()) {}
            es.shutdown();
            System.out.println(" finished the retrival ");
        System.out.println("try to do something while the work is being done....");
        System.out.println("&quot;End work&quot; "+ new java.util.Date());
        return problemsToBeIndex;
    }

    public static void main(String[] argv) throws IOException, SAXException, ParserConfigurationException, SolrServerException {
        new CallBack().andAction();
    }
}

package com.chegg.migrator.question.parallel.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

import com.chegg.migrator.question.entity.TbsProblem;

public class CallingBackWorker implements Callable<Object>{
    CallBack callBack;

    static int calls = 0;
    static int fetched =0;
    static int indexed =0;
    List<TbsProblem> problems = new ArrayList<TbsProblem>();

    public CallingBackWorker() {
        super();
    }

    @Override
    public Object call() throws Exception {
        System.out.println("  fetching the data ....."+calls++);
        List<TbsProblem> problems = new ArrayList<TbsProblem>();
        for(int i=0;i<50;i++) {
            TbsProblem problem = new TbsProblem();
            problem.setId("fetched"+fetched);
            problems.add(problem);
        }
        Thread.sleep(500);
        fetched +=problems.size();
        System.out.println(" FETCHED ^^^^^^"+fetched);

        List<String> lists = new ArrayList<String>();
        for(TbsProblem tbs : problems) {
            lists.add(tbs.getId());
        }
        Thread.sleep(500);
        indexed += lists.size();
        System.out.println("   committed, exiting.");
        System.out.println(" INDEXED $$$$"+indexed);
        callBack.returnResult(problems);
        return null;
    }

      public CallBack getCallBack() {
        return callBack;
    }

    public void setCallBack(CallBack callBack) {
        this.callBack = callBack;
    }
}
4

1 に答える 1

1

fetchedは各 callable の外で宣言されていますか? そして、あなたはそれをいくつかのスレッドでインクリメントしていますか? もしそうなら、それは問題です。整数のインクリメントはスレッドセーフではありません。この場合は、fetchedを AtomicInteger に置き換えるか、同期ブロック内でインクリメントしてください。

複数のスレッドで整数をインクリメントすると問題になるのはなぜですか? 各スレッドはこれを行います:

STEP 1: read current value of fetched
STEP 2: calculate current value + problems.size()
STEP 3: assign new value to fetched

イメージ スレッド (1) はステップ 1 と 2 を完了し、fetchedの新しい値を10 として計算します。次に、スレッド (2) から (50) がステップ 1、2、および 3 を完了し ます。 ) は STEP 3 を完了し、fetchedに値 10 を再度割り当てます。

于 2013-07-04T18:22:12.700 に答える