に割り込むのではBufferBlockなく、代わりにこれを行うチェーンに a を挿入しTransformManyBlockてみませんか? アイテムがまだ追加されていない場合にのみメソッドが返さHashSetれるを使用できます。最終的には非常に単純になりますが、ストレージ要件は明らかに時間とともに増加します...Addtrue
void Main()
{
var bb = new BufferBlock<string>();
var db = DataflowEx.CreateDistinctBlock<string>();
var ab = new ActionBlock<string>(x => Console.WriteLine(x));
bb.LinkTo(db);
db.LinkTo(ab);
bb.Post("this");
bb.Post("this");
bb.Post("this");
bb.Post("is");
bb.Post("is");
bb.Post("a");
bb.Post("test");
}
public class DataflowEx
{
public static TransformManyBlock<T, T> CreateDistinctBlock<T>()
{
var hs = new HashSet<T>();
//hs will be captured in the closure of the delegate
//supplied to the TransformManyBlock below and therefore
//will have the same lifespan as the returned block.
//Look up the term "c# closure" for more info
return new TransformManyBlock<T, T>(
x => Enumerable.Repeat(x, hs.Add(x) ? 1 : 0));
}
}
これが機能する理由は、Linq の SelectMany と同様に、TransformManyBlock がリストのリストを効果的に平坦化するためです。そのため、TransformManyBlock は を返すデリゲートを受け取りますがIEnumerable<T>、返された項目を一度にIEnumerable<T>1 つずつ提供します。IEnumerable<T>0 または 1 個のアイテムを含む を返すことWhereで、述語が満たされるかどうかに応じて、アイテムの通過を許可するか、通過を阻止するかのような動作を効果的に作成できます。この場合、述語は、取得した HashSet にアイテムを追加できるかどうかです。