0

一部の xml の要素 (プロセスの出力から読み取られるsvn log --xml ...) を列挙し、xml 要素ごとに長時間実行されるメソッドを実行する次のスニペットがあります。

var proc = Process.Start(svnProcInfo);
var xml = XDocument.Load(proc.StandardOutput);

var xElements = xml.Descendants("path")
                   .ToObservable()
                   //.SubscribeOn(ThreadPoolScheduler.Instance) 
                   .Select(descendant => return LongRunning(descendant));
xElements
    //.SubscribeOn(NewThreadScheduler.Default)
    .Subscribe(result => Console.WriteLine(result);

Console.ReadKey();

LongRunningメソッドは重要ではありませんが、その中で実行中のスレッドをログに記録します。1秒間実行されるとしましょう。

私の問題は、どちらの行もコメント解除しSubscribeOn()ても何の効果もないということです。への呼び出しLongRunningはシーケンシャルで、同じスレッドで 1 秒ごとに発生します (ただし、メイン (初期) スレッドとは別のスレッド)。

これはコンソール アプリケーションです。

私はRxの初心者です。私は何が欠けていますか?

編集:

Lee Campbell の回答を試した後、別の問題に気付きました。

Console.Error.WriteLine("Main thread " + Thread.CurrentThread.ManagedThreadId);

var xElements = xml.Descendants("path").ToObservable()
    //.ObserveOn(Scheduler.CurrentThread)
    .SelectMany(descendant =>     
          Observable.Start(()=>LongRunning(descendant),NewThreadScheduler.Default))
    .Subscribe(result => Console.WriteLine(
         "Result on: " + Thread.CurrentThread.ManagedThreadId));

[...]

string LongRunning(XElement el)
{
    Console.WriteLine("Execute on: Thread " + Thread.CurrentThread.ManagedThreadId);
    DoWork();
    Console.WriteLine("Finished on Thread " + Thread.CurrentThread.ManagedThreadId);
    return "something";
}

これにより、次の出力が得られます。

Main thread 1
Execute on: Thread 3
Execute on: Thread 4
Execute on: Thread 5
Execute on: Thread 6
Execute on: Thread 7
Finished on Thread 5
Finished on Thread 6
Result on: 5
Result on: 6
Finished on Thread 7
Result on: 7
Finished on Thread 3
Result on: 3
Finished on Thread 4
Result on: 4
Done! Press any key...

必要なのは、結果を同じスレッドに「キューに入れる」方法です。それが目的だと思いましたが、上記ObserveOn()の行のコメントを外してObserveOn()も結果は変わりません。

4

1 に答える 1

8

まず、Rx は、非同期性、特に観測可能なシーケンスを制御するためのライブラリ (またはパラダイム) です。ここにあるのは、列挙可能なシーケンス (Xml 子孫) とブロッキング/同期LongRunningメソッド呼び出しです。

列挙可能なシーケンスを呼び出すことによりToObservable()、実際にはインターフェイスに準拠しているだけですが、シーケンスが実現されると (怠惰ではなく熱心に)、それについて実際に観察可能/非同期になるものは何もありません。

を呼び出すことSubscribeOnで、正しい考えが得られましたが、変換は演算子で既に行われていToObservable()ます。ToObservable(ThreadPoolScheduler.Instance)おそらく、 の遅い反復をIEnumerable他のスレッドで実行できるように呼び出すつもりでした。しかし...これは遅いイテレータではないと思うので、おそらく何も解決しません。

最も可能性が高いのは (Rx がこの種の問題に最適なツールである場合は疑わしい)、LongRunningメソッドの呼び出しをスケジュールすることです。ただし、これは選択に Asyncrony を追加する必要があることを意味します。Observable.FromAsyncこれを行う優れた方法は、またはのような Rx Factory メソッドの 1 つですObservable.Start。ただし、これによりシーケンスがIObservable<IObservable<T>>. SelectManyまたはを使用して平坦化できますMerge

これをすべて言ったので、あなたがやりたいと思うことは次のとおりです。

var proc = Process.Start(avnProcInfo);
var xml = XDocument.Load(proc.StandardOutput);

//EDIT: Added ELS to serialise results onto a single thread.
var els = new EventLoopScheduler(threadStart=>new Thread(threadStart)
    {
        IsBackground=true, 
        Name="MyEventLoopSchedulerThread"
    });

var xElements = xml.Descendants("path").ToObservable()
                .SelectMany(descendant => Observable.Start(()=>LongRunning(descendant),ThreadPoolScheduler.Instance))
                .ObserveOn(els)
                .Subscribe(result => Console.WriteLine(result));

Console.ReadKey();
于 2013-09-25T21:43:49.480 に答える