TPL Dataflow をかなり使用していますが、解決できない問題につまずいています。
私は次のアーキテクチャを持っています:
BroadcastBlock<List<Object1>>
-> 2 つの異なる TransformBlock<List<Object1>, Tuple<int, List<Object1>>>
-> 両方を 1 つの TransformManyBlock<Tuple<int, List<Object1>>, Object2> にリンク
チェーンの最後にある TransformManyBlock 内のラムダ式を、次の 2 通りで切り替えます。(a) 流れてきたタプルに対して処理を実行するコードがある場合、(b) コードがまったくない場合。
TransformBlock 内で、最初のアイテムの到着から、TransformBlock.Completion がブロックの完了を示すまでの時間を測定します (BroadcastBlock は、PropagateCompletion を true に設定して TransformBlock にリンクしています)。
私が納得できないのは、(b) の場合の TransformBlock が (a) の場合よりも約 5 ~ 6 倍速く完了する理由です。これは、TDF の設計意図に完全に反します。TransformBlock からの項目はすでに TransformManyBlock に渡されているので、TransformManyBlock がその項目に対して何を行うかは、TransformBlock が完了する時点にはまったく影響しないはずです。TransformManyBlock で行われていることが、先行する TransformBlock に関係しうる理由は 1 つも思い当たりません。
この奇妙な観察結果を説明できる人はいますか?
違いを示すコードを次に示します。コードを実行するときは、次の 2 行を:
tfb1.transformBlock.LinkTo(transformManyBlock);
tfb2.transformBlock.LinkTo(transformManyBlock);
次のように変更してください:
tfb1.transformBlock.LinkTo(transformManyBlockEmpty);
tfb2.transformBlock.LinkTo(transformManyBlockEmpty);
これは、先行する TransformBlock の実行時間の違いを観察するためです。
/// <summary>
/// Console entry point for the first listing.
/// </summary>
class Program
{
    /// <summary>
    /// Constructs a <see cref="Test"/> and immediately runs it via <c>Start</c>.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        new Test().Start();
    }
}
/// <summary>
/// Builds and drives the pipeline used for the timing experiment:
/// one BroadcastBlock, two <see cref="TransformBlockClass"/> stages, one
/// TransformManyBlock (either the "with code" or the "empty" variant) and a
/// final ActionBlock.
/// </summary>
class Test
{
    private readonly BroadcastBlock<List<Object1>> broadCastBlock;
    private readonly TransformBlockClass tfb1;
    private readonly TransformBlockClass tfb2;
    private readonly TransformManyBlock<Tuple<int, List<Object1>>, Object2>
        transformManyBlock;
    private readonly TransformManyBlock<Tuple<int, List<Object1>>, Object2>
        transformManyBlockEmpty;
    private readonly ActionBlock<Object2> actionBlock;

    /// <summary>
    /// Creates all blocks, links them and registers the completion handlers.
    /// No data flows until <see cref="Start"/> is called.
    /// </summary>
    public Test()
    {
        // The cloning function returns its argument, so the same list instance
        // is handed on rather than a copy.
        broadCastBlock = new BroadcastBlock<List<Object1>>(list => list);
        tfb1 = new TransformBlockClass();
        tfb2 = new TransformBlockClass();

        // Variant (a): performs a fixed amount of CPU work per tuple and emits nothing.
        transformManyBlock = new TransformManyBlock<Tuple<int, List<Object1>>, Object2>
            (newTuple =>
            {
                for (int counter = 1; counter <= 10000000; counter++)
                {
                    // The value is never read; the loop only simulates work.
                    double result = Math.Sqrt(counter + 1.0);
                }
                return new Object2[0];
            });

        // Variant (b): no work at all, emits nothing.
        transformManyBlockEmpty
            = new TransformManyBlock<Tuple<int, List<Object1>>, Object2>(
                tuple =>
                {
                    return new Object2[0];
                });

        actionBlock = new ActionBlock<Object2>(item =>
        {
            //flush transformManyBlock
        });

        //linking
        broadCastBlock.LinkTo(tfb1.transformBlock
            , new DataflowLinkOptions
            { PropagateCompletion = true }
        );
        broadCastBlock.LinkTo(tfb2.transformBlock
            , new DataflowLinkOptions
            { PropagateCompletion = true }
        );

        //link either to ->transformManyBlock or -> transformManyBlockEmpty
        tfb1.transformBlock.LinkTo(transformManyBlock);
        tfb2.transformBlock.LinkTo(transformManyBlock);

        // Both TransformManyBlocks feed the same ActionBlock. Completion is
        // deliberately NOT propagated through these links: with
        // PropagateCompletion the first source to finish would complete the
        // shared target while the other source may still hold output.
        transformManyBlock.LinkTo(actionBlock);
        transformManyBlockEmpty.LinkTo(actionBlock);

        //completion
        // Two sources feed one target, so the target is completed only after
        // both sources are done.
        Task.WhenAll(tfb1.transformBlock.Completion
            , tfb2.transformBlock.Completion)
            .ContinueWith(_ =>
            {
                transformManyBlockEmpty.Complete();
                transformManyBlock.Complete();
            });

        // Same pattern one stage further down: finish the ActionBlock once
        // both TransformManyBlocks have finished.
        Task.WhenAll(transformManyBlock.Completion
            , transformManyBlockEmpty.Completion)
            .ContinueWith(_ =>
            {
                actionBlock.Complete();
            });

        transformManyBlock.Completion.ContinueWith(_ =>
        {
            Console.WriteLine("TransformManyBlock (with code) completed");
        });
        transformManyBlockEmpty.Completion.ContinueWith(_ =>
        {
            Console.WriteLine("TransformManyBlock (empty) completed");
        });
    }

    /// <summary>
    /// Posts 100 freshly built lists of 300000 <see cref="Object1"/> items to
    /// the broadcast block, marks it complete, prints a message and then waits
    /// for a line of console input (presumably to keep the process alive while
    /// the blocks finish).
    /// </summary>
    public void Start()
    {
        const int numberBlocks = 100;
        const int collectionSize = 300000;

        //send collection numberBlock-times
        for (int i = 0; i < numberBlocks; i++)
        {
            List<Object1> list = new List<Object1>();
            for (int j = 0; j < collectionSize; j++)
            {
                list.Add(new Object1(j));
            }
            broadCastBlock.Post(list);
        }

        //mark broadCastBlock complete
        broadCastBlock.Complete();
        Console.WriteLine("Core routine finished");
        Console.ReadLine();
    }
}
/// <summary>
/// Owns a TransformBlock that pairs every incoming list with a running index,
/// and prints the time elapsed between the first processed item and the
/// block's completion.
/// </summary>
class TransformBlockClass
{
    private Stopwatch watch;
    private bool isStarted;
    private int currentIndex;

    /// <summary>The wrapped block; callers link to and from this field.</summary>
    public TransformBlock<List<Object1>, Tuple<int, List<Object1>>> transformBlock;

    /// <summary>
    /// Creates the stopwatch and the block, and hooks the completion report.
    /// </summary>
    public TransformBlockClass()
    {
        watch = new Stopwatch();
        isStarted = false;

        transformBlock = new TransformBlock<List<Object1>, Tuple<int, List<Object1>>>
            (list => TagWithIndex(list));

        transformBlock.Completion.ContinueWith(_ => ReportElapsed());
    }

    /// <summary>
    /// Starts the stopwatch on the first call, then returns the list together
    /// with the current index and advances the index by one.
    /// </summary>
    private Tuple<int, List<Object1>> TagWithIndex(List<Object1> list)
    {
        if (!isStarted)
        {
            watch.Start();
            isStarted = true;
        }

        int index = currentIndex;
        currentIndex = index + 1;
        return Tuple.Create(index, list);
    }

    /// <summary>Stops the stopwatch and writes the elapsed milliseconds to the console.</summary>
    private void ReportElapsed()
    {
        watch.Stop();
        long elapsedMs = watch.ElapsedMilliseconds;
        Console.WriteLine("TransformBlock : Time elapsed in ms: " + elapsedMs);
    }
}
/// <summary>
/// Reference-type item holding a single integer.
/// </summary>
class Object1
{
    /// <summary>Creates an item holding <paramref name="val"/>.</summary>
    /// <param name="val">The integer to store.</param>
    public Object1(int val)
    {
        this.val = val;
    }

    /// <summary>The integer supplied at construction; settable only from inside the class.</summary>
    public int val { get; private set; }
}
/// <summary>
/// Output item type of the TransformManyBlock stage: an integer plus a list of
/// <see cref="Object1"/> items.
/// </summary>
class Object2
{
    /// <summary>Creates an item from the given value and collection.</summary>
    /// <param name="value">The integer to store.</param>
    /// <param name="collection">The list to store; the reference is kept as-is, not copied.</param>
    public Object2(int value, List<Object1> collection)
    {
        this.collection = collection;
        this.value = value;
    }

    /// <summary>The integer supplied at construction.</summary>
    public int value { get; private set; }

    /// <summary>The list supplied at construction.</summary>
    public List<Object1> collection { get; private set; }
}
*編集: 別のコードを投稿しました。今回は値型のコレクションを使用しており、上記のコードで観察された問題を再現できません。参照型を渡して、それらを同時に (異なるデータフロー ブロック内であっても) 処理すると、競合が発生してブロックされる可能性があるのでしょうか?*
/// <summary>
/// Console entry point for the second listing.
/// </summary>
class Program
{
    /// <summary>
    /// Constructs a <see cref="Test"/> and immediately runs it via <c>Start</c>.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        new Test().Start();
    }
}
/// <summary>
/// Second experiment, using lists of <c>int</c>: one BroadcastBlock feeds four
/// pass-through TransformBlocks. Blocks 11 and 12 feed a TransformManyBlock
/// that sleeps per item; blocks 21 and 22 feed one that does not. Each
/// TransformManyBlock drains into its own ActionBlock.
/// </summary>
class Test
{
    private BroadcastBlock<List<int>> broadCastBlock;
    private TransformBlock<List<int>, List<int>> tfb11;
    private TransformBlock<List<int>, List<int>> tfb12;
    private TransformBlock<List<int>, List<int>> tfb21;
    private TransformBlock<List<int>, List<int>> tfb22;
    private TransformManyBlock<List<int>, List<int>> transformManyBlock1;
    private TransformManyBlock<List<int>, List<int>> transformManyBlock2;
    private ActionBlock<List<int>> actionBlock1;
    private ActionBlock<List<int>> actionBlock2;

    /// <summary>
    /// Creates all blocks, links them and registers the completion handlers.
    /// </summary>
    public Test()
    {
        // The cloning function returns its argument, so no copy is made.
        broadCastBlock = new BroadcastBlock<List<int>>(item => item);

        tfb11 = CreatePassThrough();
        tfb12 = CreatePassThrough();
        tfb21 = CreatePassThrough();
        tfb22 = CreatePassThrough();

        transformManyBlock1 = new TransformManyBlock<List<int>, List<int>>(item =>
        {
            // Author's note: the sleep can be replaced with actual work without
            // changing the results, which the author takes as evidence that the
            // issue is unrelated to thread starvation.
            Thread.Sleep(100);
            return new[] { item };
        });
        transformManyBlock2 = new TransformManyBlock<List<int>, List<int>>(
            item => new[] { item });

        // Both sinks only consume items so the TransformManyBlocks are flushed.
        actionBlock1 = new ActionBlock<List<int>>(item => { });
        actionBlock2 = new ActionBlock<List<int>>(item => { });

        // Broadcast -> pass-through blocks, with completion propagated.
        LinkPropagating(broadCastBlock, tfb11);
        LinkPropagating(broadCastBlock, tfb12);
        LinkPropagating(broadCastBlock, tfb21);
        LinkPropagating(broadCastBlock, tfb22);

        // Pass-through blocks -> TransformManyBlocks, default link options.
        tfb11.LinkTo(transformManyBlock1);
        tfb12.LinkTo(transformManyBlock1);
        tfb21.LinkTo(transformManyBlock2);
        tfb22.LinkTo(transformManyBlock2);

        // TransformManyBlocks -> sinks, with completion propagated.
        LinkPropagating(transformManyBlock1, actionBlock1);
        LinkPropagating(transformManyBlock2, actionBlock2);

        // Each TransformManyBlock is completed once both of its sources are done.
        CompleteAfterBoth(tfb11.Completion, tfb12.Completion,
            "TransformBlocks 11 and 12 completed", transformManyBlock1);
        CompleteAfterBoth(tfb21.Completion, tfb22.Completion,
            "TransformBlocks 21 and 22 completed", transformManyBlock2);

        AnnounceWhenDone(transformManyBlock1.Completion,
            "TransformManyBlock (from tfb11 and tfb12) finished");
        AnnounceWhenDone(transformManyBlock2.Completion,
            "TransformManyBlock (from tfb21 and tfb22) finished");
    }

    /// <summary>
    /// Posts 100 freshly built lists of 300000 integers to the broadcast block,
    /// marks it complete, prints a message and waits for a line of console input.
    /// </summary>
    public void Start()
    {
        const int numberBlocks = 100;
        const int collectionSize = 300000;

        int sent = 0;
        while (sent < numberBlocks)
        {
            broadCastBlock.Post(BuildList(collectionSize));
            sent++;
        }

        broadCastBlock.Complete();
        Console.WriteLine("Core routine finished");
        Console.ReadLine();
    }

    /// <summary>Creates a TransformBlock that returns each list unchanged.</summary>
    private static TransformBlock<List<int>, List<int>> CreatePassThrough()
    {
        return new TransformBlock<List<int>, List<int>>(item => item);
    }

    /// <summary>Links source to target using a fresh options object with PropagateCompletion set.</summary>
    private static void LinkPropagating<T>(ISourceBlock<T> source, ITargetBlock<T> target)
    {
        source.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });
    }

    /// <summary>
    /// When both tasks have finished, prints the message and then calls
    /// Complete on the downstream block.
    /// </summary>
    private static void CompleteAfterBoth(Task first, Task second, string message,
        IDataflowBlock downstream)
    {
        Task.WhenAll(first, second).ContinueWith(_ =>
        {
            Console.WriteLine(message);
            downstream.Complete();
        });
    }

    /// <summary>Prints the message once the given task has finished.</summary>
    private static void AnnounceWhenDone(Task completion, string message)
    {
        completion.ContinueWith(_ =>
        {
            Console.WriteLine(message);
        });
    }

    /// <summary>Builds a new list containing 0, 1, ..., count - 1 by repeated Add.</summary>
    private static List<int> BuildList(int count)
    {
        List<int> list = new List<int>();
        for (int value = 0; value < count; value++)
        {
            list.Add(value);
        }
        return list;
    }
}