私は、TPLデータフローがこれに対する良い解決策のように聞こえることに同意します。
処理を制限するために、TransformBlock
実際にはデータを変換しないを作成できます。前のデータの直後に到着した場合は、データを遅延させるだけです。
static IPropagatorBlock<T, T> CreateDelayBlock<T>(TimeSpan delay)
{
DateTime lastItem = DateTime.MinValue;
return new TransformBlock<T, T>(
async x =>
{
var waitTime = lastItem + delay - DateTime.UtcNow;
if (waitTime > TimeSpan.Zero)
await Task.Delay(waitTime);
lastItem = DateTime.UtcNow;
return x;
},
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}
次に、データ(たとえば、0から始まる整数)を生成するメソッドを作成します。
static async Task Producer(ITargetBlock<int> target)
{
int i = 0;
while (await target.SendAsync(i))
i++;
}
非同期で書き込まれるため、ターゲットブロックが現在アイテムを処理できない場合は、待機します。
次に、コンシューマーメソッドを記述します。
static void Consumer(int i)
{
Console.WriteLine(i);
}
そして最後に、それをすべてリンクして起動します。
var delayBlock = CreateDelayBlock<int>(TimeSpan.FromMilliseconds(500));
var consumerBlock = new ActionBlock<int>(
(Action<int>)Consumer,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
delayBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true });
Task.WaitAll(Producer(delayBlock), consumerBlock.Completion);
ここでdelayBlock
は、500ミリ秒ごとに最大1つのアイテムを受け入れ、Consumer()
メソッドは複数回並行して実行できます。処理を終了するには、を呼び出しますdelayBlock.Complete()
。
#2ごとにキャッシュを追加する場合は、別のキャッシュを作成TransformBlock
してそこで作業を行い、他のブロックにリンクすることができます。