1

ブロードキャスト ブロックに渡されるリスト コレクションに問題があります。これが私がこれまでに持っているものです(完全なコードベースが長すぎるため、疑似コードです):

private BroadcastBlock<List<Quote>> tempBCB;
private TransformBlock<List<Quote>, Dictionary<int, IParentOrder>> tfb1;
private TransformBlock<List<Quote>, Dictionary<int, IParentOrder>> tfb2;
private BatchBlock<Dictionary<int, IParentOrder>> batchBlock;
private JoinBlock<List<Quote>, Dictionary<int, IParentOrder>[]> joinBlock;
private TransformBlock<Tuple<List<Quote>, 
    Dictionary<int, IParentOrder>[]>,List<MySignal>> transformBlock;

tempBCB = new BroadcastBlock<List<Quote>>(quoteList => {
    return quoteList;
    //return Cloning.CloneListCloneValues<Quote>(quoteList);
});

tfb1 = new TransformBlock<List<Quote>, Dictionary<int, IParentOrder>>(
    quotes => {//do something and return Dictionary<int, IParentOrder>});

tfb2 = new TransformBlock<List<Quote>, Dictionary<int, IParentOrder>>(
    quotes => {//do something and return Dictionary<int, IParentOrder>});

batchBlock = new BatchBlock<Dictionary<int, IParentOrder>>(2);

joinBlock = new JoinBlock<List<Quote>, Dictionary<int, IParentOrder>[]>(
    new GroupingDataflowBlockOptions { Greedy = false });

transformBlock = new TransformBlock<Tuple<List<Quote>, 
    Dictionary<int, IParentOrder>[]>, List<MySignal>>(
    tuple => { //do something and return List<MySignal>;});

//Linking
tempBCB.LinkTo(tfb1);
tempBCB.LinkTo(tfb2);
tfb1.LinkTo(batchBlock);
tfb2.LinkTo(batchBlock);
tempBCB.LinkTo(joinBlock.Target1);
batchBlock.LinkTo(joinBlock.Target2);
joinBlock.LinkTo(transformBlock);

私の問題は、現在の実装でtempBCBは、最終的に奇妙な結果が得られることTransformBlock<TInput, TOutput>です。

たとえば、タプルの一部としてのコレクションは、との実装が 100% 同一Dictionary<int, IParentrOrder>であっても、同じサイズではありません。tfb1tfb2

実装のコメントアウトされた行はtempBCB、ブロードキャストされたリストのディープ コピーを実行し、問題を解決しているように見えますが、問題は、このディープ コピーによってコードが約 10 倍遅くなることです。別の解決策。

まず第一に、これが問題なのか、バグがまだそこに隠れていても同時操作が期待どおりに実行される原因なのか、これだけの速度低下なのか、私には絶対にわかりません。

次に、ブロードキャスト ブロックにディープ コピーがないためにこれらの問題が発生する場合、どうすれば高速化できますか?

これが私のディープコピーコードです:

public static List<TValue> CloneListCloneValues<TValue>(List<TValue> original) 
    where TValue : ICloneable
{
    List<TValue> ret = new List<TValue>(original.Count);

    foreach (TValue entry in original)
    {
        ret.Add((TValue)entry.Clone());
    }

    return ret;
}

Quote[]ブロードキャスト ブロックの代わりにa をフィードできる可能性List<Quote>がありますが、ディープ コピーのパフォーマンスを高速化するのにどのように役立つかわかりません。

私の質問は次のとおりです。

  • ディープ コピーの問題は、ここでの本当の問題ですか (List<Quote>ブロードキャスト ブロックにストリーミングされた は、どの変換ブロックによっても変更されないため、疑いがあります)。
  • はいの場合、ディープコピーをより効率的にする理由と方法を教えてください。
4

1 に答える 1

1

最終的に問題を解決したので、私は自分の質問に答えます。svickが警告した問題は、broadcastBlockList<Quote>でディープコピーが必要かどうかとは関係ありませんでした(実際、ディープコピーは必要ありませんでした)。この問題は、joinBlockにもリンクしているbatchBlockがすべてのアイテムをjoinBlockにストリーミングする前に、完了するように要求されたbroadcastBlock(リンクされたデータフローブロックにtrueに設定された完全な伝播)に関連していました。変換ブロックを書き直したため、joinBlockを削除しました(元のアイテムだけでなく、独自の変換されたアイテムも返されるようになり、joinBlockは廃止されました。

メインのtransformBlockの同時実行性に関する注意:MaxDegreeOfParallelismを> 1に設定すると、この軽いワークロードでもパフォーマンス上の利点がすでに得られますが、重いワークロードをスローすると実際に機能します。

ここに、コンパイルして機能する完全なコードがあります(いくつかのクラスの名前を変更しましたが、構造は説明どおりのままです)。

public class Test
{
    private Stopwatch watch;

    private BroadcastBlock<List<InputObject>> tempBCB;
    private BatchBlock<Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>> batchBlock;
    private TransformBlock<Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>[], List<FinalObject>> transformBlock;
    private ActionBlock<List<FinalObject>> justToFlushTransformBlock;

    private CoreLogic core1;
    private CoreLogic core2;

    public Test()
    {
        tempBCB = new BroadcastBlock<List<InputObject>>(input => input);

        //here batch size = 2
        batchBlock = new BatchBlock<Tuple<List<InputObject>,Dictionary<int,IntermediateObject>>>(2, new GroupingDataflowBlockOptions { Greedy = false });

        transformBlock = new TransformBlock<Tuple<List<InputObject>,Dictionary<int,IntermediateObject>>[],List<FinalObject>>(array =>
        {
            List<InputObject> inputObjects = array[0].Item1;
            List<FinalObject> ret = inputObjects.ConvertAll(x => new FinalObject(x));

            foreach (var tuple in array)
            {
                //iterate over each individual object
                foreach (var dictionary in tuple.Item2)
                {
                    ret[dictionary.Key].outputList.Add(dictionary.Value);
                }
            }

            return ret;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        justToFlushTransformBlock = new ActionBlock<List<FinalObject>>(list =>
            {
                //just in order to accept items from the transformBlock output queue
            });

        //Generate 2 CoreLogic objects
        core1 = new CoreLogic();
        core2 = new CoreLogic();

        //linking
        tempBCB.LinkTo(core1.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
        tempBCB.LinkTo(core2.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });

        core1.transformBlock.LinkTo(batchBlock);
        core2.transformBlock.LinkTo(batchBlock);

        batchBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });

        transformBlock.LinkTo(justToFlushTransformBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        const int numberChunks = 30;

        watch = new Stopwatch();
        watch.Start();

        for (int j = 1; j <= numberChunks; j++)
        {
            int collectionSize = 10000 * j;

            List<InputObject> collection = new List<InputObject>(collectionSize);
            for (int i = 0; i < collectionSize; i++)
            {
                collection.Add(new InputObject(i));
            }

            tempBCB.Post(collection);
        }

        tempBCB.Complete();

        Task.WhenAll(core1.transformBlock.Completion, core2.transformBlock.Completion).ContinueWith(_ =>
            {
                batchBlock.Complete();
            });

        transformBlock.Completion.Wait();

        watch.Stop();

        Console.WriteLine("Elapsed time (in milliseconds): " + watch.ElapsedMilliseconds);
        Console.ReadLine();
    }
}

public class CoreLogic
{
    private Random rand;
    public TransformBlock<List<InputObject>, Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>> transformBlock;

    public CoreLogic()
    {
        const int numberIntermediateObjects = 10000;

        transformBlock = new TransformBlock<List<InputObject>, Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>>(input =>
        {
            //please ignore the fact that `input` is not utilized here, the point is to generate a collection of IntermediateObject and return

            Dictionary<int, IntermediateObject> ret = new Dictionary<int, IntermediateObject>();
            for (int i = 0; i < numberIntermediateObjects; i++)
            {
                IntermediateObject value = new IntermediateObject(i);

                ret.Add(i, value);
            }

            var tuple = new Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>(input, ret);

            return tuple;
        });
    }
}

public class InputObject : ICloneable
{
    public int value1 { get; private set; }

    public InputObject(int value)
    {
        this.value1 = value;
    }

    object ICloneable.Clone()
    {
        return Clone();
    }

    public InputObject Clone()
    {
        return (InputObject)this.MemberwiseClone();
    }
}

public class IntermediateObject
{
    public int value1 { get; private set; }

    public IntermediateObject(int value)
    {
        this.value1 = value;
    }
}

public class FinalObject
{
    public InputObject input { get; private set; }
    public List<IntermediateObject> outputList;

    public FinalObject(InputObject input)
    {
        this.input = input;

        this.outputList = new List<IntermediateObject>();
    }
}

public static class Cloning
{
    public static List<TValue> CloneListCloneValues<TValue>(List<TValue> original) where TValue : ICloneable
    {
        List<TValue> ret = new List<TValue>(original.Count);

        foreach (TValue entry in original)
        {
            ret.Add((TValue)entry.Clone());
        }

        return ret;
    }
}

これが同様の問題に苦しんでいる可能性のある他の人に役立つことを願っています。私はTPLDataflowが大好きで、特にsvickは、深く掘り下げるのに本当に役立ち、動機付けられました。ありがとうsvick!!!

于 2012-12-06T08:03:14.710 に答える