Dataflow CTPの使用(TPL内)
タイムアウト後に、現在キューに入れられている、または延期されているアイテムの数がBatchSize未満の場合、BatchBlock.TriggerBatchを自動的に呼び出す方法はありますか?
さらに良いことに、このタイムアウトは、ブロックが新しいアイテムを受信するたびに0にリセットする必要があります。
Dataflow CTPの使用(TPL内)
タイムアウト後に、現在キューに入れられている、または延期されているアイテムの数がBatchSize未満の場合、BatchBlock.TriggerBatchを自動的に呼び出す方法はありますか?
さらに良いことに、このタイムアウトは、ブロックが新しいアイテムを受信するたびに0にリセットする必要があります。
はい、ブロックをチェーンすることで、これをかなりエレガントに実現できます。この場合、BatchBlockの「前」にリンクするTransformBlockを設定します。これは次のようになります。
Timer triggerBatchTimer = new Timer(() => yourBatchBlock.TriggerBatch());
TransformBlock<T, T> timeoutTransformBlock = new TransformBlock<T, T>((value) =>
{
triggerBatchTimer.Change(5000, Timeout.Infinite);
return value;
});
timeoutTransformBlock.LinkTo(yourBatchBlock);
yourBufferBlock.LinkTo(timeoutTransformBlock);
これは、優れたDrewMarshのソリューションのポリシングバージョンです。これは、このDataflowBlock.Encapsulate
メソッドを使用して、タイマー+バッチ機能をカプセル化するデータフローブロックを作成します。新しい引数timeout
に加えて、このメソッドは通常のコンストラクターCreateBatchBlock
で使用可能なすべてのオプションもサポートします。BatchBlock
public static IPropagatorBlock<T, T[]> CreateBatchBlock<T>(int batchSize,
int timeout, GroupingDataflowBlockOptions dataflowBlockOptions = null)
{
dataflowBlockOptions = dataflowBlockOptions ?? new GroupingDataflowBlockOptions();
var batchBlock = new BatchBlock<T>(batchSize, dataflowBlockOptions);
var timer = new System.Threading.Timer(_ => batchBlock.TriggerBatch());
var transformBlock = new TransformBlock<T, T>((T value) =>
{
timer.Change(timeout, Timeout.Infinite);
return value;
}, new ExecutionDataflowBlockOptions()
{
BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
CancellationToken = dataflowBlockOptions.CancellationToken,
EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
NameFormat = dataflowBlockOptions.NameFormat,
TaskScheduler = dataflowBlockOptions.TaskScheduler
});
transformBlock.LinkTo(batchBlock, new DataflowLinkOptions()
{
PropagateCompletion = true
});
return DataflowBlock.Encapsulate(transformBlock, batchBlock);
}
代替案:以下は、すべての機能BatchUntilInactiveBlock<T>
を提供するクラスです。この実装は、インスタンスBatchBlock<T>
の薄いラッパーです。BatchBlock<T>
以前のCreateBatchBlock
実装よりもオーバーヘッドが少なく、同様の動作をします。
/// <summary>
/// Provides a dataflow block that batches inputs into arrays.
/// A batch is produced when the number of currently queued items becomes equal
/// to BatchSize, or when a Timeout period has elapsed after receiving the last item.
/// </summary>
public class BatchUntilInactiveBlock<T> : IPropagatorBlock<T, T[]>,
IReceivableSourceBlock<T[]>
{
private readonly BatchBlock<T> _source;
private readonly Timer _timer;
private readonly TimeSpan _timeout;
public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout,
GroupingDataflowBlockOptions dataflowBlockOptions)
{
_source = new BatchBlock<T>(batchSize, dataflowBlockOptions);
_timer = new Timer(_ => _source.TriggerBatch());
_timeout = timeout;
}
public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout) : this(batchSize,
timeout, new GroupingDataflowBlockOptions())
{ }
public int BatchSize => _source.BatchSize;
public TimeSpan Timeout => _timeout;
public Task Completion => _source.Completion;
public int OutputCount => _source.OutputCount;
public void Complete() => _source.Complete();
void IDataflowBlock.Fault(Exception exception)
=> ((IDataflowBlock)_source).Fault(exception);
public IDisposable LinkTo(ITargetBlock<T[]> target,
DataflowLinkOptions linkOptions)
=> _source.LinkTo(target, linkOptions);
public void TriggerBatch() => _source.TriggerBatch();
public bool TryReceive(Predicate<T[]> filter, out T[] item)
=> _source.TryReceive(filter, out item);
public bool TryReceiveAll(out IList<T[]> items)
=> _source.TryReceiveAll(out items);
DataflowMessageStatus ITargetBlock<T>.OfferMessage(
DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
bool consumeToAccept)
{
var offerResult = ((ITargetBlock<T>)_source).OfferMessage(messageHeader,
messageValue, source, consumeToAccept);
if (offerResult == DataflowMessageStatus.Accepted)
_timer.Change(_timeout, System.Threading.Timeout.InfiniteTimeSpan);
return offerResult;
}
T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target, out bool messageConsumed)
=> ((ISourceBlock<T[]>)_source).ConsumeMessage(messageHeader,
target, out messageConsumed);
bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target)
=> ((ISourceBlock<T[]>)_source).ReserveMessage(messageHeader, target);
void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target)
=> ((ISourceBlock<T[]>)_source).ReleaseReservation(messageHeader, target);
}
最近の解決策で私を大いに助けてくれたTransformBlockを使用するというアイデアをくれたDrewMarshに感謝します。ただし、タイマーはバッチブロックの後でリセットする必要があると思います(つまり、バッチサイズに達したためにトリガーされた後、またはタイマーコールバック内でTriggerBatchメソッドが明示的に呼び出された後)。単一のアイテムを取得するたびにタイマーをリセットすると、実際にバッチをまったくトリガーせずに、タイマーが数回リセットされ続ける可能性があります(タイマーの「dueTime」を常に押してしまいます)。
これにより、コードスニペットは次のようになります。
Timer triggerBatchTimer = new Timer(() => yourBatchBlock.TriggerBatch(), null, 5000, Timeout.Infinite);
TransformBlock<T[], T[]> timeoutTransformBlock = new TransformBlock<T[], T[]>((value) =>
{
triggerBatchTimer.Change(5000, Timeout.Infinite);
return value;
});
yourBufferBlock.LinkTo(yourBatchBlock);
yourBatchBlock.LinkTo(timeoutTransformBlock)
timeoutTransformBlock.LinkTo(yourActionBlock);
// Start the producer which is populating the BufferBlock etc.
リンクオプションを使用できます
_transformManyBlock.LinkTo(_batchBlock, new DataflowLinkOptions {PropagateCompletion = true});