3

多くのプロジェクト/ファイルをコンパイルする必要があるアプリケーションをテストしています。

Parallel で処理する必要がある ConucrrentBag があります。

private readonly ConcurrentBag<string> m_files;

並列の私の呼び出しはこれです:

Parallel.ForEach(m_files, new ParallelOptions
            {
                MaxDegreeOfParallelism = MaxProcesses,

            }, currFile => ProcessSingle(currFile.ToString()));

MaxProcess の量は LogicalCpu*2 です。

140 個のプロジェクトをコンパイルしているとき、Parallel は最後まで線形の少ないスレッドを開始します。少なくとも、最後の 4 つのプロジェクトで実行されているスレッドは 1 つだけです。それは良くありませんが、大丈夫です。

今私の問題:

約 14000 以上のプロジェクト (COBOL-SOURCE です ;-) と非常に大きなシステムをコンパイルしているとき) Parallel.ForEach が新しいスレッドを開始していないため、最後のモジュールはコンパイルされません。この時点で有効なワーキングスレッドはありません。しかし、concurrentBag にはまだ 140 個の項目があります。

これを解決する方法を知っている人はいますか?

編集:その問題は、コンパイラを実行したときにのみ発生します。コンパイラを実行せずに(テストを高速化するため)正常に動作します...

編集:

Parallel.ForEach プロセスを開始すると、ConcurrentBag はすでに完全に満たされています。

詳細については、SingleProcess のコード:

private void ProcessSingle(string item)
        {
            Monitor.Enter(lockingObj);
            if (m_files.TryTake(out item))
            {
                if (CompilingModules <= 0)
                {
                    OnQueueStarted(new EventArgs());
                }
                CompilingModules++;
                Monitor.Exit(lockingObj);
                OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Done, ItemQueueObject.String));

                OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Dequeued, ItemQueueObject.String));
                using (CobolCompiler compiler = new CobolCompiler())
                {
                    compiler.OutputDataReceived += (sender, e) => OnOutputDataReceived(e);
                    compiler.Compile(item);
                    Thread.Sleep(2000);
                    if (compiler.LinkFailure)
                    {
                        if (ObjWithoutDll.ContainsKey(item))
                        {
                            if (ObjWithoutDll[item] <= 2)
                            {
                                m_files.Add(item);
                                OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Enqueued, ItemQueueObject.String));
                                ObjWithoutDll[item]++;
                            }
                            else
                            {
                                OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.LinkError, ItemQueueObject.String));
                                ObjWithoutDll.Remove(item);
                            }
                        }
                        else
                        {
                            ObjWithoutDll.Add(item, 0);
                            m_files.Add(item);
                            OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Enqueued, ItemQueueObject.String));
                        }
                    }
                    else
                    {
                        if (compiler.DllExisting)
                        {
                            ObjWithoutDll.Remove(item);
                        }
                        OnQueueItemStateChanged(compiler.DllExisting ? new ItemQueueEventArgs(item, null, ItemQueueType.Done, ItemQueueObject.String) : new ItemQueueEventArgs(item, null, ItemQueueType.Failed, ItemQueueObject.String));
                    }

                }

                Monitor.Enter(lockingObj);
                CompiledModules++;
                if (CompiledModules % 300 == 0)
                {
                    Thread.Sleep(60000);
                }
                CompilingModules--;
                if (CompilingModules <= 0 && m_files.Count <= 0)
                {

                    try
                    {
                        Process prReschk = new Process();
                        FileInfo batch = new FileInfo(@"batches\reschkdlg.cmd");
                        if (!batch.Exists)
                        {
                            Assembly _assembly = Assembly.GetExecutingAssembly();
                            StreamReader _textStreamReader = new StreamReader(_assembly.GetManifestResourceStream(@"Batches\reschkdlg.cmd"));
                        }

                        if (!File.Exists(Config.Instance.WorkingDir + @"reschkdlg.exe"))
                        {
                            File.Copy(Config.Instance.VersionExeDirectory + @"reschkdlg.exe", Config.Instance.WorkingDir + @"reschkdlg.exe");
                        }

                        prReschk.StartInfo.FileName = @"cmd.exe";
                        prReschk.StartInfo.Arguments = @"/c " + batch.FullName + " " + Config.Instance.Version.Replace(".", "") + " " + @"*" + " " + Config.Instance.WorkingDir;
                        prReschk.StartInfo.CreateNoWindow = true;
                        prReschk.StartInfo.UseShellExecute = false;
                        prReschk.Start();
                        prReschk.Close();
                        prReschk.Dispose();
                    }
                    catch
                    {
                    }

                    OnQueueFinished(new EventArgs());
                }
            }
            Monitor.Exit(lockingObj);
        }

CobolCompiler クラスの Codesnippet は次のとおりです。

public void Compile(文字列ファイル) {

        file = file.ToLower();

        Process prCompile = new Process();
        Dir = Directory.CreateDirectory(c.WorkingDir + random.Next() + "\\");

        try
        {
            // First clean up the folder
            CleanUpFolder(true, file);

            // First set lock and copy all sources
            Monitor.Enter(lockingObj);
            if (filesToCopy == null)
            {
                CopySource(Dir.FullName);
            }
            Monitor.Exit(lockingObj);

            FileInfo batch = new FileInfo(@"batches\compile.cmd");
            if (!batch.Exists)
            {
                Assembly _assembly = Assembly.GetExecutingAssembly();
                StreamReader _textStreamReader = new StreamReader(_assembly.GetManifestResourceStream(@"Batches\compile.cmd"));
                _textStreamReader.Dispose();
            }

            prCompile.StartInfo.FileName = @"cmd.exe";
            prCompile.StartInfo.Arguments = @"/c " + batch.FullName + " " + c.Version.Replace(".", "") + " " + file.Remove(file.LastIndexOf('.')) + " " + Dir.FullName + " " + Dir.FullName.Remove(Dir.FullName.IndexOf(@"\"));
            prCompile.StartInfo.CreateNoWindow = true;
            prCompile.StartInfo.UseShellExecute = false;
            prCompile.StartInfo.RedirectStandardOutput = true;
            prCompile.StartInfo.RedirectStandardError = true;
            prCompile.StartInfo.WorkingDirectory = Assembly.GetExecutingAssembly().Location.Remove(Assembly.GetExecutingAssembly().Location.LastIndexOf("\\") + 1);
            prCompile.EnableRaisingEvents = true;
            prCompile.OutputDataReceived += prCompile_OutputDataReceived;
            prCompile.ErrorDataReceived += prCompile_OutputDataReceived;
            prCompile.Start();
            prCompile.BeginErrorReadLine();
            prCompile.BeginOutputReadLine();
            prCompile.WaitForExit();
            prCompile.Close();
            prCompile.Dispose();

            CleanUpFolder(false, file);

            if (File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".dll") || File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".exe"))
            {
                dllExisting = true;
                linkFailure = false;
            }
            else
            {
                if (File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".obj"))
                {
                    linkFailure = true;
                }
                dllExisting = false;
            }



        }
        catch (ThreadAbortException)
        {
            if (prCompile != null)
            {
                // On Error kill process
                prCompile.Kill();
                prCompile.Dispose();
            }
        }
        catch (Win32Exception)
        {
        }
        catch (Exception)
        {
            dllExisting = false;
        }

        while (true)
        {
            try
            {
                if (Directory.Exists(Dir.FullName))
                {
                    Directory.Delete(Dir.FullName, true);
                    break;
                }
                else
                {
                    break;
                }
            }
            catch
            {
            }
        }


    }
private void CopySource(string Destination)
{
    filesToCopy = new StringCollection();
    foreach (string strFile in Directory.GetFiles(c.WorkingDir))
    {
        string tmpStrFile = strFile.ToLower();

        foreach (string Extension in c.Extensions)
        {
            if (tmpStrFile.Contains(Extension))
            {
                filesToCopy.Add(tmpStrFile);
            }
        }
    }

    if (filesToCopy.Count > 0)
    {
        foreach (string strFile in filesToCopy)
        {
            File.Copy(strFile, Destination + strFile.Remove(0, strFile.LastIndexOf("\\")));
        }
    }
}

private void CleanUpFolder(bool PreCleanup, string Filename)
{
    //Copy all files from compilationfolder to working directory
    if (!PreCleanup)
    {
        foreach (string strFile in Directory.GetFiles(Dir.FullName, Filename.Remove(Filename.LastIndexOf(".") + 1) + "*"))
        {
            FileInfo fileToMove = new FileInfo(strFile);

            if (fileToMove.Name.ToLower().Contains(Filename.Remove(Filename.LastIndexOf("."))))
            {
                File.Copy(strFile, c.WorkingDir + fileToMove.Name, true);
            }
        }
    }

    //Delete useless files
    foreach (string filename in Directory.GetFiles(Config.Instance.WorkingDir, Filename.Remove(Filename.LastIndexOf("."))+".*"))
    {
        bool foundExt = c.Extensions.Contains(filename.Remove(0, filename.LastIndexOf(".") + 1));
        if (PreCleanup)
        {
            // Only delete files, which are not won't be compiled
            if(!foundExt)
            {
                File.Delete(filename);
            }
        }
        else
        {
            if (!Config.Instance.SaveLspFile && filename.Contains(".lsp"))
            {
                File.Delete(filename);
            }

            if (!Config.Instance.SaveLstFile && filename.Contains(".lst"))
            {
                File.Delete(filename);
            }
        }
    }
}

public void Dispose()
{
    Dispose(true);
    GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
    if (!disposed)
    {
        if (disposing)
        {
            Dir = null;
        }
        disposed = true;
    }
}

~CobolCompiler()
{
    Dispose (false);
}

コンパイルプロセスごとに2秒間スリープしてみました。しかし、これは何も変わりません。

コンパイルの進行中、CPU は 100% です。アプリケーションは 270 MB の RAM を収集しています。開始時はわずか 35MB です。

コンパイラは同じ作業ディレクトリで同時に複数のファイルをコンパイルできないため、すべてのソースを一時フォルダーにコピーする必要があります。

編集: スレッドがなくてもアイテムがあるという問題は既に修正しました。

ProcessSingle では、dll にリンクされていないときに、もう一度コンパイルしようとしたアイテムを追加します。

そのため、Parallel.ForEach の処理中に、14000 個のアイテムから始めて、(リンクに失敗した場合は) アイテムをこの concurrentBag に再度追加しました。したがって、ForEach の 14000 回の実行で終了し、再度コンパイルする必要がある xxx モジュールがあります。:-(

私はそれを見ませんでした。WaitForExit なしで prReschk を実行することが意図されています。14000 を超えるアイテムのリソースをチェックするため、長い時間がかかりますが、新しいコンパイルを妨げるべきではありません。

しかし、ConcurrentBag の最後にスレッドが少ないという問題はまだ存在します :( しかし、それは大量のサイクルの場合にのみ通知されます。

4

2 に答える 2

2

Parallel.ForEach メソッドは、.Net ThreadPool を使用してスレッドを割り当てます。並行して実行される実際のスレッド数は、システム CPU の負荷に応じて ThreadPool によって管理されます。そのため、MaxDegreeOfParallelism を指定した可能性がありますが、これは最大値にすぎません。ThreadPool は、この最大値よりも少ないスレッドを割り当てることを決定する場合があります。

あなたの質問で与えられた証拠に基づいて、コンパイル プロセスがシステム リソースを使い果たし、後でクリーンアップしていないように思えます。これにより、140 回のコンパイルで、割り当てられるスレッド数が徐々に減少する理由が説明されます。ThreadPool は、CPU の負荷が高いと判断して、新しいスレッドを割り当てません。

コンパイルプロセスがどのように終了するかを詳しく見ていきます。ProcessSingle メソッドは、コンパイルが完全に完了する前に戻りますか? コンパイル プロセスでメモリ リークが発生していませんか?

実験として、ProcessSingle を呼び出した後に次の行を追加すると、動作が異なるかどうかを知りたいです。

 System.Threading.Thread.Sleep(2000);

これにより、次のタスクを割り当てるために制御を ThreadPool に戻す前に、スレッドが 2 秒間一時停止します。アプリケーションの動作が改善される場合、私の理論が正しいことを強く示唆しています。

于 2012-05-14T13:24:29.693 に答える