5

次の問題に直面しています。

オブジェクトのデータ ストリームがありFoo、それらのオブジェクトをいくつかの同時インプロセス タスク/スレッドにストリーミングします。これらのタスク/スレッドは、オブジェクトを処理してオブジェクトを出力しFooResultます。それぞれのメンバーには、 の作成に使用されたものFooResultと同じものが含まれています。ただし、必ずしもすべてが.FooFooResultFooFooResult

私の問題は、このプロセス全体から、並行タスク内から作成されたFoo可能性のある元のオブジェクトと、もしあればすべてのオブジェクトを含むラッピングオブジェクトを渡したいということです。FooResultFoo

注: 現在、TPL データフローを使用していますが、各同時プロセスActionBlock<Foo>は からリンクされている内で発生しますBroadCastBlock<Foo>SendAsync()ターゲット データフロー ブロックを使用して、潜在的に作成された を送信しますFooResult。明らかに、同時データ フロー ブロックFooResultは予測できない時間に生成されます。これは、私が現在苦労していることです。FooResultまとめて何個作成されたのか、ActionBlock<Foo>オリジナルと一緒にまとめてFooラッピングオブジェクトとして渡すことができるようには思えません。

擬似コードでは、現在次のようになっています。

BroadCastBlock<Foo> broadCastBlock;
ActionBlock<Foo> aBlock1;
ActionBlock<Foo> aBlock2; 
ActionBlock<FooResult> targetBlock;
broadCastBlock.LinkTo(aBlock1); broadCastBlock.LinkTo(aBlock2);

aBlock1 = new ActionBlock<Foo>(foo =>
{
    //do something here. Sometimes create a FooResult. If then
    targetBlock.SendAsync(fooResult);
});

//similar for aBlock2

ただし、現在のコードの問題は、アクション ブロックのいずれかでFooa がシングルを生成しなかった場合、targetBlock が何も受信しない可能性があることです。FooResultまた、FooResult各アクション ブロックがFooResult.

私が望むのは、targetBlock がそれぞれを含むラッピング オブジェクトを受け取り、オブジェクトが作成された場合は のコレクションもFoo受け取ることです。FooResultFooResult

説明されている方法でソリューションを機能させるために私ができることはありますか? TPL Dataflow を熟読する必要はありませんが、熟読できれば便利です。

更新: 以下は、svick によって提案された JoinBlock の実装を通じて取得したものです。私はそれを使用するつもりはありません (パフォーマンスの観点から微調整できない限り)。実行が非常に遅いため、1 秒あたり約 89000 個のアイテム (およびそれは int 値型のみ) に達します。

public class Test
{
    private BroadcastBlock<int> broadCastBlock;
    private TransformBlock<int, int> transformBlock1;
    private TransformBlock<int, int> transformBlock2;
    private JoinBlock<int, int, int> joinBlock;
    private ActionBlock<Tuple<int, int, int>> processorBlock;

    public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
            {
                return i;
            });

        transformBlock1 = new TransformBlock<int, int>(i =>
            {
                return i;
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        transformBlock2 = new TransformBlock<int, int>(i =>
            {
                return i;
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        joinBlock = new JoinBlock<int, int, int>();

        processorBlock = new ActionBlock<Tuple<int, int, int>>(tuple =>
            {
                //Console.WriteLine("original value: " + tuple.Item1 + "tfb1: " + tuple.Item2 + "tfb2: " + tuple.Item3);
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });

        broadCastBlock.LinkTo(joinBlock.Target1, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(joinBlock.Target2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(joinBlock.Target3, new DataflowLinkOptions { PropagateCompletion = true });

        joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();

        const int numElements = 1000000;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.Post(i);
        }

        ////mark completion
        broadCastBlock.Complete();

        processorBlock.Completion.Wait();

        watch.Stop();

        Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
        Console.ReadLine();
    }
}

提案を反映するためのコードの更新:

public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
            {
                return i;
            });

        transformBlock1 = new TransformBlock<int, int>(i =>
            {
                return i;
            });

        transformBlock2 = new TransformBlock<int, int>(i =>
            {
                return i;
            });

        joinBlock = new JoinBlock<int, int>();

        processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
            {
                //Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
            });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(joinBlock.Target1);
        transformBlock2.LinkTo(joinBlock.Target2);
        joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();

        const int numElements = 1000000;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.Post(i);
        }

        ////mark completion
        broadCastBlock.Complete();
        Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());


        processorBlock.Completion.Wait();

        watch.Stop();

        Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
        Console.ReadLine();
    }
}
4

2 に答える 2

2

私が問題を理解している限り:

lock foo
work on foo
if foo has not triggered sending a result
and fooResult exists
   send fooResult
   remember in foo that result has already been sent
unlock foo

OPのコメント後に更新

したがって、fooをBroadCastBlockにプッシュします

BroadCastBlock<Foo> bcb = new BroadCastBlock<Foo>(foo);
...

if ( aBlock1.HasResult ) 
{
    bcb.Add( aBlock1.Result );
}

if ( aBlock2.HasResult ) 
{
    bcb.Add( aBlock2.Result );
}

これで、bcbに何が存在するかを照会し、必要なものを送信できます(または、bcbを送信するだけです)。

更新(コメントでさらに議論した後)

class NotificationWrapper<TSource, TResult>
{
   private readonly TSource originalSource;

   private Queue<TResult> resultsGenerated = new Queue<TResult>()

   private int workerCount = 0;

   public NotificationWrapper<TSource, TResult>( TSource originalSource, int workerCount )
   {
       this.originalSource = originalSource;
       this.workerCount = workerCount;
   }

   public void NotifyActionDone()
   {
       lock( this )
       {
          --workerCount;
          if ( 0 == workerCount )
          {
             //do my sending
             send( originalSource, resultsGenerated );
          }
       }
   }

    public void NotifyActionDone( TResult result )
    {
        lock ( this )
        {
            resultsGenerated.push( result );
            NotifyActionDone();
        }
    }
}

そして、呼び出しコードで:

NotificationWrapper<Foo, Fooresult> notificationWrapper = new NotificationWrapper<Foo, Fooresult>( foo, 2 );
ActionBlock<Foo> ab1 = new ActionBlock<Foo>( foo, notificationWrapper );
ActionBlock<Foo> ab2 = new ActionBlock<Foo>( foo, notificationWrapper );

また、ActionBlockは、呼び出しNotifyActionDone()またはNotifyActoinDone( Fooresult )計算の完了後に変更する必要があります。

于 2012-11-21T16:15:24.753 に答える