4

データベースを呼び出すワーカー メソッドを呼び出しています。このメソッドは反復処理を行い、並列処理のために値を返します。データベースに打撃を与えるのを防ぐために、DB への実行を一時停止する Thread.Sleep があります。ただし、これは Parallel.ForEach でまだ発生している実行をブロックしているようです。ブロックを防ぐためにこれを達成するための最良の方法は何ですか?

private void ProcessWorkItems()
{
    _cancellation = new CancellationTokenSource();
    _cancellation.Token.Register(() => WorkItemRepository.ResetAbandonedWorkItems());

    Task.Factory.StartNew(() =>
        Parallel.ForEach(GetWorkItems().AsParallel().WithDegreeOfParallelism(10), workItem =>
        {
            var x = ItemFactory(workItem);
            x.doWork();
        }), _cancellation.Token);
}

private IEnumerable<IAnalysisServiceWorkItem> GetWorkItems()
{
    while (!_cancellation.IsCancellationRequested)
    {
        var workItems = WorkItemRepository.GetItemList(); //database call

        workItems.ForEach(item =>
        {
            item.QueueWorkItem(WorkItemRepository);
        });

        foreach (var item in workItems)
        {
            yield return item;
        }

        if (workItems.Count == 0)
        {
            Thread.Sleep(30000); //sleep this thread for 30 seconds if no work items.
        }
    }

    yield break;
}

編集:回答を含めるように変更しましたが、期待どおりに機能しません。.AsParallel().WithDegreeOfParallelism(10) を GetWorkItems() 呼び出しに追加しました。ベース スレッドがスリープ状態であっても、Parallel は実行し続ける必要があると考えているのですが、私の予想は間違っていますか?

例: 15 個のアイテムがあり、10 個のアイテムを繰り返し取得して開始します。それぞれが終了すると、16 番目の項目を要求するまで、GetWorkItems から別の項目を要求します。その時点で、それ以上のアイテムの取得を停止する必要がありますが、完了するまでアイテム 11 ~ 15 の処理を​​続行する必要があります。それはどのように並行して機能するべきですか?現在はそれを行っていないためです。現在実行しているのは、6 が完了すると、Parallel.ForEach でまだ実行されている後続の 10 をロックすることです。

4

4 に答える 4

8

作業項目のBlockingCollection(キュー)と、データベースにデータを入力するために30秒ごとにデータベースを呼び出すタイマーを作成することをお勧めします。何かのようなもの:

BlockingCollection<WorkItem> WorkItems = new BlockingCollection<WorkItem>();

そして初期化時に:

System.Threading.Timer WorkItemTimer = new Timer((s) =>
    {
        var items = WorkItemRepository.GetItemList(); //database call
        foreach (var item in items)
        {
            WorkItems.Add(item);
        }
    }, null, 30000, 30000);

これにより、30秒ごとにデータベースにアイテムが照会されます。

処理する作業項目をスケジュールするために、さまざまなソリューションがあります。あなたが持っているものに最も近いのはこれでしょう:

WorkItem item;

while (WorkItems.TryTake(out item, Timeout.Infinite, _cancellation))
{
    Task.Factory.StartNew((s) =>
        {
            var myItem = (WorkItem)s;
            // process here
        }, item);
}

これにより、スレッドのブロックがなくなり、TPLが並列タスクの最適な割り当て方法を決定できるようになります。

編集:

実際、あなたが持っているものに近いのは:

foreach (var item in WorkItems.GetConsumingEnumerable(_cancellation))
{
    // start task to process item
}

次を使用できる場合があります。

Parallel.Foreach(WorkItems.GetConsumingEnumerable(_cancellation).AsParallel ...

それがうまくいくかどうか、どれだけうまくいくかはわかりません。試してみる価値があるかもしれません。。。

編集終了

一般に、私が提案しているのは、これをプロデューサー/コンシューマーアプリケーションとして扱い、プロデューサーはデータベースに新しいアイテムを定期的に照会するスレッドであるということです。私の例では、N(この場合は30)秒ごとに1回データベースにクエリを実行します。これは、平均して30秒ごとにワークキューを空にできる場合にうまく機能します。これにより、アイテムがデータベースに投稿されてから結果が得られるまでの平均待ち時間は1分未満になります。

ポーリングの頻度(したがって遅延)を減らすことができますが、それによってデータベーストラフィックが増加します。

あなたもそれでより空想を得ることができます。たとえば、30秒後にデータベースをポーリングし、大量のアイテムを取得した場合、より早く取得する可能性が高く、15秒(またはそれ以下)で再度ポーリングする必要があります。逆に、30秒後にデータベースをポーリングしても何も得られない場合は、再度ポーリングする前に、おそらくもっと長く待つことができます。

ワンショットタイマーを使用して、この種の適応ポーリングを設定できます。つまり、タイマーを作成するときに最後のパラメーターに-1を指定します。これにより、タイマーは1回だけ起動します。タイマーコールバックは、次のポーリングまでの待機時間をTimer.Change計算し、新しい値でタイマーを初期化するために呼び出します。

于 2011-09-26T23:22:27.153 に答える
3

.WithDegreeOfParallelism()拡張メソッドを使用して、 PLinq にタスクを同時に実行させることができます。C# Threading HandbookのCall Blocking or I/O Intensiveセクションに良い例があります。

于 2011-09-26T20:58:18.963 に答える
2

あなたはPartitionerのファウルに陥っている可能性があります。

IEnumerableを渡すため、Parallel.ForEachはチャンクパーティショナーを使用します。チャンクパーティショナーは、チャンク内の列挙から一度にいくつかの要素を取得しようとします。しかし、IEnumerable.MoveNextはスリープする可能性があり、これにより事態が混乱します。

一度に1つの要素を返す独自のパーティショナーを作成することもできますが、いずれの場合も、JimMischelの提案などのプロデューサー/コンシューマーアプローチの方がうまくいくと思います。

于 2011-09-27T13:04:14.263 に答える
0

睡眠で何を達成しようとしていますか?私が知る限り、あなたはデータベース コールのドキドキを避けようとしています。それを行うためのより良い方法はわかりませんが、理想的には、GetItemListデータが処理可能になるまで呼び出しがブロックされるようです。

于 2011-09-26T22:15:12.633 に答える