5

List<String>呼ばれるlines巨大な(〜3G)Set<String>がありvocます。にあるすべての行を見つける必要がありlinesますvoc。このマルチスレッドの方法を実行できますか?

現在、私はこの簡単なコードを持っています:

for(String line: lines) {
  if (voc.contains(line)) {
    // Great!!
  }
}

同時に数行を検索する方法はありますか?既存のソリューションがあるかもしれませんか?

PS:私はを使用しjavolution.util.FastMapています。これは、充填中の動作が優れているためです。

4

4 に答える 4

2

これが可能な実装です。エラー/割り込み処理は省略されていますが、これが出発点になる可能性があることに注意してください。main メソッドを含めたので、これをコピーして IDE に貼り付けて簡単なデモを行うことができます。

編集:読みやすさとリストのパーティション分割を改善するために、少し整理しました

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelizeListSearch {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        List<String> searchList = new ArrayList<String>(7);
        searchList.add("hello");
        searchList.add("world");
        searchList.add("java");
        searchList.add("debian");
        searchList.add("linux");
        searchList.add("jsr-166");
        searchList.add("stack");

        Set<String> targetSet = new HashSet<String>(searchList);

        Set<String> matchSet = findMatches(searchList, targetSet);
        System.out.println("Found " + matchSet.size() + " matches");
        for(String match : matchSet){
            System.out.println("match:  " + match);
        }
    }

    public static Set<String> findMatches(List<String> searchList, Set<String> targetSet) throws InterruptedException, ExecutionException {
        Set<String> locatedMatchSet = new HashSet<String>();

        int threadCount = Runtime.getRuntime().availableProcessors();   

        List<List<String>> partitionList = getChunkList(searchList, threadCount);

        if(partitionList.size() == 1){
            //if we only have one "chunk" then don't bother with a thread-pool
            locatedMatchSet = new ListSearcher(searchList, targetSet).call();
        }else{  
            ExecutorService executor = Executors.newFixedThreadPool(threadCount);
            CompletionService<Set<String>> completionService = new ExecutorCompletionService<Set<String>>(executor);

            for(List<String> chunkList : partitionList)
                completionService.submit(new ListSearcher(chunkList, targetSet));

            for(int x = 0; x < partitionList.size(); x++){
                Set<String> threadMatchSet = completionService.take().get();
                locatedMatchSet.addAll(threadMatchSet);
            }

            executor.shutdown();
        }


        return locatedMatchSet;
    }

    private static class ListSearcher implements Callable<Set<String>> {

        private final List<String> searchList;
        private final Set<String> targetSet;
        private final Set<String> matchSet = new HashSet<String>();

        public ListSearcher(List<String> searchList, Set<String> targetSet) {
            this.searchList = searchList;
            this.targetSet = targetSet;
        }

        @Override
        public Set<String> call() {
            for(String searchValue : searchList){
                if(targetSet.contains(searchValue))
                    matchSet.add(searchValue);
            }

            return matchSet;
        }

    }

    private static <T> List<List<T>> getChunkList(List<T> unpartitionedList, int splitCount) {
        int totalProblemSize = unpartitionedList.size();
        int chunkSize = (int) Math.ceil((double) totalProblemSize / splitCount);

        List<List<T>> chunkList = new ArrayList<List<T>>(splitCount);

        int offset = 0;
        int limit = 0;
        for(int x = 0; x < splitCount; x++){
            limit = offset + chunkSize;
            if(limit > totalProblemSize)
                limit = totalProblemSize;
            List<T> subList = unpartitionedList.subList(offset, limit);
            chunkList.add(subList);
            offset = limit;
        }

        return chunkList;
    }

}
于 2013-01-27T23:20:07.443 に答える
1

複数のスレッドを使用してこれを並列化することは絶対に可能です。次のことができます。

  1. 検索を行うスレッドごとに、リストを異なる「ブロック」に分割します。
  2. 各スレッドにそのブロックを調べさせ、各文字列がセットに含まれているかどうかを確認し、含まれている場合はその文字列を結果のセットに追加します。

たとえば、次のスレッド ルーチンがあるとします。

public void scanAndAdd(List<String> allStrings, Set<String> toCheck,
                       ConcurrentSet<String> matches, int start, int end) {
    for (int i = start; i < end; i++) {
        if (toCheck.contains(allStrings.get(i))) {
            matches.add(allStrings.get(i));
        }
    }
}

次に、上記のメソッドを実行するために必要な数のスレッドを生成し、それらすべてが終了するのを待つことができます。結果の一致は に保存されmatchesます。

簡単にするために、出力を に設定しましたConcurrentSet。これにより、書き込みによる競合状態が自動的に解消されます。チェックする文字列のリストと文字列のセットに対してのみ読み取りを行っているため、 からの読み取りallStringsまたは検索の実行時に同期は必要ありませんtoCheck

お役に立てれば!

于 2013-01-27T21:10:52.137 に答える
1

これを探している場合、単純に異なるスレッド間でを分割すると、(少なくとも Oracle JVM では) 作業がすべての CPU に分散されます。私は CyclicBarrier を使用するのが好きで、これらのスレッドをより簡単な方法で制御できるようにします。

http://javarevisited.blogspot.cz/2012/07/cyclicbarrier-example-java-5-concurrency-tutorial.html

于 2013-01-27T21:11:10.807 に答える
0

もう1つのオプションは、 Akkaを使用することです。これは、これらの種類のことを非常に簡単に実行します。

実際、Akkaで検索作業を行った後、これについても言えることの1つは、ComposableFuturesまたはAgentsを介してそのようなものを並列化する2つの方法をサポートしていることです。あなたが望むもののために、構成可能な先物は完全に十分でしょう。次に、Akkaは実際にはそれほど多くを追加していません。Nettyは超並列ioインフラストラクチャを提供し、Futuresはjdkの一部ですが、Akkaを使用すると、これら2つを組み合わせて、必要に応じて拡張することが非常に簡単になります。

于 2013-01-27T23:24:23.227 に答える