Switch ステートメントを使用した Rx サブスクリプションに問題があります。
_performSearchSubject
.AsObservable()
.Select(_ => PerformQuery())
.Switch()
.ObserveOn(_synchronizationContextService.SynchronizationContext)
.Subscribe(DataArrivedForPositions, PositionQueryError, PositionQueryCompleted)
.DisposeWith(this);
フローは次のとおりです。
- 一部のプロパティが変更され、 performSearchSubject.OnNext が呼び出されます
- ヒットするたびにオブザーバーを返す PerformPositionQuery() が呼び出されます。
- このオブザーバーを介して応答するサービスは、データ受信が完了すると OnNext を 2 回呼び出し、OnCompleted を 1 回呼び出します。
- メソッド DataArrivedForPositions が予想どおり 2 回呼び出される
- データ サービス内でobserver.OnCompleted()が呼び出されますが、メソッドPositionQueryCompletedは呼び出されません。
dataService のコードは次のとおりです。
protected override void Request(Request request, IObserver<Response> observer)
{
query.Arrive += p => QueryReceive(request.RequestId, p, observer, query);
query.Error += (type, s, message) => QueryError(observer, message);
query.NoMoreData += id => QueryCompleted(observer);
query.Execute(request);
}
private void QueryError(IObserver<PositionSheetResponse> observer, string message)
{
observer.OnError(new Exception(message));
}
private void QueryCompleted(IObserver<PositionSheetResponse> observer)
{
observer.OnCompleted();
}
private void QueryReceive(Guid requestId, Qry0079Receive receiveData, IObserver<PositionSheetResponse> observer, IQry0079PositionSheet query)
{
observer.OnNext(ConvertToResponse(requestId, receiveData));
}