1

TPL を使用して、複数の IO ソース (「スレッドレス」タスク) から結果を収集し、ソースごとにスレッド ベースのタスクを生成して監視することなく、それぞれのソースからの結果をシーケンスにマージするにはどうすればよいですか? 1 つのスレッドからソースをポーリングしても安全でしょうか?

while (true)
{
    try
    {
        IEnumerable<UdpClient> readyChannels = 
            from channel in channels
            where channel.Available > 0
            select channel;

        foreach( UdpClient channel in readyChannels)
        {
           var result = await channel.ReceiveAsync();
           //do something with result like post to dataflow block.
        }
    }
    catch (Exception e)
    {
        throw (e);
    }
    ...

そのようなものはどうですか?

4

1 に答える 1

1

ここにいくつかのオプションがあります:

への呼び出しを起動したい場合はReceiveAsync()、結果で何かを行うように設定し(たとえば、あなたが言ったようにデータフローブロックに送信します)、それらを忘れて、次を使用できますContinueWith()

foreach (var channel in readyChannels)
{
   channel.ReceiveAsync().ContinueWith(task => 
   {
       var result = task.Result;
       //do something with result like post to dataflow block.
   }
}

これの欠点の 1 つは、各継続で例外を処理する必要があることです。

おそらくより良いアプローチはOrderByCompletion()、Stephen Cleary の AsyncEx から使用することです。このようにして、すべての読み取りを一度に開始し、完了時に処理することができます。

var tasks = readyChannels.Select(c => c.ReceiveAsync()).OrderByCompletion();

foreach (var task in tasks)
{
   var result = await task;
   //do something with result like post to dataflow block.
}

たとえば、並列処理を制限したい場合に役立つさらに別のオプションは、次を使用することTransformBlockです。

var receiveBlock = new TransformBlock<UdpClient, UdpReceiveResult>(
    c => c.ReceiveAsync(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = degreeOfParallelism });
foreach (var channel in readyChannels)
    receiveBlock.Post(channel);
receiveBlock.Complete();

// set up processing here

await receiveBlock.Completion;

結果を別のブロックに送信する場合、上記のコメントに記載されている処理は、単純にそれらをリンクすることで構成されます。

receiveBlock.LinkTo(anotherBlock);

上記のすべてのケースで、何かを監視するためにスレッドがブロックされることはありません。しかし、呼び出して結果を処理するコードは、どこかReceiveAsync()で実行する必要があります。

于 2013-01-22T14:50:09.170 に答える