配列を保持するストリームがあり、その各要素には ID があります。これを ID ごとにストリームに分割する必要があります。これは、ソース ストリームが ID を保持しなくなったときに完了します。
たとえば、これらの 3 つの値を持つ入力ストリーム シーケンス
[{a:1}, {b:1}] [{a:2}, {b:2}, {c:1}] [{b:3}, {c:2}]
3 つのストリームを返す必要があります
a -> 1 2 |
b -> 1 2 3
c -> 1 2
id がなくなったので a は 3 番目の値で完了し、id が現れたので c は 2 番目の値で作成されました。
私はgroupByUntilを試していますが、少し似ています
var input = foo.share();
var output = input.selectMany(function (s) {
return rx.Observable.fromArray(s);
}).groupByUntil(
function (s) { return s.keys()[0]; },
null,
function (g) { return input.filter(
function (s) { return !findkey(s, g.key); }
); }
)
そのため、ID でグループ化し、入力ストリームに ID がなくなったらグループを破棄します。これは機能しているように見えますが、入力の 2 つの使用法は私には奇妙に見えます。たとえば、単一のストリームを使用して groupByUntil の入力とグループの破棄を制御すると、奇妙な順序の依存関係が生じる可能性があります。
より良い方法はありますか?
アップデート
実際、ここには奇妙なタイミングの問題があります。fromArray はデフォルトで currentThread スケジューラを使用します。これにより、その配列からのイベントが入力からのイベントとインターリーブされます。グループの破棄条件は、間違った時点で (前の入力からのグループが処理される前に) 評価されます。
考えられる回避策は、fromArray(.., rx.Scheduler.immediate) を実行することです。これにより、グループ化されたイベントが入力と同期されます。