0

1つのサブジェクトを介してすべての異なるイベントストリームをプロキシする必要があります。

私はこのコードを思いついた:

var mySubject,
    getObservable;

getObservable = function (subject, eventName) {
    return subject
        .asObservable()
        .filter(function (x) {
            return x.EventName === eventName;
        })
        .flatMap(function (x) {
            if (x.Type === 'onNext') {
                return Rx.Observable.return(x.Data);
            }

            if (x.Type === 'onError') {
                return Rx.Observable.throw(x.Data);
            }

            return Rx.Observable.empty();
        });
};

mySubject = new Rx.Subject();

getObservable(mySubject, 'foo')
    .subscribe(function(x){ 
        console.log('foo onNext ' + x); 
    }, function(x){ 
        console.log('foo onError ' + x); 
    }, function(){ 
        console.log('foo onComplete');
    });

getObservable(mySubject, 'bar')
    .subscribe(function(x){ 
        console.log('bar onNext ' + x); 
    }, function(x){ 
        console.log('bar onError ' + x); 
    }, function(){ 
        console.log('bar onComplete');
    });

mySubject.onNext({Type: 'onNext', EventName: 'foo', Data: 5});
mySubject.onNext({Type: 'onCompleted', EventName: 'foo'});

mySubject.onNext({Type: 'onNext', EventName: 'bar', Data: 5});
mySubject.onNext({Type: 'onError', EventName: 'bar', Data: 'Error message'});

出力を得ました:

foo onNext 5

bar onNext 5
bar onError Error message

期待される出力:

foo onNext 5  
foo onCompleted

bar onNext 5
bar onError Error message

イベントのbar場合、それは魅力のように機能します。onNextエラーが発生するとすぐにonError関数が呼び出され、イベントストリームが終了します。しかし、私はそれをのために動作させることができませんonComplete

完全な通知が発生するたびに、それが呼び出されますが、サブスクライバーハンドラーが呼び出されるRx.Observable.empty()ことはありません。onComplete代わりに、onNextハンドラーを呼び出します。

4

2 に答える 2

1

この関数は、を介して送信されたイベントgetObservableをサブスクライブするオブザーバブルを返します。eventNamesubject

let getObservable = function (subject, eventName) {
    return Rx.Observable.create(function (observer) {
        subject
            .asObservable()
            .filter(function(x) {
                return x.EventName === eventName;
            })
            .map(function(x) {
                if (x.Type === 'onNext') {
                    observer.onNext(x.Data);
                }

                if (x.Type === 'onError') {
                    observer.onError(x.Data);
                }

                if (x.Type === 'onCompleted') {
                    observer.onCompleted();
                }

                return x;
            })
            .subscribe();
    });
};

これは、元の質問のデータを使用した実際の例です。

var mySubject,
    getObservable;

getObservable = function (subject, eventName) {
    return Rx.Observable.create(function (observer) {
        subject
            .asObservable()
            .filter(function(x) {
                return x.EventName === eventName;
            })
            .map(function(x) {
                if (x.Type === 'onNext') {
                    observer.onNext(x.Data);
                }

                if (x.Type === 'onError') {
                    observer.onError(x.Data);
                }
                
                if (x.Type === 'onCompleted') {
                    observer.onCompleted();
                }
                
                return x;
            })
            .subscribe();
    });
};

mySubject = new Rx.Subject();

getObservable(mySubject, 'foo')
    .subscribe(function(x){ 
        console.log('SomethingHappened onNext ' + x); 
    }, function(x){ 
        console.log('SomethingHappened onError ' + x); 
    }, function(){ 
        console.log('SomethingHappened onComplete');
    });


getObservable(mySubject, 'bar')
    .subscribe(function(x){ 
        console.log('DataUpdated onNext ' + x); 
    }, function(x){ 
        console.log('DataUpdated onError ' + x); 
    }, function(){ 
        console.log('DataUpdated onComplete');
    });

mySubject.onNext({Type: 'onNext', EventName: 'foo', Data: 5});
mySubject.onNext({Type: 'onCompleted', EventName: 'foo'});

mySubject.onNext({Type: 'onNext', EventName: 'bar', Data: 5});
mySubject.onNext({Type: 'onError', EventName: 'bar', Data: 'Error message'});
<script src='https://rawgit.com/Reactive-Extensions/RxJS/master/dist/rx.all.js'></script>

于 2012-03-06T10:49:43.803 に答える
0

.NET Observable.SelectManyは、Observable.Mergeを使用して、ストリームを1つの複合オブザーバブルにマージします。IMHO Observable.Mergeは、マージされたオブザーバブルのいずれかが完了した場合にのみ完了します。

例:http ://theburningmonk.com/2010/02/rx-framework-iobservable-merge/

それが問題の理由かもしれません。

于 2012-03-06T15:26:20.567 に答える