一部のデータをバッファリングするために Reactive Extensions (Rx) を使用しています。ただし、このデータと非同期で何かを行う必要があるという点で問題がありますが、非同期操作が完了するまでバッファーが次のグループを通過させたくないのです。
コードを 2 つの方法で構成しようとしました (不自然な例):
public async Task processFiles<File>(IEnumerable<File> files)
{
await files.ToObservable()
.Buffer(10)
.SelectMany(fi => fi.Select(f => upload(f)) //Now have an IObservable<Task>
.Select(t => t.ToObservable())
.Merge()
.LastAsync();
}
public Task upload(File item)
{
return Task.Run(() => { //Stuff });
}
また
public async Task processFiles<File>(IEnumerable<File> files)
{
var buffered = files.ToObservable()
.Buffer(10);
buffered.Subscribe(async files => await Task.WhenAll(files.Select(f => upload(f)));
await buffered.LastAsync();
}
public Task upload(File item)
{
return Task.Run(() => { //Stuff });
}
残念ながら、非同期操作が完了する前にバッファーが次のグループをプッシュするため、これらの方法はどちらも機能しませんでした。その意図は、バッファリングされた各グループを非同期的に実行し、その操作が完了したときにのみ、次のバッファリングされたグループに進むことです。
どんな助けでも大歓迎です。