19

Dataflow CTPの使用(TPL内)

タイムアウト後に、現在キューに入れられている、または延期されているアイテムの数がBatchSize未満の場合、BatchBlock.TriggerBatchを自動的に呼び出す方法はありますか?

さらに良いことに、このタイムアウトは、ブロックが新しいアイテムを受信するたびに0にリセットする必要があります。

4

4 に答える 4

26

はい、ブロックをチェーンすることで、これをかなりエレガントに実現できます。この場合、BatchBlockの「前」にリンクするTransformBlockを設定します。これは次のようになります。

Timer triggerBatchTimer = new Timer(() => yourBatchBlock.TriggerBatch());

TransformBlock<T, T> timeoutTransformBlock = new TransformBlock<T, T>((value) =>
{
    triggerBatchTimer.Change(5000, Timeout.Infinite);

    return value; 
});

timeoutTransformBlock.LinkTo(yourBatchBlock);

yourBufferBlock.LinkTo(timeoutTransformBlock);
于 2012-02-24T00:52:49.250 に答える
6

これは、優れたDrewMarshのソリューションのポリシングバージョンです。これは、このDataflowBlock.Encapsulateメソッドを使用して、タイマー+バッチ機能をカプセル化するデータフローブロックを作成します。新しい引数timeoutに加えて、このメソッドは通常のコンストラクターCreateBatchBlockで使用可能なすべてのオプションもサポートします。BatchBlock

public static IPropagatorBlock<T, T[]> CreateBatchBlock<T>(int batchSize,
    int timeout, GroupingDataflowBlockOptions dataflowBlockOptions = null)
{
    dataflowBlockOptions = dataflowBlockOptions ?? new GroupingDataflowBlockOptions();
    var batchBlock = new BatchBlock<T>(batchSize, dataflowBlockOptions);
    var timer = new System.Threading.Timer(_ => batchBlock.TriggerBatch());
    var transformBlock = new TransformBlock<T, T>((T value) =>
    {
        timer.Change(timeout, Timeout.Infinite);
        return value;
    }, new ExecutionDataflowBlockOptions()
    {
        BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
        CancellationToken = dataflowBlockOptions.CancellationToken,
        EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
        MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
        NameFormat = dataflowBlockOptions.NameFormat,
        TaskScheduler = dataflowBlockOptions.TaskScheduler
    });
    transformBlock.LinkTo(batchBlock, new DataflowLinkOptions()
    {
        PropagateCompletion = true
    });
    return DataflowBlock.Encapsulate(transformBlock, batchBlock);
}

代替案:以下は、すべての機能BatchUntilInactiveBlock<T>を提供するクラスです。この実装は、インスタンスBatchBlock<T>の薄いラッパーです。BatchBlock<T>以前のCreateBatchBlock実装よりもオーバーヘッドが少なく、同様の動作をします。

/// <summary>
/// Provides a dataflow block that batches inputs into arrays.
/// A batch is produced when the number of currently queued items becomes equal
/// to BatchSize, or when a Timeout period has elapsed after receiving the last item.
/// </summary>
public class BatchUntilInactiveBlock<T> : IPropagatorBlock<T, T[]>,
    IReceivableSourceBlock<T[]>
{
    private readonly BatchBlock<T> _source;
    private readonly Timer _timer;
    private readonly TimeSpan _timeout;

    public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout,
        GroupingDataflowBlockOptions dataflowBlockOptions)
    {
        _source = new BatchBlock<T>(batchSize, dataflowBlockOptions);
        _timer = new Timer(_ => _source.TriggerBatch());
        _timeout = timeout;
    }

    public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout) : this(batchSize,
        timeout, new GroupingDataflowBlockOptions())
    { }

    public int BatchSize => _source.BatchSize;
    public TimeSpan Timeout => _timeout;
    public Task Completion => _source.Completion;
    public int OutputCount => _source.OutputCount;

    public void Complete() => _source.Complete();

    void IDataflowBlock.Fault(Exception exception)
        => ((IDataflowBlock)_source).Fault(exception);

    public IDisposable LinkTo(ITargetBlock<T[]> target,
        DataflowLinkOptions linkOptions)
            => _source.LinkTo(target, linkOptions);

    public void TriggerBatch() => _source.TriggerBatch();

    public bool TryReceive(Predicate<T[]> filter, out T[] item)
        => _source.TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<T[]> items)
        => _source.TryReceiveAll(out items);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
        bool consumeToAccept)
    {
        var offerResult = ((ITargetBlock<T>)_source).OfferMessage(messageHeader,
            messageValue, source, consumeToAccept);
        if (offerResult == DataflowMessageStatus.Accepted)
            _timer.Change(_timeout, System.Threading.Timeout.InfiniteTimeSpan);
        return offerResult;
    }

    T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target, out bool messageConsumed)
            => ((ISourceBlock<T[]>)_source).ConsumeMessage(messageHeader,
                target, out messageConsumed);

    bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
            => ((ISourceBlock<T[]>)_source).ReserveMessage(messageHeader, target);

    void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
            => ((ISourceBlock<T[]>)_source).ReleaseReservation(messageHeader, target);
}
于 2019-12-04T09:02:56.797 に答える
4

最近の解決策で私を大いに助けてくれたTransformBlockを使用するというアイデアをくれたDrewMarshに感謝します。ただし、タイマーはバッチブロックの後でリセットする必要があると思います(つまり、バッチサイズに達したためにトリガーされた後、またはタイマーコールバック内でTriggerBatchメソッドが明示的に呼び出された後)。単一のアイテムを取得するたびにタイマーをリセットすると、実際にバッチをまったくトリガーせずに、タイマーが数回リセットされ続ける可能性があります(タイマーの「dueTime」を常に押してしまいます)。

これにより、コードスニペットは次のようになります。

Timer triggerBatchTimer = new Timer(() => yourBatchBlock.TriggerBatch(), null, 5000, Timeout.Infinite);

TransformBlock<T[], T[]> timeoutTransformBlock = new TransformBlock<T[], T[]>((value) =>
{
    triggerBatchTimer.Change(5000, Timeout.Infinite);

    return value; 
});

yourBufferBlock.LinkTo(yourBatchBlock);
yourBatchBlock.LinkTo(timeoutTransformBlock)
timeoutTransformBlock.LinkTo(yourActionBlock);

// Start the producer which is populating the BufferBlock etc.
于 2019-12-06T16:47:35.740 に答える
-1

リンクオプションを使用できます

_transformManyBlock.LinkTo(_batchBlock, new DataflowLinkOptions {PropagateCompletion = true});
于 2019-12-04T07:21:30.360 に答える