3

TPL DataFlow を使用してサンプル プロデューサー コンシューマー パターンを作成しました。ここでいくつかの基本的な質問があります。

  1. コンシューマーは、すべてのアイテムがプロデューサーからポストされた後にのみアクティブになります。非同期とは、生産タスクと消費タスクの両方を並行して実行できることを意味します。

  2. コンシューマーにスリープ時間を与えて、他のデータ項目をブロックしているかどうかを確認します。順次実行されており、並列処理が行われていないようです。

ここで何か間違ったことをしていますか?

class AscDataBlocks
{
    public Int64 start;
    public Int64 End;
    //public string ThreadName;
    public void AscBufferProducer(ITargetBlock<Int64> targetAscTransform) 
    // This is using TPL DataBlock producer consumer pattern.
    {
        for (var i = start; i < End; i++)
        {
            Console.WriteLine("Postingasc : {0}", i);
            targetAscTransform.Post(i);
        }
    }

    public void ProcessDataBuffer(Int64 ascDataSet)
    {
        if (ascDataSet == 5) 
        // Testing if this will delay all the other data processing
            Thread.Sleep(5000);
        else
            Thread.Sleep(500);                
        Console.WriteLine(ascDataSet);
    }

    // Demonstrates the consumption end of the producer and consumer pattern. 
    public async Task<Int64> AscTransConsumerAsync(IReceivableSourceBlock<Int64> source)
    {
        // Initialize a counter to track the number of bytes that are processed. 
        int status = 0;

        // Read from the source buffer until the source buffer has no  
        // available output data. 
        while (await source.OutputAvailableAsync())
        {
            Int64 data;
            source.TryReceive(out data);
            ProcessDataBuffer(data);//This function processed the data buffer for ascollection and writes the data to the database.
            // Increment the count of bytes received.
            status = 1;
        }
        return status;
    }
}

static void Main(string[] args)
{
    AscDataBlocks ascb;
    BufferBlock<Int64> ascbuffer;
    System.Threading.Tasks.Task<Int64> ascProcessingconsumer;
    CancellationToken ct = new CancellationToken();
    CancellationTokenSource cts = new CancellationTokenSource();
    ascb = new AscDataBlocks();
    ascb.start = 1;
    ascb.End = 100;           
    try
    {
        ascbuffer = new BufferBlock<Int64>(new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            CancellationToken = ct
        });// Initiallize the buffer bloack
        ascProcessingconsumer = ascb.AscTransConsumerAsync(ascbuffer); //Initialize the consumer.
        //Post source data to the dataflow block.
        ascb.AscBufferProducer(ascbuffer);
        ascProcessingconsumer.Wait();
    }
    catch (Exception ex)
    {
        //foreach (var v in ex.InnerExceptions)
        //    Console.WriteLine("msg: " + v.Message);                
    }            
}  
4

1 に答える 1

4

コンシューマーは、すべてのアイテムがプロデューサーからポストされた後にのみアクティブになります。非同期とは、生産タスクと消費タスクの両方を並行して実行できることを意味します。

これは、消費者が開始する前に、すべてのアイテムを非常に迅速に投稿するために発生します。を追加Thread.Sleep(100)すると、実際に並行して動作することがわかります。

コンシューマーにスリープ時間を与えて、他のデータ項目をブロックしているかどうかを確認します。順次実行されており、並列処理が行われていないようです。

TPL データフローは魔法ではありません。コードを変更して並列実行することはありません。1 回呼び出すのはあなたなAscTransConsumerAsync()ので、実際には 1 回しか実行されないことに驚かないでください。

TDF は並列処理をサポートしていますが、実際に処理コードを実行させる必要があります。これを行うには、実行ブロックの 1 つを使用します。あなたの場合、ActionBlock適切なようです。

それを使用する場合は、ブロックを設定して並列実行するように構成できますMaxDegreeOfParallelism。もちろん、これを行うには、処理デリゲートがスレッド セーフであることを確認する必要があります。

これで、AscTransConsumerAsync()次のようになります。

public async Task<Int64> AscTransConsumerAsync(ISourceBlock<Int64> source)
{
    // counter to track the number of items that are processed
    Int64 count = 0;

    var actionBlock = new ActionBlock<Int64>(
        data =>
        {
            ProcessDataBuffer(data);
            // count has to be accessed in a thread-safe manner
            // be careful about using Interlocked,
            // for more complicated computations, locking might be more appropriate
            Interlocked.Increment(ref count);
        },
        // some small constant might be better than Unbounded, depedning on circumstances
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

    source.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

    // this assumes source will be completed when done,
    // you need to call ascbuffer.Complete() after AscBufferProducer() for this
    await actionBlock.Completion;

    return count;
}
于 2013-01-10T13:19:40.423 に答える