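I am using TPL Dataflow quite extensively, but I have stumbled upon an issue that I cannot solve:

I have the following architecture:

BroadcastBlock<List<Object1>> -> 2 different TransformBlock<List<Object1>, Tuple<int, List<Object1>>> -> both linked to TransformManyBlock<Tuple<int, List<Object1>>, Object2>

I vary the lambda expression within the TransformManyBlock at the end of the chain between (a) code that performs operations on the streamed tuple and (b) no code at all.

Within the TransformBlocks I measure the time starting from the arrival of the first item and ending when TransformBlock.Completion indicates that the block has completed (broadCastBlock links to the transform blocks with PropagateCompletion set to true).

What I cannot reconcile is why the transformBlocks in case (b) complete about 5-6 times faster than in case (a). This completely contradicts the intent of the whole TDF design. The items from the transform blocks have already been passed on to the transformManyBlock, so what the transformManyBlock does with those items should not matter at all for when the transform blocks complete. I see no reason whatsoever why what happens in the transformManyBlock could have any bearing on the preceding TransformBlocks.

Can anyone reconcile this odd observation?

Here is some code that shows the difference. When running the code, change the following two lines: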

        tfb1.transformBlock.LinkTo(transformManyBlock);
        tfb2.transformBlock.LinkTo(transformManyBlock);

to:

        tfb1.transformBlock.LinkTo(transformManyBlockEmpty);
        tfb2.transformBlock.LinkTo(transformManyBlockEmpty);

to observe the difference in the running time of the preceding transformBlocks.

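using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program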
{
    static void Main(string[] args)
    {
        Test test = new Test();
        test.Start();
    }
}

class Test
{
    private const int numberTransformBlocks = 2;
    private int currentGridPointer;
    private Dictionary<int, List<Tuple<int, List<Object1>>>> grid;

    private BroadcastBlock<List<Object1>> broadCastBlock;
    private TransformBlockClass tfb1;
    private TransformBlockClass tfb2;

    private TransformManyBlock<Tuple<int, List<Object1>>, Object2> 
               transformManyBlock;
    private TransformManyBlock<Tuple<int, List<Object1>>, Object2> 
               transformManyBlockEmpty;
    private ActionBlock<Object2> actionBlock;

    public Test()
    {
        grid = new Dictionary<int, List<Tuple<int, List<Object1>>>>();

        broadCastBlock = new BroadcastBlock<List<Object1>>(list => list);

        tfb1 = new TransformBlockClass();
        tfb2 = new TransformBlockClass();

        transformManyBlock = new TransformManyBlock<Tuple<int, List<Object1>>, Object2>
                (newTuple =>
            {
                for (int counter = 1; counter <= 10000000;  counter++)
                {
                    double result = Math.Sqrt(counter + 1.0);
                }

                return new Object2[0];

            });

        transformManyBlockEmpty 
            = new TransformManyBlock<Tuple<int, List<Object1>>, Object2>(
                  tuple =>
            {
                return new Object2[0];
            });

        actionBlock = new ActionBlock<Object2>(list =>
            {
                int tester = 1;
                //flush transformManyBlock
            });

        //linking
        broadCastBlock.LinkTo(tfb1.transformBlock
                              , new DataflowLinkOptions 
                                  { PropagateCompletion = true }
                              );
        broadCastBlock.LinkTo(tfb2.transformBlock
                              , new DataflowLinkOptions 
                                  { PropagateCompletion = true }
                              );

        //link either to ->transformManyBlock or -> transformManyBlockEmpty
        tfb1.transformBlock.LinkTo(transformManyBlock);
        tfb2.transformBlock.LinkTo(transformManyBlock);

        transformManyBlock.LinkTo(actionBlock
                                  , new DataflowLinkOptions 
                                       { PropagateCompletion = true }
                                  );
        transformManyBlockEmpty.LinkTo(actionBlock
                                       , new DataflowLinkOptions 
                                            { PropagateCompletion = true }
                                       );

        //completion
        Task.WhenAll(tfb1.transformBlock.Completion
                     , tfb2.transformBlock.Completion)
                       .ContinueWith(_ =>
            {
                transformManyBlockEmpty.Complete();
                transformManyBlock.Complete();
            });

        transformManyBlock.Completion.ContinueWith(_ =>
            {
                Console.WriteLine("TransformManyBlock (with code) completed");
            });

        transformManyBlockEmpty.Completion.ContinueWith(_ =>
        {
            Console.WriteLine("TransformManyBlock (empty) completed");
        });

    }

    public void Start()
    {
        const int numberBlocks = 100;
        const int collectionSize = 300000;


        //send collection numberBlock-times
        for (int i = 0; i < numberBlocks; i++)
        {
            List<Object1> list = new List<Object1>();
            for (int j = 0; j < collectionSize; j++)
            {
                list.Add(new Object1(j));
            }

            broadCastBlock.Post(list);
        }

        //mark broadCastBlock complete
        broadCastBlock.Complete();

        Console.WriteLine("Core routine finished");
        Console.ReadLine();
    }
}

class TransformBlockClass
{
    private Stopwatch watch;
    private bool isStarted;
    private int currentIndex;

    public TransformBlock<List<Object1>, Tuple<int, List<Object1>>> transformBlock;

    public TransformBlockClass()
    {
        isStarted = false;
        watch = new Stopwatch();

        transformBlock = new TransformBlock<List<Object1>, Tuple<int, List<Object1>>>
           (list =>
        {
            if (!isStarted)
            {
                StartUp();
                isStarted = true;
            }

            return new Tuple<int, List<Object1>>(currentIndex++, list);
        });

        transformBlock.Completion.ContinueWith(_ =>
            {
                ShutDown();
            });
    }

    private void StartUp()
    {
        watch.Start();
    }

    private void ShutDown()
    {
        watch.Stop();

        Console.WriteLine("TransformBlock : Time elapsed in ms: " 
                              + watch.ElapsedMilliseconds);
    }
}

class Object1
{
    public int val { get; private set; }

    public Object1(int val)
    {
        this.val = val;
    }
}

class Object2
{
    public int value { get; private set; }
    public List<Object1> collection { get; private set; }

    public Object2(int value, List<Object1> collection)
    {
        this.value = value;
        this.collection = collection;
    }    
}

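*Edit: I posted a different piece of code. This time I use collections of value types, and I cannot reproduce the problem I observe with the code above. Could it be that passing reference types around, or processing them concurrently (even within different dataflow blocks), causes contention and blocking?*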

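using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program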
{
    static void Main(string[] args)
    {
        Test test = new Test();
        test.Start();
    }
}

class Test
{
    private BroadcastBlock<List<int>> broadCastBlock;
    private TransformBlock<List<int>, List<int>> tfb11;
    private TransformBlock<List<int>, List<int>> tfb12;
    private TransformBlock<List<int>, List<int>> tfb21;
    private TransformBlock<List<int>, List<int>> tfb22;
    private TransformManyBlock<List<int>, List<int>> transformManyBlock1;
    private TransformManyBlock<List<int>, List<int>> transformManyBlock2;
    private ActionBlock<List<int>> actionBlock1;
    private ActionBlock<List<int>> actionBlock2;

    public Test()
    {
        broadCastBlock = new BroadcastBlock<List<int>>(item => item);

        tfb11 = new TransformBlock<List<int>, List<int>>(item =>
            {
                return item;
            });

        tfb12 = new TransformBlock<List<int>, List<int>>(item =>
            {
                return item;
            });

        tfb21 = new TransformBlock<List<int>, List<int>>(item =>
            {
                return item;
            });

        tfb22 = new TransformBlock<List<int>, List<int>>(item =>
            {
                return item;
            });

        transformManyBlock1 = new TransformManyBlock<List<int>, List<int>>(item =>
            {
                Thread.Sleep(100);
                //or you can replace the Thread.Sleep(100) with actual work, 
                //no difference in results. This shows that the issue at hand is 
                //unrelated to starvation of threads.

                return new List<int>[1] { item };
            });

        transformManyBlock2 = new TransformManyBlock<List<int>, List<int>>(item =>
            {
                return new List<int>[1] { item };
            });

        actionBlock1 = new ActionBlock<List<int>>(item =>
            {
                //flush transformManyBlock
            });

        actionBlock2 = new ActionBlock<List<int>>(item =>
        {
            //flush transformManyBlock
        });

        //linking
        broadCastBlock.LinkTo(tfb11, new DataflowLinkOptions 
                                      { PropagateCompletion = true });
        broadCastBlock.LinkTo(tfb12, new DataflowLinkOptions 
                                      { PropagateCompletion = true });
        broadCastBlock.LinkTo(tfb21, new DataflowLinkOptions 
                                      { PropagateCompletion = true });
        broadCastBlock.LinkTo(tfb22, new DataflowLinkOptions 
                                      { PropagateCompletion = true });

        tfb11.LinkTo(transformManyBlock1);
        tfb12.LinkTo(transformManyBlock1);
        tfb21.LinkTo(transformManyBlock2);
        tfb22.LinkTo(transformManyBlock2);

        transformManyBlock1.LinkTo(actionBlock1
                                   , new DataflowLinkOptions 
                                     { PropagateCompletion = true }
                                   );
        transformManyBlock2.LinkTo(actionBlock2
                                   , new DataflowLinkOptions 
                                     { PropagateCompletion = true }
                                   );

        //completion
        Task.WhenAll(tfb11.Completion, tfb12.Completion).ContinueWith(_ =>
            {
                Console.WriteLine("TransformBlocks 11 and 12 completed");
                transformManyBlock1.Complete();
            });

        Task.WhenAll(tfb21.Completion, tfb22.Completion).ContinueWith(_ =>
            {
                Console.WriteLine("TransformBlocks 21 and 22 completed");
                transformManyBlock2.Complete();
            });

        transformManyBlock1.Completion.ContinueWith(_ =>
            {
                Console.WriteLine
                    ("TransformManyBlock (from tfb11 and tfb12) finished");
            });

        transformManyBlock2.Completion.ContinueWith(_ =>
            {
                Console.WriteLine
                    ("TransformManyBlock (from tfb21 and tfb22) finished");
            });
    }

    public void Start()
    {
        const int numberBlocks = 100;
        const int collectionSize = 300000;

        //send collection numberBlock-times
        for (int i = 0; i < numberBlocks; i++)
        {
            List<int> list = new List<int>();
            for (int j = 0; j < collectionSize; j++)
            {
                list.Add(j);
            }

            broadCastBlock.Post(list);
        }

        //mark broadCastBlock complete
        broadCastBlock.Complete();

        Console.WriteLine("Core routine finished");
        Console.ReadLine();
    }
}

Okay, final attempt ;-)

Synopsis:

The observed time delta in scenario 1 can be fully explained by the differing behavior of the garbage collector.

When scenario 1 is run with the transform blocks linked to transformManyBlock, garbage collections are triggered while new items (lists) are being created on the main thread. This is not the case when scenario 1 is run with transformManyBlockEmpty linked instead.

Note that creating a new reference-type instance (Object1) requires allocating memory on the GC heap, which in turn may trigger a garbage collection. Many Object1 instances and lists are created: 100 lists of 300,000 instances each, i.e. 30 million Object1 objects. As a result, the garbage collector has considerably more work to do scanning the heap for (potentially) unreachable objects.
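To see this allocation effect in isolation, here is a minimal sketch that uses only the base class library. It counts how many generation-0 collections occur while lists are filled with reference-type items versus value-type items. `RefItem`, `ValItem` and `GcCountDemo` are hypothetical names standing in for `Object1` as a class and as a struct. The actual counts depend on the runtime version and GC configuration, so no particular numbers are implied.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Object1 declared as a class: one heap object per instance.
public sealed class RefItem
{
    public int Val { get; }
    public RefItem(int val) { Val = val; }
}

// Hypothetical stand-in for Object1 declared as a struct: stored inline in the list's array.
public readonly struct ValItem
{
    public int Val { get; }
    public ValItem(int val) { Val = val; }
}

public static class GcCountDemo
{
    // Returns how many gen-0 collections were recorded while 'work' ran.
    public static int Gen0CollectionsDuring(Action work)
    {
        int before = GC.CollectionCount(0);
        work();
        return GC.CollectionCount(0) - before;
    }

    // Allocates one heap object per element, like new Object1(j) in the question.
    public static List<RefItem> BuildRefList(int size)
    {
        var list = new List<RefItem>(size);
        for (int i = 0; i < size; i++) list.Add(new RefItem(i));
        return list;
    }

    // Only the list's backing array is allocated; elements are copied into it.
    public static List<ValItem> BuildValList(int size)
    {
        var list = new List<ValItem>(size);
        for (int i = 0; i < size; i++) list.Add(new ValItem(i));
        return list;
    }

    public static void Main()
    {
        const int size = 300000; // same as collectionSize in the question
        const int lists = 100;   // same as numberBlocks in the question

        int refGcs = Gen0CollectionsDuring(() =>
        {
            for (int n = 0; n < lists; n++) BuildRefList(size);
        });
        int valGcs = Gen0CollectionsDuring(() =>
        {
            for (int n = 0; n < lists; n++) BuildValList(size);
        });

        Console.WriteLine($"gen-0 collections: class = {refGcs}, struct = {valGcs}");
    }
}
```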

Therefore the observed difference can be minimized by any of the following:

  • Turning Object1 from a class into a struct (so that the instances are stored inline in the list's backing array instead of being allocated as separate objects on the heap).
  • Keeping a reference to the generated lists (thereby reducing the time the garbage collector needs to identify unreachable objects).
  • Generating all the items before posting them to the network.

(Note: I cannot explain why the garbage collector behaves differently in scenario 1 "transformManyBlock" vs. scenario 1 "transformManyBlockEmpty", but data collected via the ConcurrencyVisualizer clearly shows the difference.)
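If the Concurrency Visualizer is not at hand, a cruder check of the same hypothesis is possible. Record the per-generation collection counts when each TransformBlock completes (for example in ShutDown()) and compare the two scenarios. The `GcSnapshot` type below is a hypothetical helper built only on `GC.CollectionCount` and `GC.MaxGeneration`. It reports how many collections happened, not how long they took, and the workload in `Main` is only a placeholder for the posting loop.

```csharp
using System;
using System.Linq;

// Hypothetical helper: captures GC.CollectionCount for every generation.
public sealed class GcSnapshot
{
    private readonly int[] counts;

    private GcSnapshot(int[] counts) { this.counts = counts; }

    public static GcSnapshot Take() =>
        new GcSnapshot(Enumerable.Range(0, GC.MaxGeneration + 1)
                                 .Select(gen => GC.CollectionCount(gen))
                                 .ToArray());

    // Number of collections per generation between 'earlier' and this snapshot.
    public int[] Since(GcSnapshot earlier) =>
        counts.Zip(earlier.counts, (now, then) => now - then).ToArray();

    public static string Format(int[] delta) =>
        string.Join(", ", delta.Select((d, gen) => $"gen{gen}: {d}"));
}

public static class GcSnapshotDemo
{
    public static void Main()
    {
        var start = GcSnapshot.Take();   // e.g. at the beginning of Test.Start()

        // Placeholder workload; in the question this would be the posting loop.
        for (int i = 0; i < 100; i++)
        {
            var junk = new object[300000];
            for (int j = 0; j < junk.Length; j++) junk[j] = new object();
        }

        var end = GcSnapshot.Take();     // e.g. inside TransformBlockClass.ShutDown()
        Console.WriteLine("Collections during run: " + GcSnapshot.Format(end.Since(start)));
    }
}
```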

Results:

(Tests were run on a Core i7 980X, 6 cores, HT enabled):

I modified scenario 2 as follows:

// Start a stopwatch per tfb
int tfb11Cnt = 0;
Stopwatch sw11 = new Stopwatch();
tfb11 = new TransformBlock<List<int>, List<int>>(item =>
{
    if (Interlocked.CompareExchange(ref tfb11Cnt, 1, 0) == 0)
        sw11.Start();

    return item;
});

// [...]

// completion
Task.WhenAll(tfb11.Completion, tfb12.Completion).ContinueWith(_ =>
{
    Console.WriteLine("TransformBlocks 11 and 12 completed. SW11: {0}, SW12: {1}",
                      sw11.ElapsedMilliseconds, sw12.ElapsedMilliseconds);
    transformManyBlock1.Complete();
});

Results:

  1. Scenario 1 (as posted, i.e. linked to transformManyBlock):
    TransformBlock : Time elapsed in ms: 6826
    TransformBlock : Time elapsed in ms: 6826
  2. Scenario 1 (linked to transformManyBlockEmpty):
    TransformBlock : Time elapsed in ms: 3140
    TransformBlock : Time elapsed in ms: 3140
  3. Scenario 1 (transformManyBlock, Thread.Sleep(200) in loop body):
    TransformBlock : Time elapsed in ms: 4949
    TransformBlock : Time elapsed in ms: 4950
  4. Scenario 2 (as posted but modified to report times):
    TransformBlocks 21 and 22 completed. SW21: 619 ms, SW22: 669 ms
    TransformBlocks 11 and 12 completed. SW11: 669 ms, SW12: 667 ms

Next, I changed scenarios 1 and 2 to prepare the input data before posting it to the network:

// Scenario 1
//send collection numberBlock-times
var input = new List<List<Object1>>(numberBlocks);
for (int i = 0; i < numberBlocks; i++)
{
    var list = new List<Object1>(collectionSize);
    for (int j = 0; j < collectionSize; j++)
    {
        list.Add(new Object1(j));
    }
    input.Add(list);
}

foreach (var inp in input)
{
    broadCastBlock.Post(inp);
    Thread.Sleep(10);
}

// Scenario 2
//send collection numberBlock-times
var input = new List<List<int>>(numberBlocks);
for (int i = 0; i < numberBlocks; i++)
{
    List<int> list = new List<int>(collectionSize);
    for (int j = 0; j < collectionSize; j++)
    {
        list.Add(j);
    }

    //broadCastBlock.Post(list);
    input.Add(list);
}

foreach (var inp in input)
{
    broadCastBlock.Post(inp);
    Thread.Sleep(10);
}

Results:

  1. Scenario 1 (transformManyBlock):
    TransformBlock : Time elapsed in ms: 1029
    TransformBlock : Time elapsed in ms: 1029
  2. Scenario 1 (transformManyBlockEmpty):
    TransformBlock : Time elapsed in ms: 975
    TransformBlock : Time elapsed in ms: 975
  3. Scenario 1 (transformManyBlock, Thread.Sleep(200) in loop body):
    TransformBlock : Time elapsed in ms: 972
    TransformBlock : Time elapsed in ms: 972

Finally, I changed the code back to the original version, but kept a reference to each created list:

var lists = new List<List<Object1>>();
for (int i = 0; i < numberBlocks; i++)
{
    List<Object1> list = new List<Object1>();
    for (int j = 0; j < collectionSize; j++)
    {
        list.Add(new Object1(j));
    }
    lists.Add(list);                
    broadCastBlock.Post(list);
}

Results:

  1. Scenario 1 (transformManyBlock):
    TransformBlock : Time elapsed in ms: 6052
    TransformBlock : Time elapsed in ms: 6052
  2. Scenario 1 (transformManyBlockEmpty):
    TransformBlock : Time elapsed in ms: 5524
    TransformBlock : Time elapsed in ms: 5524
  3. Scenario 1 (transformManyBlock, Thread.Sleep(200) in loop body):
    TransformBlock : Time elapsed in ms: 5098
    TransformBlock : Time elapsed in ms: 5098

Likewise, changing Object1 from a class to a struct results in both blocks completing at about the same time (and about 10x faster).


Update: The answer below does not suffice to explain the observed behavior.

In scenario 1, a tight loop is executed inside the TransformMany lambda, which hogs the CPU and starves other threads of processor resources. That is why a delay in the execution of the Completion continuation task can be observed. In scenario 2, a Thread.Sleep is executed inside the TransformMany lambda, giving other threads the chance to execute the Completion continuation task. The observed difference in runtime behavior is not related to TPL Dataflow. To reduce the observed deltas, it should suffice to introduce a Thread.Sleep inside the loop body in scenario 1:

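for (int counter = 1; counter <= 10000000; counter++)
{
   double result = Math.Sqrt(counter + 1.0);
   // Back off for a little while so that other threads get CPU time.
   // With 10,000,000 iterations, this makes each lambda invocation very long-running.
   Thread.Sleep(200);
}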

(Below is my original answer. I didn't read the OP's question carefully enough, and only understood what he was asking about after reading his comments. I am leaving it here for reference.)

Are you sure that you are measuring the right thing? Note that when you do something like transformBlock.Completion.ContinueWith(_ => ShutDown());, your time measurement will be influenced by the behavior of the TaskScheduler (e.g. how long it takes until the continuation task starts executing). Although I was not able to reproduce the difference you saw on my machine, I got more precise results (in terms of the delta between the tfb1 and tfb2 completion times) when using dedicated threads to measure time:

        // Within your Test.Start() method...
        Thread timewatch = new Thread(() =>
        {
            var sw = Stopwatch.StartNew();
            tfb1.transformBlock.Completion.Wait();
            Console.WriteLine("tfb1.transformBlock completed within {0} ms",
                              sw.ElapsedMilliseconds);
        });

        Thread timewatchempty = new Thread(() =>
        {
            var sw = Stopwatch.StartNew();
            tfb2.transformBlock.Completion.Wait();
            Console.WriteLine("tfb2.transformBlock completed within {0} ms",
                              sw.ElapsedMilliseconds);
        });

        timewatch.Start();
        timewatchempty.Start();

        //send collection numberBlock-times
        for (int i = 0; i < numberBlocks; i++)
        {
            // ... rest of the code
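The scheduler's influence on a ContinueWith-based measurement can also be illustrated without TPL Dataflow. The following sketch is an illustration under stated assumptions, not what the answer used. It completes a TaskCompletionSource (standing in for a block's Completion task) while CPU-bound tasks occupy the thread pool, and records how long it takes until a continuation starts. It does this once with default options and once with TaskContinuationOptions.ExecuteSynchronously. `ContinuationLatency` and `Measure` are hypothetical names, and the printed numbers will vary by machine and runtime.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class ContinuationLatency
{
    // Time from completing the antecedent task until a continuation
    // registered with 'options' actually starts running.
    public static TimeSpan Measure(TaskContinuationOptions options)
    {
        var tcs = new TaskCompletionSource<bool>();
        var sw = new Stopwatch();
        TimeSpan observed = TimeSpan.Zero;

        Task continuation = tcs.Task.ContinueWith(_ =>
        {
            observed = sw.Elapsed; // latency until the continuation body ran
        }, options);

        sw.Start();
        tcs.SetResult(true);       // completes the antecedent, like a block's Completion
        continuation.Wait();
        return observed;
    }

    public static void Main()
    {
        // Keep every core busy for about 200 ms so that continuations queued
        // to the thread pool may have to wait for a free worker thread.
        var busy = new Task[Environment.ProcessorCount];
        for (int i = 0; i < busy.Length; i++)
        {
            busy[i] = Task.Run(() =>
            {
                var spin = Stopwatch.StartNew();
                while (spin.ElapsedMilliseconds < 200) { }
            });
        }

        Console.WriteLine("Default continuation: " + Measure(TaskContinuationOptions.None));
        Console.WriteLine("ExecuteSynchronously: " + Measure(TaskContinuationOptions.ExecuteSynchronously));

        Task.WaitAll(busy);
    }
}
```

ExecuteSynchronously is only a hint. It runs the continuation on the thread that completes the antecedent, so the continuation should be kept very short, for example just reading a Stopwatch.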
answered 2012-12-20T09:27:36.240