これがRXの方法です。この拡張機能は、URI のストリームをストリームのストリームに変換します。
public static IObservable<Stream> RequestToStream(this IObservable<string> source,
TimeSpan timeout)
{
return
from wc in source.Select(WebRequest.Create)
from s in Observable
.FromAsyncPattern<WebResponse>(wc.BeginGetResponse,
wc.EndGetResponse)()
.Timeout(timeout, Observable.Empty<WebResponse>())
.Catch(Observable.Empty<WebResponse>())
select s.GetResponseStream();
}
使用法:
new [] { "myuri.net\file1.dat", "myuri.net\file2.dat" }
.ToObservable()
.RequestToStream(TimeSpan.FromSeconds(5))
.Do(stream = > ProcessStream(stream))
.Subscribe();
編集:おっと、ファイル書き込みのシリアル化要件に気づいていません。この部分は、本質的に RX キューである .Concat を使用することで実行できます (もう 1 つは .Zip です)。
.StreamToFile 拡張子を付けましょう:
public static IObservable<Unit> StreamToFile(this Tuple<Stream, string> source)
{
return Observable.Defer(() =>
source.Item1.AsyncRead().WriteTo(File.Create(source.Item2)));
}
これで、Web リクエストを並列に実行できますが、それらからのファイル書き込みをシリアル化できます。
new[] { "myuri.net\file1.dat", "myuri.net\file2.dat" }
.ToObservable()
.RequestToStream(TimeSpan.FromSeconds(5))
.Select((stream, i) => Tuple.Create(stream, i.ToString() + ".dat"))
.Select(x => x.StreamToFile())
.Concat()
.Subscribe();