1

タスクを実行するためにカスタム スケジューラを使用する必要があり (これらはタスクである必要があります)、スケジューラが同期コンテキストを設定しない (つまり、収集しない、など)ObserveOn状況があります。以下は、私がそれをやった方法です。さて、これが非同期呼び出しを実行してその結果を待つのに最適な方法であるかどうかは、よくわかりません。これで大丈夫ですか、それともより堅牢または慣用的な方法がありますか?SubscribeOnSynchronizationContextScheduler

var orleansScheduler = TaskScheduler.Current;
var someObservable = ...;
someObservable.Subscribe(i =>
{
    Task.Factory.StartNew(async () =>
    {
        return await AsynchronousOperation(i);
    }, CancellationToken.None, TaskCreationOptions.None, orleansScheduler);          
});

待っている必要がない場合はどうなりますか?

<編集:私がここでやっていることの具体的で単純化された例を見つけました。基本的に、私は Orleans で Rx を使用しています。上記のコードは、私がやっていることの必要最小限の図です。私は一般的にこの状況にも興味がありますが。

最終的なコード Orleans のコンテキストでは、これは少しトリッキーであることがわかりました。を使用する方法がわかりませんObserveOn。これは、私が使いたいものです。問題は、それを使用すると、Subscribeが呼び出されないことです。コード:

var orleansScheduler = TaskScheduler.Current;
var factory = new TaskFactory(orleansScheduler);
var rxScheduler = new TaskPoolScheduler(factory);
var someObservable = ...;
someObservable
//.ObserveOn(rxScheduler) This doesn't look like useful since...
.SelectMany(i =>
{
    //... we need to set the custom scheduler here explicitly anyway.
    //See Async SelectMany at http://log.paulbetts.org/rx-and-await-some-notes/.
    //Doing the "shorthand" form of .SelectMany(async... would call Task.Run, which
    //in turn runs always on .NET ThreadPool and not on Orleans scheduler and hence
    //the following .Subscribe wouldn't be called. 
    return Task.Factory.StartNew(async () =>
    { 
       //In reality this is an asynchronous grain call. Doing the "shorthand way"
       //(and optionally using ObserveOn) would get the grain called, but not the
       //following .Subscribe. 
       return await AsynchronousOperation(i);
    }, CancellationToken.None, TaskCreationOptions.None, orleansScheduler).Unwrap().ToObservable();
})
.Subscribe(i =>
{
    Trace.WriteLine(i);
});

また、 Codeplex Orleans フォーラムの関連スレッドへのリンク。

4

2 に答える 2

4

StartNew現代のコードには反対することを強くお勧めします。ユースケースはありますが、非常にまれです。

カスタム タスク スケジューラを使用する必要がある場合は、スケジューラのラッパーから構築されたを使用ObserveOnすることをお勧めします。それは一口なので、一般的な考え方は次のとおりです。TaskPoolSchedulerTaskFactory

var factory = new TaskFactory(customScheduler);
var rxScheduler = new TaskPoolScheduler(factory);
someObservable.ObserveOn(rxScheduler)...

SelectManyその後、ソース ストリーム内の各イベントが到着すると、非同期操作を開始するために使用できます。

別の、あまり理想的ではない解決策はasync void、サブスクリプションの「イベント」に使用することです。これは問題ありませんが、エラー処理に注意する必要があります。原則として、async void メソッドから例外が伝播することを許可しないでください。

オブザーバブルを TPL Dataflow ブロックにフックする 3 番目の方法があります。ブロックのようなものActionBlockはそのタスク スケジューラを指定でき、Dataflow は非同期ハンドラを自然に理解します。デフォルトでは、Dataflow ブロックは一度に 1 つの要素に処理を絞り込むことに注意してください。

于 2014-11-11T21:53:45.333 に答える
3

一般的に言えば、実行をサブスクライブする代わりに、タスクのパラメーターをタスクの実行に投影し、結果だけをサブスクライブする方が適切で慣用的です。そうすれば、さらに下流の Rx で構成できます。

たとえば、次のようなランダムなタスクが与えられた場合:

static async Task<int> DoubleAsync(int i, Random random)
{
    Console.WriteLine("Started");
    await Task.Delay(TimeSpan.FromSeconds(random.Next(10) + 1));
    return i * 2;
}

次に、次のようにします。

void Main()
{
    var random = new Random();

    // stream of task parameters
    var source = Observable.Range(1, 5);

    // project the task parameters into the task execution, collect and flatten results
    source.SelectMany(i => DoubleAsync(i, random))

          // subscribe just for results, which turn up as they are done
          // gives you flexibility to continue the rx chain here
          .Subscribe(result => Console.WriteLine(result),
                    () => Console.WriteLine("All done."));
}
于 2014-11-11T21:47:09.837 に答える