2

私はTPL Dataflowを初めて使用し、個々のパイプラインを介してメッセージ メッセージの順序を維持しながら、均等に分散された並列処理のためにソース メッセージのリストを分割できる構造を探しています。これを達成するために使用できる DataFlow API 内の特定のブロックまたは概念はありますか?それとも、既存のブロック間にグルー コードまたはカスタム ブロックを提供することの方が重要ですか?

Akka.NETに精通している方向けに、 ConsistentHashing ルーターに似た機能を探しています。これにより、単一のルーターにメッセージを送信し、これらのメッセージを個々のルートに転送して処理することができます。

同期の例:

var count = 100000;
var processingGroups = 5;
var source = Enumerable.Range(1, count);

// Distribute source elements consistently and evenly into a specified set of groups (ex. 5) so that.
var distributed = source.GroupBy(s => s % processingGroups);

// Within each of the 5 processing groups go through each item and add 1 to it
var transformed = distributed.Select(d => d.Select(i => i + 3).ToArray());

List<int[]> result = transformed.ToList();
Check.That(result.Count).IsEqualTo(processingGroups);
for (int i = 0; i < result.Count; i++)
{
    var outputGroup = result[i];

    var expectedRange = Enumerable.Range(i + 1, count/processingGroups).Select((e, index) => e + (index * (processingGroups - 1)) + 3);
    Check.That(outputGroup).ContainsExactly(expectedRange);
}
4

2 に答える 2

1

一般に、ConsistentHashing ルーターを使用している可能性があるため、探しているものが Dataflow で事前に作成されているとは思いません。ただし、フローしたいデータに id を追加することで、それらを任意の順序で並行して処理し、処理が終了したときに並べ替えることができます。

public class Message {
        public int MessageId { get; set; }
        public int GroupId { get; set; }        
        public int Value { get; set; }
    }

    public class MessageProcessing
    {
        public void abc() {
            var count = 10000;
            var groups = 5;
            var source = Enumerable.Range(0, count);

            //buffer all input
            var buffer = new BufferBlock<IEnumerable<int>>();

            //split each input enumerable into processing groups
            var messsageProducer = new TransformManyBlock<IEnumerable<int>, Message>(ints => 
            ints.Select((i, index) => new Message() { MessageId = index, GroupId = index % groups, Value = i }).ToList());

            //process each message, one action block may process any group id in any order
            var processMessage = new TransformBlock<Message, Message>(msg => 
            {
                msg.Value++;
                return msg;
            }, new ExecutionDataflowBlockOptions() {
                MaxDegreeOfParallelism = groups
            });

            //output of processed message values
            int[] output = new int[count];

            //insert messages into array in the order the started in
            var regroup = new ActionBlock<Message>(msg => output[msg.MessageId] = msg.Value, 
                new ExecutionDataflowBlockOptions() {
                    MaxDegreeOfParallelism = 1
                });
        }        

    }

この例では、メッセージの GroupId は使用されていませんが、メッセージのグループを調整するためのより完全な例で使用できます。また、出力配列を List に変更し、整数の列挙型がバッファ ブロックにポストされるたびに、対応するリスト要素を設定することで、バッファ ブロックへのフォローアップ ポストを処理できます。正確な用途によっては、出力の複数のユーザーをサポートする必要がある場合があり、これはフローに折り返すことができます。

于 2016-12-25T18:05:53.513 に答える