0

Rxで最初の一歩を踏み出すと、ここで立ち往生しています。

public class DisposableResourceDemo : IDisposable
{
    public DisposableResourceDemo() {
        Console.WriteLine("DisposableResourceDemo constructor.");
    }

    public void Dispose() {
        Console.WriteLine("DisposableResourceDemo.Dispose()");
    }

    public void SideEffect() {
        Console.WriteLine("DisposableResourceDemo.SideEffect()");
    }
}

[Test]
public void ShowBehaviourOfRxUsing()
{
    var test = Observable.Using(() =>
        {
             // This should happen exactly once, independent of number of subscriptions,
             // object should be disposed on last subscription disposal or OnCompleted call 
                return new DisposableResourceDemo();
        },
        (dr) =>
        {
            return Observable.Create<string>(
                (IObserver<string> observer) =>
                {
                    dr.SideEffect();
                    var dummySource = Observable.Return<string>("Some Text");

                    return dummySource.Subscribe(observer);
                });
        }).Publish().RefCount();


    Console.WriteLine("before 1st subscription.");
    test.Subscribe(Console.WriteLine, () => Console.WriteLine("OnCompleted in 1st."));
    Console.WriteLine("before 2nd subscription.");
    test.Subscribe(Console.WriteLine, () => Console.WriteLine("OnCompleted in 2nd."));
}

驚いたことに、上記のコードは

before 1st subscription.
DisposableResourceDemo constructor.
DisposableResourceDemo.SideEffect()
Some Text
OnCompleted in 1st.
DisposableResourceDemo.Dispose()
before 2nd subscription.
--> [happy with missing "Some Text" here]
OnCompleted in 2nd.
--> [unhappy with second instantiation here] 
DisposableResourceDemo constructor.
DisposableResourceDemo.SideEffect()
DisposableResourceDemo.Dispose()

両方のサブスクリプションの後にConnect()を手動で呼び出すことは、ここで必要なことではないことに注意してください。ただし、出力は期待どおりです。

4

1 に答える 1

0

I am not totally sure what you are trying to achieve here. It seems that you want to share the observable sequence and its related resources. So the standard ways to do this is with the ConnectableObservable types that you get from .Replay() and .Publish() etc

You say you dont want to use .Connect() and instead you use .RefCount() which is very common. However, your sequence completes. You also are using the Extension method Subscribe(...) which will internally create an Auto detaching observer, i.e. when the sequence completes, it will disconnect.

So my question is, should the internal sequence actually complete? If the answer is yes, then why would the 2nd subscription get the OnComplete notification...it has happened already, it is in the past. Maybe you do want to replay the OnComplete, in which case maybe .Replay(1) is what you want. If the answer is no, then you can easily fix this by putting a Concat(Observable.Never<string>()) either before the .Publish() or after the Observable.Return.

于 2012-07-25T10:04:42.440 に答える