0

いくつかの Reactive Extension クエリを StreamInsight に移植する作業を行っていますが、ウィンドウ クエリが重複しているという問題が発生しました。

StreamInsight サーバーにソース セットアップがあり、次のようなオーバーラップ ウィンドウ クエリを作成しようとしています。

var source = streamInsightServer.GetObservable<EventPattern<MyEventArg>>("EventSource");

var query = source.Window(new TimeSpan(0, 0, 1), new TimeSpan(0, 0, 0, 250));

ソースはどこにありIQbservable<EventPattern<MyEventArg>>、クエリは次にIQbservable<IObserverable<EventPattern<MyEventArg>>>

Reactive では、オブザーバーは次のように作成されました。

_observer = query.Subscribe(evts =>
            {
                evts.Count().Subscribe(c =>
                {
                    //push output here
                });
            });

オブザーバーをアタッチして、StreamInsight から同等の出力を取得するにはどうすればよいですか?

4

2 に答える 2

0

目的がデータ ソースの共有である場合は、 を使用する必要があります.With()

したがって、プロセスは次のようになります。

var process = query.Bind(innerSink).With(query.Bind(sink).Run("processBindingName");

プロセスが完了したとき、またはアプリケーションをシャットダウンしたいときは、 を呼び出すだけ.Dispose()です。

ウィンドウ処理に関する限り、次のように HoppingWindow を使用する必要があります。

var query = from win in source.HoppingWindow(new TimeSpan(0, 0, 1), new TimeSpan(0, 0, 0, 250))
select win.Count();
于 2015-02-19T01:24:50.820 に答える
0

さて、次のように、Rx サブスクリプションと同じ出力を実現する 2 つのシンクを作成できました。

var query = source.Window(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(250));
query.Deploy("windowQuery");
var innerSink = applicationStatus.DefineObserver(() => Observer.Create<int>(c => /*output here*/));
innerSink.Deploy("innerSink");
var sink = applicationStatus.DefineObserver(() => Observer.Create<IObservable<EventPattern<MyEventArg>>>(e => e.Count().AsQbservable().Bind(innerSink).Run().Void()));
sink.Deploy("mySink");
ProcessBinding = query.Bind(sink).Run("processBindingName");

.Void()このメソッドは単に拡張メソッドであり、何もせずに void を返すことに注意してください。これにより、必要なシグネチャと一致するように式Actionが a ではなくan として解釈されます。Funcまた、内側のシンクを実行することによって作成されたすべてのバインディングは破棄されず、構築されません。これらは、StreamInsight イベント フロー デバッガーで確認できます。

これは同じ結果を達成しますが、拡張メソッドのハックと内部プロセスバインディングを破棄しないため、適切なソリューションとして適格かどうかはわかりません。これらの問題なしでこれを別の方法で書く方法を誰かが知っているかどうかまだ探しています!

于 2015-02-18T14:22:35.523 に答える