9

私はC#Parallel.ForEachを使用して、1,000を超えるデータのサブセットを処理しています。セットのサイズにもよりますが、1セットの処理には5〜30分かかります。オプション付きの私のコンピューターで

ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = Environment.ProcessorCount

8つの並列プロセスを取得します。私が理解したように、プロセスは並列タスク間で均等に分割されます(たとえば、最初のタスクはジョブ番号1,9,17などを取得し、2番目のタスクは2,10,18などを取得します)。したがって、1つのタスクが他のタスクよりも早く自分の仕事を終えることができます。これらのデータセットは他のデータセットよりも時間がかからなかったためです。

問題は、4つの並列タスクが24時間以内にジョブを終了するのに対し、最後の1つは48時間以内に終了することです。すべての並列タスクが等しく終了するように並列処理を整理する機会はありますか?これは、すべての並列タスクがすべてのジョブが完了するまで機能し続けることを意味しますか?

4

3 に答える 3

4

ジョブは等しくないため、プロセッサ間でジョブの数を分割して、ほぼ同時に終了させることはできません。ここで必要なのは、次のジョブを順番に取得する8つのワーカースレッドだと思います。次のジョブを取得するには、関数のロックを使用する必要があります。

私が間違っている場合、誰かが私を訂正しますが、頭のてっぺんから...ワーカースレッドに次のような関数を与えることができます:

public void ProcessJob()
{
    for (Job myJob = GetNextJob(); myJob != null; myJob = GetNextJob())
    {
        // process job
    }
}

そして、次の仕事を取得するための関数は次のようになります。

private List<Job> jobs;
private int currentJob = 0;

private Job GetNextJob()
{
    lock (jobs)
    {
        Job job = null;
        if (currentJob < jobs.Count)
        {
            job = jobs[currentJob];
            currentJob++;
        }
        return job;
    }
}
于 2013-03-06T16:08:59.127 に答える
1

すぐに使用できるソリューションはないようで、作成する必要があります。

私の以前のコードは次のとおりです。

var ListOfSets = (from x in Database
           group x by x.SetID into z
           select new { ID = z.Key}).ToList();

ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = Environment.ProcessorCount;

Parallel.ForEach(ListOfSets, po, SingleSet=>
{
     AnalyzeSet(SingleSet.ID);
});

すべてのCPU間で作業を平等に共有するために、私はまだParallel作業を行うために使用しますが、代わりにマットからのアイデアForEachを使用します。For新しいコードは次のとおりです。

Parallel.For(0, Environment.ProcessorCount, i=>
{
    while(ListOfSets.Count() > 0)
    {
        double SetID = 0;
        lock (ListOfSets)
        {
            SetID = ListOfSets[0].ID;
            ListOfSets.RemoveAt(0);
        }
     AnalyzeSet(SetID);
    }
});

だから、あなたのアドバイスをありがとう。

于 2013-03-07T17:24:29.797 に答える
1

他の人が提案しているように、1つのオプションは、独自のプロデューサーコンシューマーキューを管理することです。を使用するBlockingCollectionと、これが非常に簡単になります。

BlockingCollection<JobData> queue = new BlockingCollection<JobData>();

//add data to queue; if it can be done quickly, just do it inline.  
//If it's expensive, start a new task/thread just to add items to the queue.
foreach (JobData job in data)
    queue.Add(job);

queue.CompleteAdding();

for (int i = 0; i < Environment.ProcessorCount; i++)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var job in queue.GetConsumingEnumerable())
        {
            ProcessJob(job);
        }
    }, TaskCreationOptions.LongRunning);
}
于 2013-03-07T17:34:08.137 に答える