3

大きな Observable グラフ (つまりmergegroupByjoinなどを使用して何度も構成された Observable ) があり、例外がスローされた場合、例外の発生元を特定するのが難しい場合があります。ソース ファイルのどこで Observable オペレータが呼び出されたかを確認できるかどうかを知りたいです。例はこれをより明確にするはずです。

たとえば、次IllegalStateException: Only one subscriber allowed!のスタック トレースが与えられた場合ソース ファイルのどの行番号、などが呼び出されたかを調べることができるかどうかを知りたいです。operatorMergeoperatorFilteroperatorGroupByデバッガー、print ステートメントなどを使用して、何らかの方法でこれを行うことは可能ですか?

java.lang.IllegalStateException: Only one subscriber allowed!
        at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilS
ubscriber.java:124)
        at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilS
ubscriber.java:81)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable.unsafeSubscribe(Observable.java:7531)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2.call(OperatorGroupBy.
java:251)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2.call(OperatorGroupBy.
java:236)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable$1.call(Observable.java:144)
        at rx.Observable$1.call(Observable.java:136)
        at rx.Observable.unsafeSubscribe(Observable.java:7531)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMer
ge.java:215)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:1
85)
        at rx.internal.operators.**OperatorMerge**$MergeSubscriber.onNext(OperatorMerge.java:1
20)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.SingleDelayedProducer.emit(SingleDelayedProducer.java:80)

        at rx.internal.operators.SingleDelayedProducer.set(SingleDelayedProducer.java:63)
        at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservab
leList.java:93)
        at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:44)
        at rx.internal.operators.**OperatorFilter**$1.onCompleted(OperatorFilter.java:42)
        at rx.internal.operators.OperatorTakeUntilPredicate$ParentSubscriber.onNext(Operat
orTakeUntilPredicate.java:54)
        at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
        at rx.internal.operators.**OperatorGroupBy**$GroupBySubscriber$2$2.onNext(OperatorGrou
pBy.java:286)
        at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:1
81)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupB
y.java:340)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.
java:226)
        at rx.internal.operators.OnSubscribeRefCount$2.onNext(OnSubscribeRefCount.java:124
)
        at rx.internal.operators.OperatorPublish$PublishSubscriber.dispatch(OperatorPublis
h.java:560)
        at rx.internal.operators.**OperatorPublish**$PublishSubscriber.onNext(OperatorPublish.
java:258)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676
)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:5
86)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676
)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:5
86)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676
)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:5
86)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscr
ibeFromIterable.java:98)
        at rx.Subscriber.setProducer(Subscriber.java:177)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java
:50)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java
:33)
        at rx.Observable.unsafeSubscribe(Observable.java:7531)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMer
ge.java:215)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:1
85)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:1
20)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OnSubscribeRefCount$2.onNext(OnSubscribeRefCount.java:124
)
        at rx.internal.operators.OperatorPublish$PublishSubscriber.dispatch(OperatorPublis
h.java:560)
        at rx.internal.operators.OperatorPublish$PublishSubscriber.onNext(OperatorPublish.
java:258)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:112)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:676
)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:5
86)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$2$2.onNext(OperatorGrou
pBy.java:286)
        at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:1
81)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupB
y.java:340)
        at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.
java:226)
        at rx.lang.scala.Subscriber$$anon$3.onNext(Subscriber.scala:198)
...

この問題は基本的に、Observable の全体的なポイントは、実行時に a) コードを b) から分離することであるために発生します。しかし、プログラムをデバッグするのは悪夢です。上記の質問を繰り返しますが、各構成をソースコードの元の行まで追跡できるかどうかを知りたいです。

4

2 に答える 2

1

追加のデバッグ情報に関するいくつかの実験がありましたが、ライブラリ全体の実行速度が 100 倍遅くなり、放棄されました。

問題は、GroupedObservable にサブスクライブする groupBy に続く flatMap にある可能性が高く、サブスクライブできない flatMap にそれを戻すことです。GroupedObservable は 1 回しか消費できません。publish()or演算子のいずれかを使用し、replay()それに応じて関数ロジックを調整する必要があります。

于 2015-11-03T12:14:07.007 に答える