1

並行して再帰的なディレクトリトラバーサルおよびファイル処理プログラムを作成しました。これは、すべての並列計算が終了した後にハングすることがありますが、「プライマリ」スレッドは他のタスクを続行しません。

コードは基本的にフォークジョインスタイルのコンカレントアグリゲーターであり、並列アグリゲーションが完了すると、結果がSwingウィンドウに表示されます。集約の問題は、ツリーを生成し、階層の上位にあるリーフノードの統計を集約する必要があることです。

並行性の間違いを犯したと確信していますが、それを見つけることができません。投稿の最後にコードの関連部分を含めました(簡潔にするためにコードコメントを削除しました。150行で申し訳ありません。必要に応じて、外部の場所に移動できます)。

コンテキスト:Java 6u13、Windows XP SP3、Core2DuoCPU。

私の質問は次のとおりです。

このランダムなハングの原因は何でしょうか?

おそらく既存のライブラリの形で、同時ディレクトリトラバーサルを行うためのより良い方法はありますか?

Doug Lea(またはJava 7)のフォーク結合フレームワークは、集約/ディレクトリトラバーサルのためのより優れたフレームワークでしょうか?もしそうなら、どのように実装を再考する必要がありますか?概念レベルで?

お時間をいただきありがとうございます。

そしてコードの抜粋:

private static JavaFileEvaluator[] processFiles(File[] files) 
throws InterruptedException {
    CountUpDown count = new CountUpDown();
    ThreadPoolExecutor ex = (ThreadPoolExecutor)Executors
    .newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    JavaFileEvaluator[] jfes = new JavaFileEvaluator[files.length];
    for (int i = 0; i < jfes.length; i++) {
        count.increment();
        jfes[i] = new JavaFileEvaluator(files[i], count, ex);
        ex.execute(jfes[i]);
    }
    count.await();
    for (int i = 0; i < jfes.length; i++) {
        count.increment();
        final JavaFileEvaluator jfe = jfes[i];
        ex.execute(new Runnable() {
            public void run() {
                jfe.aggregate();
            }
        });

    }
    // -------------------------------------
    // this await sometimes fails to wake up
    count.await(); // <---------------------
    // -------------------------------------
    ex.shutdown();
    ex.awaitTermination(0, TimeUnit.MILLISECONDS);
    return jfes;
}
public class JavaFileEvaluator implements Runnable {
    private final File srcFile;
    private final Counters counters = new Counters();
    private final CountUpDown count;
    private final ExecutorService service;
    private List<JavaFileEvaluator> children;
    public JavaFileEvaluator(File srcFile, 
            CountUpDown count, ExecutorService service) {
        this.srcFile = srcFile;
        this.count = count;
        this.service = service;
    }
    public void run() {
        try {
            if (srcFile.isFile()) {
                JavaSourceFactory jsf = new JavaSourceFactory();
                JavaParser jp = new JavaParser(jsf);
                try {
                    counters.add(Constants.FILE_SIZE, srcFile.length());
                    countLines();
                    jp.parse(srcFile);
                    Iterator<?> it = jsf.getJavaSources();
                    while (it.hasNext()) {
                        JavaSource js = (JavaSource)it.next();
                        js.toString();
                        processSource(js);
                    }
                // Some catch clauses here
                }
            } else
            if (srcFile.isDirectory()) {
                processDirectory(srcFile);
            }
        } finally {
            count.decrement();
        }
    }
    public void processSource(JavaSource js) {
        // process source, left out for brevity
    }
    public void processDirectory(File dir) {
        File[] files = dir.listFiles(new FileFilter() {
            @Override
            public boolean accept(File pathname) {
                return 
                (pathname.isDirectory() && !pathname.getName().startsWith("CVS") 
                 && !pathname.getName().startsWith("."))
                || (pathname.isFile() && pathname.getName().endsWith(".java") 
                 && pathname.canRead());
            }
        });
        if (files != null) {
            Arrays.sort(files, new Comparator<File>() {
                @Override
                public int compare(File o1, File o2) {
                    if (o1.isDirectory() && o2.isFile()) {
                        return -1;
                    } else
                    if (o1.isFile() && o2.isDirectory()) {
                        return 1;
                    }
                    return o1.getName().compareTo(o2.getName());
                }
            });
            for (File f : files) {
                if (f.isFile()) {
                    counters.add(Constants.FILE, 1);
                } else {
                    counters.add(Constants.DIR, 1);
                }
                JavaFileEvaluator ev = new JavaFileEvaluator(f, count, service);
                if (children == null) {
                    children = new ArrayList<JavaFileEvaluator>();
                }
                children.add(ev);
                count.increment();
                service.execute(ev);
            }
        }
    }
    public Counters getCounters() {
        return counters;
    }
    public boolean hasChildren() {
        return children != null && children.size() > 0;
    }
    public void aggregate() {
        // recursively aggregate non-leaf nodes
        if (!hasChildren()) {
            count.decrement();
            return;
        }
        for (final JavaFileEvaluator e : children) {
            count.increment();
            service.execute(new Runnable() {
                @Override
                public void run() {
                    e.aggregate();
                }
            });
        }
        count.decrement();
    }
}
public class CountUpDown {
    private final Lock lock = new ReentrantLock();
    private final Condition cond = lock.newCondition();
    private final AtomicInteger count = new AtomicInteger();
    public void increment() {
        count.incrementAndGet();
    }
    public void decrement() {
        int value = count.decrementAndGet();
        if (value == 0) {
            lock.lock();
            try {
                cond.signalAll();
            } finally {
                lock.unlock();
            }
        } else
        if (value < 0) {
            throw new IllegalStateException("Counter < 0 :" + value);
        }
    }
    public void await() throws InterruptedException {
        lock.lock();
        try {
            if (count.get() > 0) {
                cond.await();
            }
        } finally {
            lock.unlock();
        }
    }
}

編集JavaSourceEvaluatorにhasChildren()メソッドを追加しました。

4

1 に答える 1

1

JavaFileEvaluatorのaggregateメソッドでは、count.decrement()はfinallyブロックで呼び出されません。集計関数内でRuntimeExceptionsがスローされた場合(おそらくhasChildrenメソッドで、本体は表示されませんか?)、デクリメントの呼び出しは発生せず、CountUpDownは無期限に待機し続けます。これは、表示されているランダムなハングの原因である可能性があります。

2番目の質問については、これを行うためのJavaのライブラリを知りませんが、実際には調べていません。答えがないことをお詫びしますが、これは以前に使用する機会がなかったものです。

3番目の質問に関しては、他の誰かが提供するフォーク結合フレームワークを使用する場合でも、独自の同時実行フレームワークを提供し続ける場合でも、ディレクトリをトラバースするロジックを分離することで最大のメリットが得られると思います。並列処理の管理に関連するロジック。提供したコードは、CountUpDownクラスを使用してすべてのスレッドがいつ終了したかを追跡し、ディレクトリトラバースを処理するメソッド全体にインクリメント/デクリメントの呼び出しが散在することになり、バグを追跡する悪夢につながります。java7 fork-joinフレームワークに移行すると、実際のトラバーサルロジックのみを処理するクラスを作成し、並行性ロジックをフレームワークに任せる必要があります。これは、良い方法かもしれません。もう1つのオプションは、ここにあるものを使い続けることです。

于 2009-06-29T18:06:02.687 に答える