4

非同期で開始および終了するストリーム (オブザーバブル) を結合しようとしています。

-1----1----1----1---|->
     -2----2--|->
[ optional_zip(sum) ]
-1----3----3----1---|->

必要なもの: オーディオ ストリームを一緒に追加します。それらはオーディオの「チャンク」のストリームですが、ここでは整数で表現します。最初のクリップが再生されます。

-1----1----1----1---|->

そして、少し後に 2 番目のものが開始されます。

     -2----2--|->

それらを合計で結合した結果は次のようになります。

-1----3----3----1---|->

ただし、圧縮されたストリームのいずれかが終了すると、標準の zip は完了します。ストリームの 1 つが終了しても、この optional_zip を続行したいと考えています。Rx でこれを行う方法はありますか、または既存の Zip を変更して自分で実装する必要がありますか?

注: 私は RxPy を使用していますが、ここのコミュニティは小さく、Rx オペレーターは言語間で非常に普遍的であるように思われるため、rx-java および rx-js としてもタグ付けしました。

4

2 に答える 2

2

私はこの問題を 2 つの部分に分けて取り組みます。まず、 を取り、配列に「アクティブな」(つまり、完全ではない) オブザーバブルのみが含まれるObservable<Observable<T>>を生成するものが必要です。Observable<Observable<T>[]>新しい要素が外側のオブザーバブルに追加され、内側のオブザーバブルの 1 つが完了するたびに、適切なオブザーバブルを含む新しい配列が発行されます。これは基本的に、プライマリ ストリームの「スキャン」削減です。

それができるものを手に入れたら、 flatMapLatest と zip を使用して必要なものを取得できます。

最初の部分での私の基本的な試みは次のとおりです。

function active(ss$) {
    const activeStreams = new Rx.Subject();
    const elements = [];
    const subscriptions = [];

    ss$.subscribe(s => {
        var include = true;
        const subscription = s.subscribe(x => {}, x => {}, x => {
            include = false;
            const i = elements.indexOf(s);
            if (i > -1) {
                elements.splice(i, 1);
                activeStreams.onNext(elements.slice());
            }
        });

        if (include) {
            elements.push(s);
            subscriptions.push(subscription);
            activeStreams.onNext(elements.slice());
        }   
    });

    return Rx.Observable.using(        
        () => new Rx.Disposable(() => subscriptions.forEach(x => x.dispose())),
        () => activeStreams
    );
}

そこから、次のように圧縮して平らにします。

const zipped = active(c$).flatMapLatest(x =>
    x.length === 0 ? Rx.Observable.never()
  : x.length === 1 ? x[0]
  : Rx.Observable.zip(x, (...args) => args.reduce((a, c) => a + c))
);

アクティブなストリームが 0 の場合は何も生成されず、1 つのアクティブなストリームが独自の要素を生成し、2 つ以上のストリームがすべて一緒に圧縮される (これらはすべてマップ アプリケーションに反映されます) という仮定を立てました。

私の(確かにかなり限定された)テストでは、この組み合わせで、あなたが求めていた結果が得られました。

ところで、素晴らしい質問です。問題の最初の部分を解決するものは見たことがありません (私は決して Rx の専門家ではありませんが、既にこれを行っていることを誰かが知っている場合は、詳細を投稿してください)。

于 2016-03-10T02:50:12.507 に答える
1

だから私はあなたが必要とすることのほとんどを行うと思ういくつかのコードを動作させました。基本的に、私は のzipAndContinueように動作する関数を作成しましたzipが、基になるストリームの一部に発行するデータがある限り、アイテムを発行し続けます。この関数は、コールド オブザーバブルでのみ [簡単に] テストされています。

また、修正/拡張/編集は大歓迎です。

function zipAndContinue() {
    // Augment each observable so it ends with null
    const observables = Array.prototype.slice.call(arguments, 0).map(x => endWithNull(x));
    const combined$ = Rx.Observable.combineLatest(observables);

    // The first item from the combined stream is our first 'zipped' item
    const first$ = combined$.first();

    // We calculate subsequent 'zipped' item by only grabbing
    // the items from the buffer that have all of the required updated
    // items (remember, combineLatest emits each time any of the streams
    // updates).
    const subsequent$ = combined$
        .skip(1)
        .bufferWithCount(arguments.length)
        .flatMap(zipped)
        .filter(xs => !xs.every(x => x === null));

    // We return the concatenation of these two streams
    return first$.concat(subsequent$)
}

そして、使用されるユーティリティ関数は次のとおりです。

function endWithNull(observable) {
    return Rx.Observable.create(observer => {
        return observable.subscribe({
            onNext: x => observer.onNext(x),
            onError: x => observer.onError(x),
            onCompleted: () => {
                observer.onNext(null);
                observer.onCompleted();
            }
        })
    })
}

function zipped(xs) {
    const nonNullCounts = xs.map(xs => xs.filter(x => x !== null).length);

    // The number of streams that are still emitting
    const stillEmitting = Math.max.apply(null, nonNullCounts);

    if (stillEmitting === 0) {
        return Rx.Observable.empty();
    }

    // Skip any intermittent results
    return Rx.Observable.from(xs).skip(stillEmitting - 1);
}

使用例は次のとおりです。

const one$ = Rx.Observable.from([1, 2, 3, 4, 5, 6]);
const two$ = Rx.Observable.from(['one']);
const three$ = Rx.Observable.from(['a', 'b']);

zipAndContinue(one$, two$, three$)
    .subscribe(x => console.log(x));

// >> [ 1, 'one', 'a' ]
// >> [ 2, null, 'b' ]
// >> [ 3, null, null ]
// >> [ 4, null, null ]
// >> [ 5, null, null ]
// >> [ 6, null, null ]

そして、これが js-fiddle です ([実行] をクリックしてコンソールを開くことができます): https://jsfiddle.net/ptx4g6wd/

于 2016-03-09T23:46:03.357 に答える