8

TPLデータフローを使用して画像を処理しています。処理要求を受け取り、ストリームから画像を読み取り、いくつかの変換を適用してから、結果の画像を別のストリームに書き込みます。

Request -> Stream -> Image -> Image ... -> Stream

そのために私はブロックを使用します:

BufferBlock<Request>
TransformBlock<Request,Stream>
TransformBlock<Stream,Image>
TransformBlock<Image,Image>
TransformBlock<Image,Image>
...
writerBlock = new ActionBlock<Image>

問題は、イニシャルRequestには、結果を作成するために必要なデータStreamと、その時点で必要な追加情報が含まれていることです。次のように、元のRequest(または他のコンテキストオブジェクト)をwriterBlock他のすべてのブロックに渡して渡す必要がありますか?

TransformBlock<Request,Tuple<Request,Stream>>
TransformBlock<Tuple<Request,Stream>,Tuple<Request,Image>>
TransformBlock<Tuple<Request,Image>,Tuple<Request,Image>>
...

(これは醜いです)、または最初のブロックを最後のブロックにリンクする(または、一般化して、追加のデータを必要とするブロックにリンクする)方法はありますか?

4

2 に答える 2

8

はい、ほとんどの場合、説明したことを実行して、すべてのブロックから次のブロックに追加のデータを渡す必要があります。

しかし、いくつかのヘルパーメソッドを使用すると、これをはるかに簡単にすることができます。

public static IPropagatorBlock<TInput, Tuple<TOutput, TInput>>
    CreateExtendedSource<TInput, TOutput>(Func<TInput, TOutput> transform)
{
    return new TransformBlock<TInput, Tuple<TOutput, TInput>>(
        input => Tuple.Create(transform(input), input));
}

public static IPropagatorBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>
    CreateExtendedTransform<TInput, TOutput, TExtension>(Func<TInput, TOutput> transform)
{
    return new TransformBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>(
        tuple => Tuple.Create(transform(tuple.Item1), tuple.Item2));
}

署名は気が遠くなるように見えますが、実際にはそれほど悪くはありません。

また、作成されたブロックにオプションを渡すオーバーロード、または非同期デリゲートを受け取るオーバーロードを追加することもできます。

たとえば、途中で元の番号を渡しているときに、別々のブロックを使用して番号に対していくつかの操作を実行したい場合は、次のようにすることができます。

var source = new BufferBlock<int>();
var divided = CreateExtendedSource<int, double>(i => i / 2.0);
var formatted = CreateExtendedTransform<double, string, int>(d => d.ToString("0.0"));
var writer = new ActionBlock<Tuple<string, int>>(tuple => Console.WriteLine(tuple));

source.LinkTo(divided);
divided.LinkTo(formatted);
formatted.LinkTo(writer);

for (int i = 0; i < 10; i++)
    source.Post(i);

ご覧のとおり、ラムダ(最後のラムダを除く)は「現在の」値(パイプラインのステージに応じて、、または)のみを処理し、「元の」値は(常にintdouble自動的に渡されます。いつでも、通常のコンストラクターを使用して作成されたブロックを使用して、両方の値にアクセスできます(例のfinalのように)。stringintActionBlock

BufferBlock実際には必要ありませんが、デザインに合わせて追加しました。)

于 2013-01-30T20:20:53.353 に答える
1

TPL Dataflowを使い始めたばかりなので、頭を悩ませているかもしれません。BroadcastBlockしかし、ソースと最初のターゲットの間の仲介としてを使用して、それを達成できると思います。

BroadcastBlockは多くのターゲットにメッセージを提供できるため、これを使用してターゲットに提供し最後JoinBlockに、結果を元のメッセージとマージします。

source -> Broadcast ->-----------------------------------------> JoinBlock <source, result>
                    -> Transformation1 -> Transformation 'n'  ->

例えば:

var source = new BufferBlock<int>();
var transformation =  new TransformBlock<int, int>(i => i * 100);

var broadCast = new BroadcastBlock<int>(null);
source.LinkTo(broadCast);
broadCast.LinkTo(transformation);

var jb = new JoinBlock<int, int>();
broadCast.LinkTo(jb.Target1);
transformation.LinkTo(jb.Target2);

jb.LinkTo(new ActionBlock<Tuple<int, int>>(
          c => Console.WriteLine("Source:{0}, Target Result: {1}", c.Item1, c.Item2)));

source.Post(1);
source.Post(2);

source.Complete();

収量...

ソース:1、ターゲット結果:100

ソース:2、ターゲット結果:200

非同期環境でどのように動作するかについてはよくわかりません。

于 2015-07-17T18:21:12.270 に答える