3

C#、TPL、Parallel Extensions、Async CTP、Reactive Extensions で非同期操作を実行する新しい方法が多数あることを考えると、次のフェッチと処理の部分を並列化する最も簡単な方法は何だろうと考えていました。

foreach(string url in urls)
{
   var file = FetchFile(url);
   ProcessFile(file);
}

ただし、ファイルはいつでも取得できますがProcessFile、一度に 1 つのファイルしか処理できず、順番に呼び出す必要があります。

要するに、パイプライン化された方法で取得FetchFileおよび動作する最も簡単な方法、つまり同時に発生する方法は何ですか?ProcessFile

4

4 に答える 4

1

非同期は、実際には並列を意味しません。これは単に、別の操作を待ってブロックしないことを意味します。ただし、非同期 I/O を利用して、URL をダウンロードするときにスレッドをブロックしないようにします。つまり、これを行うと、URL と同じ数のスレッドを並行してダウンロードする必要がなくなります。

var client = new WebClient();
var syncLock = new object();
TaskEx.WhenAll(urls.Select(url => {
  client.DownloadDataTaskAsync(url).ContinueWith((t) => {
    lock(syncLock) {
      ProcessFile(t.Result);
    }
  });
}));

基本的に、URL ごとに非同期ダウンロード タスクを作成し、タスクが完了すると、プレーン オブジェクトを同期クロックとして使用する継続を呼び出して、ProcessFile順次発生するようにします。WhenAll最後のProcessFile継続が完了するまで戻りません。

RX で明示的なロックを回避できますReplaySubject(ただし、もちろん内部でロックされます)。

var pipeline = new ReplaySubject<byte[]>();
var files = pipeline.ToEnumerable();
var client = new WebClient();
TaskEx.WhenAll(urls
        .Select(download => client.DownloadDataTaskAsync((string) download)
            .ContinueWith(t => pipeline.OnNext(t.Result))
        )
    ).ContinueWith(task => pipeline.OnCompleted(task));
foreach(var file in files) {
    ProcessFile(file);
}

ここではReplaySubject、ファイル ダウンロードのパイプラインとして a を使用します。各ダウンロードは非同期に終了し、その結果をforeachブロックのパイプラインに発行します (つまり、順次発生します)。すべてのタスクが完了すると、オブザーバブルが完了し、foreach.

于 2011-05-31T15:45:17.303 に答える
1

これがRXの方法です。この拡張機能は、URI のストリームをストリームのストリームに変換します。

    public static IObservable<Stream> RequestToStream(this IObservable<string> source, 
    TimeSpan timeout)
    {
        return
            from wc in source.Select(WebRequest.Create)
            from s in Observable
                .FromAsyncPattern<WebResponse>(wc.BeginGetResponse,
                    wc.EndGetResponse)()
                .Timeout(timeout, Observable.Empty<WebResponse>())
                .Catch(Observable.Empty<WebResponse>())
            select s.GetResponseStream();
    }

使用法:

new [] { "myuri.net\file1.dat", "myuri.net\file2.dat" }
   .ToObservable()
   .RequestToStream(TimeSpan.FromSeconds(5))
   .Do(stream = > ProcessStream(stream))
   .Subscribe();

編集:おっと、ファイル書き込みのシリアル化要件に気づいていません。この部分は、本質的に RX キューである .Concat を使用することで実行できます (もう 1 つは .Zip です)。

.StreamToFile 拡張子を付けましょう:

    public static IObservable<Unit> StreamToFile(this Tuple<Stream, string> source)
    {
        return Observable.Defer(() =>
            source.Item1.AsyncRead().WriteTo(File.Create(source.Item2)));
    }

これで、Web リクエストを並列に実行できますが、それらからのファイル書き込みをシリアル化できます。

        new[] { "myuri.net\file1.dat", "myuri.net\file2.dat" }
            .ToObservable()
            .RequestToStream(TimeSpan.FromSeconds(5))
            .Select((stream, i) => Tuple.Create(stream, i.ToString() + ".dat"))
            .Select(x => x.StreamToFile())
            .Concat()
            .Subscribe();
于 2011-03-28T17:03:51.997 に答える
1

上の制約を考えると、ProcessFileTPL を使用して非同期的にデータを取得し、プリロードされたデータを参照するトークンをキューに入れる必要があると言えます。次に、アイテムをキューから引き出して、それらを 1 つずつ ProcessFile に渡すバックグラウンド スレッドを作成できます。これは生産者/消費者パターンです。

キューについては、スレッドセーフなキューを提供できるBlockingCollectionを見ることができます。これは、ワークロードを調整できるという優れた効果もあります。

于 2011-03-28T14:55:41.867 に答える
1

私はすべての派手なメカニズムを知っているわけではないので、おそらく昔ながらの方法でそれを行うでしょうが、それが「単純」に分類されるとは思えません。

var q = new Queue<MyFile>();
var ev = new ManualResetEvent(false);

new System.Threading.Thread(() =>
{
    while ( true )
    {
        ev.WaitOne();
        MyFile item;
        lock (q)
        {
            item = q.Dequeue();
            if ( q.Count == 0 )
                ev.Reset();
        }
        if ( item == null )
            break;
        ProcessFile(item);
    }
}).Start();
foreach(string url in urls)
{
    var file = FetchFile(url);
    lock (q)
    {
        q.Enqueue(file);
        ev.Set();
    }
}
lock (q)
{
    q.Enqueue(null);
    ev.Set();
}
于 2011-03-28T15:01:50.823 に答える