IntroToRx Web サイトを読むと、Observable.Create ヘルパー メソッドを優先して Subject の使用を思いとどまらせます。
ご覧のとおり、OnNext メソッドは、Observer オブジェクトにアクセスできる唯一の部分であるため、subscribe メソッド内でのみ呼び出すことができます。
創造された後に新しい価値をプッシュしたい場合はどうすればよいですか? サブジェクトを使用することを「強制」されていますか?
IntroToRx Web サイトを読むと、Observable.Create ヘルパー メソッドを優先して Subject の使用を思いとどまらせます。
ご覧のとおり、OnNext メソッドは、Observer オブジェクトにアクセスできる唯一の部分であるため、subscribe メソッド内でのみ呼び出すことができます。
創造された後に新しい価値をプッシュしたい場合はどうすればよいですか? サブジェクトを使用することを「強制」されていますか?
Rxを探索しているだけの場合は、それを試してみてください。Subjectを使用し、ナッツを使って、それらがどのように機能するかを確認し、自分の長所と短所を見つけてから、ここに戻って、Subjectが嫌われる理由についての質問を読んでください。
サブジェクトは、実際のソース条件を複製する必要なしに、アイデアと複雑なRxシナリオを「迅速にブートストラップ」するはるかに簡単な方法を提供します。
とは言うものの、それらは、ステートレスな一連の操作であると思われるものに状態を注入するので、それらに依存しないように注意してください。
つまり、要約すると、rxがどのように機能するか、またはクエリXを作成する方法をテスト/学習するシーケンスを生成しようとしている場合は、それらのサブジェクトを使用します。クエリ内でそれらを使用していることに気付いた場合は、より良い方法がある可能性があります。
編集:私が何かを逃したことに気づきました:
また、作成後にストリームイベントを発生させる別の方法があるかどうかを尋ねます...答えは「はい」です。定義した古いIObservableベースのオブジェクトを返すCreate、Return、Generateを介してストリームを宣言できます。これにより、イベントを挿入するメソッドを公開することもできます。または、共有リストをチェックするスレッドをスピンするラムダがあります。はリターンストリームにルーティングされます....私が言っているのは、可能性は無限大だということだと思います。Observableで宣言された「イベントのシーケンスを作成する」メソッドのようなものが12個あります-それらすべてを試してください!
編集2:
例?Observable.Create
確かに、本当に非効率的なものを模倣して何かを一緒に投げましょうSubject
:
var running = true;
var values = new ConcurrentQueue<int>();
var query = Observable.Create<int>(obs =>
{
var body = Task.Factory.StartNew(()=>
{
while(running)
{
int nextValue;
if(values.TryDequeue(out nextValue))
{
obs.OnNext(nextValue);
}
Thread.Yield();
}
});
return Disposable.Create(() =>
{
try
{
running = false;
body.Wait();
obs.OnCompleted();
}
catch(Exception ex)
{
obs.OnError(ex);
}
});
});
using(query.Subscribe(Console.WriteLine))
{
values.Enqueue(1);
values.Enqueue(2);
values.Enqueue(3);
values.Enqueue(4);
Console.ReadLine();
}
これは、非常に汚いサンプルコードであることに注意してください。:)
外部デバイスからデータを受信している場合、エラーを通知する意図はありませんIObserver.OnError
(ストリームが無限である、および/または通信に関する問題がメッセージ自体にあると仮定します)、ある程度の速度でポーリングします。サブジェクトの唯一の問題は、おそらく、誰かがサブスクライブする前であっても、このデバイスのポーリングを開始するでしょう (ただし、追加の利点は、状態の処理が非常に明白であることです。1 つのオブジェクトを作成し、COM ポートを開き、値を通信して発行します)
Observable.Create
またはを使用Observable.Timer/Interval
する方が良いかもしれませんが、怠惰が主な理由です。とにかく状態を管理します。また、おそらく Publish().RefCount() を使用して、2 番目のサブスクリプションがポートを開かないようにする必要があります。
それはあなたがしようとしていることに依存します。サブジェクトの場合もありますが、最初に Rx を使い始めたときに考えられるほど多くはありません。
新しいデータはどのように配列に入りますか? 別のイベントからでしょうか?おそらく、通信フレームワークからのメッセージでしょうか? 多分ファイルをポーリングしますか?
これらの回答に応じて、通常、何らかのイベント ソースが既にあり、別のパターン (イベント、ポーリング、コールバックなど) から Rx に変換しているだけであることがわかります。
また、Observable.Create を使用する必要はありません。Observable.Timer/Interval を使用してポーリング シーケンスを設定したり、Observable.FromEventPattern を使用して既存のイベントを利用したり、Observable.Start を 1 回限りの非同期タスク スタイルの計算に使用したりできます。
Rx (または Linq でさえも) は非常に抽象的である可能性があるため、抽象的な質問をすると、非常に幅広い答えにつながることがよくあります。解決しようとしている問題を示すと、より適切な回答が得られる可能性があります。