1

新しいファイルのディレクトリをポーリングするために使用している非常に単純なクラスがあります。場所、その場所の監視を開始する時間、および再確認する間隔 (時間単位) を取得します。

public class Thing
{
  public string         Name {get; set;}
  public Uri            Uri { get; set;}
  public DateTimeOffset StartTime {get; set;}
  public double         Interval {get; set;}
}

私は Reactive Extensions を初めて使用しますが、ここでの仕事にぴったりのツールだと思います。開始時とその後のすべての間隔で、すべての面倒な作業を行う Web サービスを呼び出したいだけですpublic bool DoWork(Uri uri)

編集: DoWork は、新しいファイルをチェックして必要に応じて移動する Web サービスへの呼び出しであるため、その実行は非同期である必要があります。完了した場合は true、そうでない場合は false を返します。

これらのコレクション全体があると、事態は複雑になりThingます。for eachを作成する方法に頭を悩ませ、Observable.Timer()それらすべてに同じメソッドを呼び出させることはできません。

edit2: Observable.Timer(DateTimeOffset, Timespan) は、ここでやろうとしていることの IObservable を作成するのに最適なようです。考え?

4

3 に答える 3

3

多くのタイマーが必要ですか?20 個のコレクションがある場合、20 個のタイマーをすべて同じ時点で起動するように作成しますか? 同じスレッド/スケジューラー上ですか?

DoWorkそれとも、すべての期間で foreachを使用したいですか?

すなわち

from thing in things
from x in Observable.Interval(thing.Interval)
select DoWork(thing.Uri)

対。

Observable.Interval(interval)
.Select(_=>
    {
        foreach(var thing in Things)
        {
            DoWork(thing);
        }
    })

将来的に仕事をする方法はたくさんあります。

  • スケジューラを直接使用して、将来実行される作業をスケジュールできます。
  • Observable.Timer を使用して、将来の指定された時間に 1 つの値を生成するシーケンスを持つことができます。
  • Observable.Interval を使用して、指定された期間ごとに多くの値を生成するシーケンスを持つことができます。

したがって、これは別の質問を紹介します。ポーリング時間が 60 秒で、機能を実行する場合は 5 秒かかります。次のポーリングは 55 秒後か 60 秒後か? ここで、1 つの回答は Rx シーケンスを使用することを示し、もう 1 つの回答はおそらく定期的なスケジューリングを使用することを示しています。

次の質問は、DoWork が値を返すかどうかです。現在、そうではないようです*。この場合、最も適切なことは定期スケジューラを活用することだと思います (Rx v2 を想定)。

var things = new []{
    new Thing{Name="google", Uri = new Uri("http://google.com"), StartTime=DateTimeOffset.Now.AddSeconds(1), Interval=3},
    new Thing{Name="bing", Uri = new Uri("http://bing.com"), StartTime=DateTimeOffset.Now.AddSeconds(1), Interval=3}
};
var scheduler = Scheduler.Default;
var scheduledWork = new CompositeDisposable();

foreach (var thing in things)
{
    scheduledWork.Add( scheduler.SchedulePeriodic(thing, TimeSpan.FromSeconds(thing.Interval), t=>DoWork(t.Uri)));
}

//Just showing that I can cancel this i.e. clean up my resources.
scheduler.Schedule(TimeSpan.FromSeconds(10), ()=>scheduledWork.Dispose());

これにより、それぞれの処理が定期的に (ドリフトなしで) 独自の間隔でスケジュールされ、キャンセルが提供されます。

必要に応じて、これをクエリにアップグレードできます

var scheduledWork = from thing in things
                    select scheduler.SchedulePeriodic(thing, TimeSpan.FromSeconds(thing.Interval), t=>DoWork(t.Uri));

var work = new CompositeDisposable(scheduledWork);

StartTimeこれらのクエリの問題は、要件を満たしていないことです。面倒なことに、このCcheduler.SchedulePeriodicメソッドは開始オフセットを持つためのオーバーロードも提供していません。

ただし、Observable.Timerオペレーターはこれを提供します。また、ドリフトしないスケジューリング機能を内部的に活用します。クエリを再構築するにObservable.Timerは、次のようにします。

var urisToPoll = from thing in things.ToObservable()
                 from _ in Observable.Timer(thing.StartTime, TimeSpan.FromSeconds(thing.Interval))
                 select thing;

var subscription = urisToPoll.Subscribe(t=>DoWork(t.Uri));

これで、ドリフトを回避する優れたインターフェイスが完成しました。ただし、ここでの作業は逐次的に行われると思います (多数の DoWork アクションが同時に呼び出される場合)。

*理想的には、このような副作用の記述を避けようとしますが、あなたの要件が 100% わかっているわけではありません。

編集 DoWork への呼び出しは並行して行う必要があるようです。そのため、もう少し処理を行う必要があります。理想的には DoWork を asnyc にしますが、それができない場合は、作成するまで偽装することができます。

var polling = from thing in things.ToObservable()
              from _ in Observable.Timer(thing.StartTime, TimeSpan.FromSeconds(thing.Interval))
              from result in Observable.Start(()=>DoWork(thing.Uri))
              select result;

var subscription = polling.Subscribe(); //Ignore the bool results?
于 2013-08-08T14:41:05.680 に答える
2

うーん。あなたDoWorkが何かをする必要がある結果を生み出しますか?そう仮定します。あなたは言いませんでしたが、DoWork同期的であると仮定します。

things.ToObservable()
  .SelectMany(thing => Observable
    .Timer(thing.StartTime, TimeSpan.FromHours(thing.Interval))
    .Select(_ => new { thing, result = DoWork(thing.Uri) }))
  .Subscribe(x => Console.WriteLine("thing {0} produced result {1}",
                                    x.thing.Name, x.result));

これは仮定のバージョンTask<bool> DoWorkAsync(Uri)です:

things.ToObservable()
  .SelectMany(thing => Observable
    .Timer(thing.StartTime, TimeSpan.FromHours(thing.Interval))
    .SelectMany(Observable.FromAsync(async () =>
       new { thing, result = await DoWorkAsync(thing.Uri) })))
  .Subscribe(x => Console.WriteLine("thing {0} produced result {1}",
                                    x.thing.Name, x.result));

DoWorkAsyncこのバージョンは、間隔が満了するかなり前に が終了し、新しいインスタンスを起動することを前提としDoWorkAsyncているため、同じThingインスタンスに対して同時実行が発生することを防ぎません。

于 2013-08-08T03:25:00.303 に答える