0

RX(Reactive extensions) を使用してイベント アグリゲーターを実装しようとしていますが、使用しているコードはここにありますが、 subject.AddDisposableメソッドがありません。誰でも私を助けることができますか?おそらく古いバージョンなので、Rxの新しいバージョンではこのメソッドは削除されましたか? これが当てはまる場合、これを実装する正しい方法は何ですか?

 if (_observablesByTypeKey.ContainsKey(key))
            {
                Tuple<object, object> tuple = _observablesByTypeKey[key];
                stream = (IObservable<T>)tuple.Item2;
            }
            else
            {
                Type specificSubjectType = typeof(Subject<>).MakeGenericType(new[] { typeof(T) });
                var subject = (Subject<T>)Activator.CreateInstance(specificSubjectType, new object[] { });

                var removeEventStreamFromCache = Disposable.Create(
                    () =>
                        {
                            lock (_observablesByTypeKeyLock)
                            {
                                _observablesByTypeKey.Remove(key);
                            }
                        }
                    );

                stream = subject.AddDisposable(removeEventStreamFromCache).Publish().RefCount();

                var tuple = new Tuple<object, object>(subject, stream);
                _observablesByTypeKey.Add(key, tuple);
4

1 に答える 1

2

交換できます

stream = subject.AddDisposable(removeEventStreamFromCache).Publish().RefCount();

stream = Observable.Create(observer => 
  new CompositeDisposable(
    subject.Subscribe(observer),
    removeEventStreamFromCache));
stream.Publish().RefCount();
于 2012-04-24T23:53:03.130 に答える