7

TPL Dataflow ライブラリを使用して、次のようなことをしたいと思います。

myActionBlock.Post(newValue, cancelAllPreviousPosts: true);

ActionBlock のキャンセル トークンがすべてをキャンセルしているようです。それを設定した場合、新しい ActionBlock を作成する必要があります。ActionBlock で部分的なキャンセルを行うことはできますか?

まだ処理されていない投稿は試行しないでください。現在実行中の投稿をチェックインできるキャンセル トークンがあればいいのですが。

4

3 に答える 3

4

BroadcastBlock<T>に投稿された最新のアイテムのみを保持する を見てください。の前にブロードキャスト ブロックを置くことができますActionBlock<T>

ブロードキャスト ブロックに新しいアイテムを投稿しても、アクション ブロックによって現在処理されているアイテムはキャンセルされませんが、ブロードキャスト ブロックによって既に保持されている既存のアイテムは上書きされます。つまり、アクション ブロックによってまだ処理されていない古いメッセージは破棄されます。アクション ブロックが現在のアイテムを完了すると、ブロードキャスト ブロックに投稿された最新のアイテムが取得されます。

于 2014-02-24T02:04:34.667 に答える
1

TPL Dataflow には直接このようなものはありませんが、自分で実装する方法はいくつかあります。

  1. 変更されたブロックを通常の Dataflow ブロックとして処理できるようにする必要がない場合 (たとえば、 のサポートがない場合LinkTo())、単純な方法は、 をラップする型を記述することですがActionBlock、そのアイテムには、ラップするかどうかを示すフラグも含まれています。処理されます。を指定するcancelAllPreviousPosts: trueと、これらのフラグはすべてリセットされるため、それらの項目はスキップされます。

    コードは次のようになります。

    class CancellableActionBlock<T>
    {
        private class Item
        {
            public T Data { get; private set; }
            public bool ShouldProcess { get; set; }
    
            public Item(T data)
            {
                Data = data;
                ShouldProcess = true;
            }
        }
    
        private readonly ActionBlock<Item> actionBlock;
        private readonly ConcurrentDictionary<Item, bool> itemSet;
    
        public CancellableActionBlock(Action<T> action)
        {
            itemSet = new ConcurrentDictionary<Item, bool>();
            actionBlock = new ActionBlock<Item>(item =>
            {
                bool ignored;
                itemSet.TryRemove(item, out ignored);
    
                if (item.ShouldProcess)
                {
                    action(item.Data);
                }
            });
        }
    
        public bool Post(T data, bool cancelAllPreviousPosts = false)
        {
            if (cancelAllPreviousPosts)
            {
                foreach (var item in itemSet.Keys)
                {
                    item.ShouldProcess = false;
                }
                itemSet.Clear();
            }
    
            var newItem = new Item(data);
            itemSet.TryAdd(newItem, true);
            return actionBlock.Post(newItem);
        }
    
        // probably other members that wrap actionBlock members,
        // like Complete() and Completion
    }
    
  2. より構成可能で再利用可能なものを作成したい場合は、そのキャンセル専用の特別なブロックを作成できます。BufferBlock3 番目の容量は 1 で、2 番目の容量は無制限です。この方法では、キューに入れられたほとんどすべてのアイテムが 2 番目のブロックにあるため、そのブロックを新しいブロックと交換するだけでキャンセルを実行できます。Encapsulate()全体の構造は、1 番目と 3 番目のブロックを ing することで表されます。

    このアプローチの問題点は、キャンセルに 1 項目 (3 番目のブロックにある項目) の遅延があることです。また、これに適したインターフェイスがわかりませんでした。

于 2014-02-26T14:19:20.357 に答える