5

オブザーバーがobserve_on(rxcpp::observe_on_new_thread())を使用している場合、すべてのオブザーバーon_completedが呼び出されるまで待機する適切な方法は何ですか:

例えば:

{
    Foo foo;
    auto generator = [&](rxcpp::subscriber<int> s)
    {
        s.on_next(1);
        // ...
        s.on_completed();
    };
    auto values = rxcpp::observable<>::create<int>(generator).publish();
    auto s1 = values.observe_on(rxcpp::observe_on_new_thread())
                    .subscribe([&](int) { slow_function(foo); }));

    auto lifetime = rxcpp::composite_subscription();
    lifetime.add([&](){ wrapper.log("unsubscribe");  });
    auto s2 = values.ref_count().as_blocking().subscribe(lifetime);

    // hope to call something here to wait for the completion of
    // s1's on_completed function
}

// the program usually crashes here when foo goes out of scope because 
// the slow_function(foo) is still working on foo.  I also noticed that
// s1's on_completed never got called.

私の質問は、いくつかの変数を設定してポーリングすることなく、s1 の on_completed が完了するまで待機する方法です。

observe_on() を使用する動機は、通常、値には複数のオブザーバーがあり、各オブザーバーを同時に実行したいからです。おそらく、同じ目標を達成するためのさまざまな方法があるでしょう。私はあなたのすべての提案を受け入れます.

4

1 に答える 1

4

2 つをマージすると、単一のブロッキング サブスクライブが両方の終了を待機できるようになります。

{
    Foo foo;
    auto generator = [&](rxcpp::subscriber<int> s)
    {
        s.on_next(1);
        s.on_next(2);
        // ...
        s.on_completed();
    };

    auto values = rxcpp::observable<>::create<int>(generator).publish();

    auto work = values.
        observe_on(rxcpp::observe_on_new_thread()).
        tap([&](int c) {
            slow_function(foo);
        }).
        finally([](){printf("s1 completed\n");}).
        as_dynamic();

    auto start = values.
        ref_count().
        finally([](){printf("s2 completed\n");}).
        as_dynamic();

    // wait for all to finish
    rxcpp::observable<>::from(work, start).
        merge(rxcpp::observe_on_new_thread()).
        as_blocking().subscribe();
}

いくつかのポイント。

マージが機能するには、ストリームが同じ型を返す必要があります。異なるタイプのストリームを組み合わせる場合は、代わりに Combine_latest を使用してください。

observable<>::from() 内のオブザーバブルの順序は重要です。開始ストリームには ref_count があるため、ジェネレーターを開始する前に次のマージが作業をサブスクライブするように、最後に呼び出す必要があります。

マージには、それを呼び出す 2 つのスレッドがあります。これには、スレッドセーフな調整を使用する必要があります。rxcpp は従量課金制です。デフォルトでは、オペレーターはすべての呼び出しが同じスレッドからのものであると想定します。複数のスレッドから呼び出しを取得するオペレーターには、オペレーターがスレッドセーフな状態管理と出力呼び出しを強制するために使用するスレッドセーフ調整を与える必要があります。

必要に応じて、同じコーディネーター インスタンスを両方に使用できます。

于 2015-09-03T07:16:59.543 に答える