-1

C# 4.0 アプリにオブジェクトのリストがあります。このリストに学生クラスの 100 個のオブジェクトが含まれているとします。Reactive Framework で一度に 10 個のオブジェクトを並列実行する方法はありますか?

各学生オブジェクトは、約 10 秒から 15 秒ほど時間がかかるメソッドを実行します。初回は、リストから最初の 10 個の学生オブジェクトを取得し、10 個の学生オブジェクトすべてが作業を完了するのを待ってから、次の 10 個の学生オブジェクトを取得するというように、リスト内のすべてのアイテムを完了するまで続けますか?

  1. 私はList<Student>100カウントを持っています。
  2. 最初にリストから 10 個のアイテムを取得し、各オブジェクトのロングラン メソッドを並行して呼び出します。
  3. 各オブジェクトの戻り値を受け取り、UI [サブスクリプション部分] を更新します。
  4. 次のラウンドは、最初の 10 ラウンドが完了し、すべてのメモリが解放された場合にのみ開始されます。
  5. リスト内のすべてのアイテムに対して同じプロセスを繰り返します。
  6. 各プロセスでエラーをキャッチする方法??
  7. 各学生オブジェクトのリソースとその他のリソースをメモリから解放する方法は?
  8. Reactive Framework でこれらすべてを行うための最良の方法はどれですか?
4

3 に答える 3

1

このバージョンでは、一度に 10 人の生徒が常に実行されます。生徒が終わると、別の生徒が始まります。各生徒が終了したら、発生したエラーを処理してクリーンアップできます (これは、次の生徒が実行を開始する前に行われます)。

students
    .ToObservable()
    .Select(student => Observable.Defer(() => Observable.Start(() =>
        {
            // do the work for this student, then return a Tuple of the student plus any error
            try
            {
                student.DoWork();
                return { Student = student, Error = (Exception)null };
            }
            catch (Exception e)
            {
                return { Student = student, Error = e };
            }
        })))
    .Merge(10) // let 10 students be executing in parallel at all times
    .Subscribe(studentResult =>
    {
        if (studentResult.Error != null)
        {
            // handle error
        }

        studentResult.Student.Dispose(); // if your Student is IDisposable and you need to free it up.
    });

次のバッチを開始する前に 10 個の最初のバッチを終了しないため、これは正確にはあなたが求めたものではありません。これにより、常に 10 個が並行して実行されます。本当に10 個のバッチが必要な場合は、コードを調整します。

于 2013-02-27T16:53:45.337 に答える
0

私には、これは TPL の問題のように思えます。保存されている既知のデータ セットがあります。負荷の高い処理を分割して並行して実行し、負荷をバッチ処理できるようにしたいと考えています。

あなたの問題のどこにも、非同期であるソース、動いているデータであるソース、またはリアクティブである必要があるコンシューマーは見当たりません。これが、代わりに TPL を使用することを提案する私の根拠です。

別のメモとして、マジック ナンバー 10 を並列処理するのはなぜですか? これはビジネス要件ですか、それともパフォーマンスを最適化するための試みですか? 通常、TaskPool がコア数と現在の負荷に基づいてクライアント CPU に最適なものを見つけられるようにすることがベスト プラクティスです。これは、デバイスとその CPU 構造 (シングル コア、マルチ コア、メニー コア、低電力/無効なコアなど) の大きなバリエーションにより、ますます重要になると思います。

LinqPad で実行できる方法の 1 つを次に示します (ただし、Rx がないことに注意してください)。

void Main()
{
    var source = new List<Item>();
    for (int i = 0; i < 100; i++){source.Add(new Item(i));}

    //Put into batches of ten, but only then pass on the item, not the temporary tuple construct.
    var batches = source.Select((item, idx) =>new {item, idx} )
                        .GroupBy(tuple=>tuple.idx/10, tuple=>tuple.item);

    //Process one batch at a time (serially), but process the items of the batch  in parallel (concurrently).
    foreach (var batch in batches)
    {
        "Processing batch...".Dump();
        var results = batch.AsParallel().Select (item => item.Process());
        foreach (var result in results)
        {
            result.Dump();
        }
        "Processed batch.".Dump();
    }
}


public class Item
{
    private static readonly Random _rnd = new Random();
    private readonly int _id;
    public Item(int id)
    {
        _id = id;
    }

    public int Id { get {return _id;} }

    public double Process()
    {
        var threadId = Thread.CurrentThread.ManagedThreadId;
        string.Format("Processing on thread:{0}", threadId).Dump(Id);
        var loopCount = _rnd.Next(10000,1000000);
        Thread.SpinWait(loopCount);
        return _rnd.NextDouble();
    }
    public override string ToString()
    {
        return string.Format("Item:{0}", _id);
    }
}

移動中のデータの問題または反応的な消費者の問題があるかどうかを知りたいのですが、説明を簡単にするために質問を「軽視」しただけです。

于 2013-03-19T10:50:31.460 に答える
0

私の試み....

var students = new List<Student>();
{....}
var cancel = students
    .ToObservable(Scheduler.Default)
    .Window(10)
    .Merge(1)
    .Subscribe(tenStudents =>
    {
        tenStudents.ObserveOn(Scheduler.Default)
            .Do(x => DoSomeWork(x))
            .ObserverOnDispatcher()
            .Do(tenStudents => UpdateUI(tenStudents))
            .Subscribe();               
    });
于 2013-02-25T15:01:18.063 に答える