4

次のように機能する Rx パイプラインをつなぎ合わせようとしています。

  1. 会社に関する情報を含むプロファイルを提供する IObservable を受け取る関数を作成しました
  2. さまざまなデータ ソースにクエリを実行して、関連する可能性のある企業プロファイルをすべて並行して検索します。それを会社プロファイルの単一の IObservable にマージします。
  3. これらの潜在的に関連するプロファイルを取得したら、それらを既に観察したプロファイルと比較し、それらの関連性が 80% を超えていて、既に観察したプロファイルと同じでない場合は、一致していると見なします。
  4. これらの新しい一致するプロファイルに関連するデータを検索できるように、一致する企業をステップ 1 に戻します。

いくつかの既知の適切なプロファイルを使用してプロセスをブートストラップします。

最終的に、まだ確認されていない一致するプロファイルがなくなるため、プロセスは終了します。

これをプログラミングするのに問題があります。サブジェクトを使用して、パイプラインの最後尾がそのプロファイルをワークフローの最初に送信できるようにすると、誰も OnCompleted を呼び出さず、プロセスが終了したことを知ることはありません。代わりに再帰を使用してこれを開発すると、独自の戻り値で関数を呼び出そうとしているため、常にスタック オーバーフローが発生するようです。

プロセスが終了したと判断できる方法でこのタスクを達成する方法について、誰か助けてもらえますか?

4

1 に答える 1

8

次のようなデータフローが必要なようです。

seed profiles --> source --> get related --> output
                     ^                    |
                     |                    v
                     -<--- transform <-----

これは、一般的な問題の解決が特定の問題と同じくらい簡単または簡単な場合のように思われるので、必要な構成要素を提供する一般的な「フィードバック」関数を提案します。

編集:完了する機能を修正

IObservable<TResult> Feedback<T, TResult>(this IObservable<T> seed,
                                          Func<T, IObservable<TResult>> produce,
                                          Func<TResult, IObservable<T>> feed)
    {
        return Observable.Create<TResult>(
                obs =>
                {
                    var ret = new CompositeDisposable();
                    Action<IDisposable> partComplete = 
                        d =>
                        {
                            ret.Remove(d);
                            if (ret.Count == 0) obs.OnCompleted();
                        };
                    Action<IObservable<T>, Action<T>> ssub =
                        (o, n) =>
                        {
                            var disp = new SingleAssignmentDisposable();
                            ret.Add(disp);
                            disp.Disposable = o.Subscribe(n, obs.OnError, () => partComplete(disp));
                        };
                    Action<IObservable<TResult>, Action<TResult>> rsub =
                        (o, n) =>
                        {
                            var disp = new SingleAssignmentDisposable();
                            ret.Add(disp);
                            disp.Disposable = o.Subscribe(n, obs.OnError, () => partComplete(disp));
                        };

                    Action<T> recurse = null;
                    recurse = s =>
                              {
                                  rsub(produce(s),
                                       r => 
                                       {
                                           obs.OnNext(r);
                                           ssub(feed(r), recurse);
                                       });
                              };

                    ssub(seed, recurse);
                    return ret;
                });
    }

あなたの場合、TそしてTResult同じように見えるのでfeed、恒等関数も同じです。 produceステップ2と3を実装するために使用される関数になります。

私がこの関数をテストしたいくつかのサンプルコード:

void Main()
{
    var seed = new int[] { 1, 2, 3, 4, 5, 6 };
    var found = new HashSet<int>();
    var mults = seed.ToObservable()
                    .Feedback(i =>
                              {
                                  return Observable.Range(0, 5)
                                         .Select(r => r * i)
                                         .TakeWhile(v => v < 100)
                                         .Where(v => found.Add(v));
                              },
                              i => Observable.Return(i));

    using (var disp = mults.Dump())
    {
        Console.WriteLine("Press any key to stop");
        Console.ReadKey();
    }
    Console.WriteLine("Press any key to exit");
    Console.ReadKey();
}

static IDisposable Dump<T>(this IObservable<T> source)
{
    return source.Subscribe(item => Console.WriteLine(item),
                            ex => Console.WriteLine("Error occurred in dump observable: " + ex.ToString()),
                            () => Console.WriteLine("Dump completed"));
}
于 2013-03-09T04:21:32.643 に答える