3

配列を保持するストリームがあり、その各要素には ID があります。これを ID ごとにストリームに分割する必要があります。これは、ソース ストリームが ID を保持しなくなったときに完了します。

たとえば、これらの 3 つの値を持つ入力ストリーム シーケンス

[{a:1}, {b:1}]    [{a:2}, {b:2}, {c:1}]     [{b:3}, {c:2}]

3 つのストリームを返す必要があります

a -> 1 2 |
b -> 1 2 3
c ->   1 2

id がなくなったので a は 3 番目の値で完了し、id が現れたので c は 2 番目の値で作成されました。

私はgroupByUntilを試していますが、少し似ています

 var input = foo.share();              
 var output = input.selectMany(function (s) {
                        return rx.Observable.fromArray(s);
                }).groupByUntil(
                        function (s) { return s.keys()[0]; },
                        null,
                        function (g) { return input.filter(
                                function (s) { return !findkey(s, g.key); }
                        ); }
                )

そのため、ID でグループ化し、入力ストリームに ID がなくなったらグループを破棄します。これは機能しているように見えますが、入力の 2 つの使用法は私には奇妙に見えます。たとえば、単一のストリームを使用して groupByUntil の入力とグループの破棄を制御すると、奇妙な順序の依存関係が生じる可能性があります。

より良い方法はありますか?

アップデート

実際、ここには奇妙なタイミングの問題があります。fromArray はデフォルトで currentThread スケジューラを使用します。これにより、その配列からのイベントが入力からのイベントとインターリーブされます。グループの破棄条件は、間違った時点で (前の入力からのグループが処理される前に) 評価されます。

考えられる回避策は、fromArray(.., rx.Scheduler.immediate) を実行することです。これにより、グループ化されたイベントが入力と同期されます。

4

1 に答える 1

1

ええ、私が考えることができる唯一の選択肢は、状態を自分で管理することです。私はそれがより良いことを知りません。

var d = Object.create(null);
var output = input
    .flatMap(function (s) {
        // end completed groups
        Object
            .keys(d)
            .filter(function (k) { return !findKey(s, k); })
            .forEach(function (k) {
                d[k].onNext(1);
                d[k].onCompleted();
                delete d[k];
            });
        return Rx.Observable.fromArray(s);
    })
    .groupByUntil(
        function (s) { return s.keys()[0]; },
        null,
        function (g) { return d[g.key] = new Rx.AsyncSubject(); });
于 2014-02-19T00:44:57.433 に答える