2

これは、趣味で書いたレーベンシュタイン距離の並列実装です。私は結果に失望しています。これをコア i7 プロセッサで実行しているので、利用可能なスレッドがたくさんあります。ただし、スレッド数を増やすと、パフォーマンスが大幅に低下します。つまり、同じサイズの入力に対してより多くのスレッドを使用すると、実際には遅くなります。

私がスレッドと java.util.concurrent パッケージを使用している方法を誰かが見て、私が何か間違ったことをしていないか教えてくれることを望んでいました。私が本当に興味があるのは、並列処理が期待どおりに機能しない理由だけです。読者が、ここで行われている複雑な索引付けに目を向けるとは思いません。私が行っている計算は正しいと信じています。しかし、そうでなくても、スレッドプール内のスレッドの数を増やすと、線形に近いスピードアップが見られるはずです。

私が使用したベンチマークコードを含めました。ここにあるライブラリをベンチマークに使用しています。2 番目のコード ブロックは、ベンチマークに使用したものです。

助けてくれてありがとう:)。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class EditDistance {
    private static final int MIN_CHUNK_SIZE = 5;
    private final ExecutorService threadPool;
    private final int threadCount;
    private final String maxStr;
    private final String minStr;
    private final int maxLen;
    private final int minLen;

    public EditDistance(String s1, String s2, ExecutorService threadPool,
            int threadCount) {
        this.threadCount = threadCount;
        this.threadPool = threadPool;
        if (s1.length() < s2.length()) {
            minStr = s1;
            maxStr = s2;
        } else {
            minStr = s2;
            maxStr = s1;
        }
        maxLen = maxStr.length();
        minLen = minStr.length();
    }

    public int editDist() {
        int iterations = maxLen + minLen - 1;
        int[] prev = new int[0];
        int[] current = null;

        for (int i = 0; i < iterations; i++) {
            int currentLen;
            if (i < minLen) {
                currentLen = i + 1;
            } else if (i < maxLen) {
                currentLen = minLen;
            } else {
                currentLen = iterations - i;
            }

            current = new int[currentLen * 2 - 1];
            parallelize(prev, current, currentLen, i);
            prev = current;
        }
        return current[0];
    }

    private void parallelize(int[] prev, int[] current, int currentLen,
            int iteration) {
        int chunkSize = Math.max(current.length / threadCount, MIN_CHUNK_SIZE);
        List<Future<?>> futures = new ArrayList<Future<?>>(currentLen);
        for (int i = 0; i < currentLen; i += chunkSize) {
            int stopIdx = Math.min(currentLen, i + chunkSize);
            Runnable worker = new Worker(prev, current, currentLen, iteration,
                    i, stopIdx);
            futures.add(threadPool.submit(worker));
        }
        for (Future<?> future : futures) {
            try {
                Object result = future.get();
                if (result != null) {
                    throw new RuntimeException(result.toString());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                // We can only finish the computation if we complete
                // all subproblems
                throw new RuntimeException(e);
            }
        }
    }

    private void doChunk(int[] prev, int[] current, int currentLen,
            int iteration, int startIdx, int stopIdx) {
        int mergeStartIdx = (iteration < minLen) ? 0 : 2;

        for (int i = startIdx; i < stopIdx; i++) {
            // Edit distance
            int x;
            int y;
            int leftIdx;
            int downIdx;
            int diagonalIdx;
            if (iteration < minLen) {
                x = i;
                y = currentLen - i - 1;
                leftIdx = i * 2 - 2;
                downIdx = i * 2;
                diagonalIdx = i * 2 - 1;
            } else {
                x = i + iteration - minLen + 1;
                y = minLen - i - 1;
                leftIdx = i * 2;
                downIdx = i * 2 + 2;
                diagonalIdx = i * 2 + 1;
            }
            int left = 1 + ((leftIdx < 0) ? iteration + 1 : prev[leftIdx]);
            int down = 1 + ((downIdx < prev.length) ? prev[downIdx]
                    : iteration + 1);
            int diagonal = penalty(x, y)
                    + ((diagonalIdx < 0 || diagonalIdx >= prev.length) ? iteration
                            : prev[diagonalIdx]);
            int dist = Math.min(left, Math.min(down, diagonal));
            current[i * 2] = dist;

            // Merge prev
            int mergeIdx = i * 2 + 1;
            if (mergeIdx < current.length) {
                current[mergeIdx] = prev[mergeStartIdx + i * 2];
            }
        }
    }

    private int penalty(int maxIdx, int minIdx) {
        return (maxStr.charAt(maxIdx) == minStr.charAt(minIdx)) ? 0 : 1;
    }

    private class Worker implements Runnable {
        private final int[] prev;
        private final int[] current;
        private final int currentLen;
        private final int iteration;
        private final int startIdx;
        private final int stopIdx;

        Worker(int[] prev, int[] current, int currentLen, int iteration,
                int startIdx, int stopIdx) {
            this.prev = prev;
            this.current = current;
            this.currentLen = currentLen;
            this.iteration = iteration;
            this.startIdx = startIdx;
            this.stopIdx = stopIdx;
        }

        @Override
        public void run() {
            doChunk(prev, current, currentLen, iteration, startIdx, stopIdx);
        }
    }

    public static void main(String args[]) {
        int threadCount = 4;
        ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);
        EditDistance ed = new EditDistance("Saturday", "Sunday", threadPool,
                threadCount);
        System.out.println(ed.editDist());
        threadPool.shutdown();
    }
}

EditDistance 内にはプライベートな内部クラス Worker があります。各ワーカーは、EditDistance.doChunk を使用して現在の配列の範囲を埋める責任があります。EditDistance.parallelize は、これらのワーカーを作成し、それらがタスクを完了するのを待ちます。

そして、ベンチマークに使用しているコード:

import java.io.PrintStream;
import java.util.concurrent.*;
import org.apache.commons.lang3.RandomStringUtils;
import bb.util.Benchmark;

public class EditDistanceBenchmark {

    public static void main(String[] args) {
        if (args.length != 2) {
            System.out.println("Usage: <string length> <thread count>");
            System.exit(1);
        }
        PrintStream oldOut = System.out;
        System.setOut(System.err);

        int strLen = Integer.parseInt(args[0]);
        int threadCount = Integer.parseInt(args[1]);
        String s1 = RandomStringUtils.randomAlphabetic(strLen);
        String s2 = RandomStringUtils.randomAlphabetic(strLen);
        ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);

        Benchmark b = new Benchmark(new Benchmarker(s1, s2, threadPool,threadCount));
        System.setOut(oldOut);

        System.out.println("threadCount: " + threadCount + 
                " string length: "+ strLen + "\n\n" + b);
        System.out.println("s1: " + s1 + "\ns2: " + s2);

        threadPool.shutdown();
    }

    private static class Benchmarker implements Runnable {
        private final String s1, s2;
        private final int threadCount;
        private final ExecutorService threadPool;

        private Benchmarker(String s1, String s2, ExecutorService threadPool, int threadCount) {
            this.s1 = s1;
            this.s2 = s2;
            this.threadPool = threadPool;
            this.threadCount = threadCount;
        }

        @Override
        public void run() {
            EditDistance d = new EditDistance(s1, s2, threadPool, threadCount);
            d.editDist();
        }

    }
}
4

1 に答える 1

2

並列化がうまくいかないコードを誤って記述してしまうのは非常に簡単です。主な原因は、スレッドが基盤となるシステム リソース (キャッシュ ラインなど) をめぐって競合する場合です。このアルゴリズムは本質的に、物理メモリ内で互いに近いものに作用するため、それが原因である可能性が非常に高いと思います。

偽の共有に関するこの優れた記事を確認することをお勧めします

http://www.drdobbs.com/go-parallel/article/217500206?pgno=3

次に、スレッドが相互にブロックするケースがないか、コードを慎重に確認してください。

さらに、スレッドが CPU バウンドの場合、CPU コアより多くのスレッドを実行すると、パフォーマンスが低下します (既にすべてのコアを 100% 近くまで使用している場合、スレッドを追加しても、コンテキスト スイッチのオーバーヘッドが追加されるだけです)。

于 2012-04-27T21:56:56.793 に答える