問題タブ [rx-java]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - RX Observable をアクター (scala) に渡しても安全ですか?
RX Java の scala バインディングをしばらく使用しており、これを Akka Actors と組み合わせることを考えています。Akka 間でRX を渡すことが安全/可能かどうかを知りたいです。Observable
Actor
たとえば、20 までの偶数の平方を (毎秒) 出力するプログラム:
(これは疑似コードとして扱ってください。コンパイルされません)。send
あるアクターから別のアクターに Observables を送信していることに注意してください。理解したい:
- Akka と RX は Observable へのアクセスを自動的に同期しますか?
- を分散システム経由で送信することは
Observable
できません。ローカル メモリ内のオブジェクトへの参照です。しかし、それはローカルで機能しますか? - この簡単な例で、 の
subscribe
コールで作業がスケジュールされるとしProducer
ます。作業を分割して、各アクターで別々に行うことはできますか?
余談: RX と Actors を組み合わせようとするプロジェクトをいくつか見てきました。
http://jmhofer.johoop.de/?p=507および https://github.com/jmhofer/rxjava-akka
Observable
しかし、これらは単にアクター間のメッセージとして を渡すだけではないという点で異なります。彼らは最初に値を取得するために呼び出しsubscribe()
、次にこれらをアクターのメールボックスに送信し、これから新しい値を作成しますObservable
。それとも私は間違っていますか?
scala - rxjava は例外の後でも次の要素を消費します
「onNext」で例外が発生した後、さらに「onError」が呼び出された後でも、rxjavaが監視可能なシーケンスから次の要素を消費する理由を誰かが説明できますか?
ここに私のシミュレーションがあります:
その結果は次のとおりです。例外が発生した後、項目#6がシーケンスから取得されていることがわかります。
私が理解できる限り、通常のシーケンスから作成された監視可能なコレクションは「コールド」であり、現在のアイテムがオブザーバーによって正常に処理された後にのみ、次のアイテムをソース シーケンスから取得する必要があります。スレッドの問題が疑われる可能性がありますが、「コールド」は真実ではない「コールド」ではないことを意味し、同じスレッド ID が常に使用されていることも明確にわかります。
では、なぜアイテム6がシーケンスから取られるのでしょうか?!
android - Observable ベースの API と登録解除の問題
Rx-Java を使用して、Android での位置追跡用のクラスを作成しようとしています。私がまだ理解できないのは、Observable のライフサイクルを適切に処理する方法です。私が欲しいのは、最初のサブスクリプションが発生したときに位置の追跡を開始し、最後のサブスクリプションが破棄されたときに位置の追跡を停止する Observable です。これまでに達成したことは次のとおりです。
ご覧のとおりObservable#defer
、最初のクライアントがサブスクライブするときにロケーション コールバックを開始するために使用します。良いアプローチかどうかはわかりませんが、現時点で思いついたのはこれが最善です。私がまだ欠けているのは、クラスの最後のクライアントがオブザーバブルからサブスクライブを解除したときに位置情報の更新を停止する方法です。それとも、明らかではないため、Rxでは慣用的ではないものでしょうか?
私は、このユースケースはどちらかというと標準的なものであるべきだと考えています。したがって、標準的/慣用的なソリューションが必要です。知っていただければ幸いです。
scala - groupBy でネストされた Observable の型消去の処理
この(不自然な)コードを考えてみましょう:
最初の 10 がCaseOne
で、次がであるオブザーバブルを作成しますCaseTwo
。次に、それらが偶数か奇数かによってグループ化されます。したがって、グループ化されたオブザーバブルはObservable[(Long, Observable[MyCaseClass])]
、RX-Java 仕様に従って型になります。
ただし、「groupedby」オブザーバブルをサブスクライブするようになると、タイプの消去は、ネストされたオブザーバブルのシグネチャが失われることを意味します。つまり、それが CaseOne か CaseTwo か ( type になりますAny
) - コンパイラーはこれについて警告します。したがって、出力は
私の質問は、上記のシナリオで、ネストされた Observable の型消去をどのように処理するのですか?
これまでの私の唯一の回避策は、ネストされた Observable 型を識別するために使用されるキーに追加の値を含めてから、asInstance
この型に ( を使用して) キャストすることでした。しかし、これはあまりよろしくありません。
また、この例では使用していませんeven
が、問題の構造を直接反映していることにも注意してください。
java - RxJava:依存関係を持つ複数のオブザーバブルを構成し、最後にすべての結果を収集する方法は?
私は RxJava を学習しており、最初の実験として、このコードrun()
の最初のメソッドのコードを書き直して (RxJava が解決できる問題としてNetflix のブログで引用されています)、RxJava を使用して非同期性を改善しようとしています。最初の Future ( ) の結果を待ってから、残りのコードに進みます。f1.get()
f3
に依存しf1
ます。私はこれを処理する方法を見flatMap
て、トリックを行うようです:
次に、f4
依存f5
しf2
ます。私はこれを持っています:
これは奇妙になり始めます(merge
おそらく私が望むものではないでしょう...)が、最後にこれを行うことができますが、私が望むものとはまったく異なります。
それは私に与えます:
これはすべての数字ですが、残念ながら、結果は別々の呼び出しで得られるため、元のコードの最後の println を完全に置き換えることはできません。
同じ行でこれらの両方の戻り値にアクセスする方法がわかりません。ここで見逃している関数型プログラミング fu がおそらくあると思います。これどうやってするの?ありがとう。
reactive-programming - RxJava を使用して依存関係を持つ Async Observable を作成する
私はリアクティブ プログラミングが初めてで、依存関係を持つオブザーバブルの作成について混乱しています。シナリオは次のとおりです。2 つのオブザーバブルA、Bがあります。Observable Aは、 Bによって発行された値に依存します。(したがって、A は B を観察する必要があります)。AとBを構成し、 Vを放出する Observable Cを作成する方法はありますか? RxJava documentationでポインターを探しているだけです。
multithreading - RxJavaで作成されたスレッドはどの時点ですか
Observableにたくさんの変換があるとします:
への最後の呼び出しを除いて、これらの操作はすべて同期していflatMap()
ますか? それとも、サブスクライブするように指示したスレッドですべての操作が実行されますか?