2

私はrxcppを使用するのが初めてで、次のシナリオで何かを機能させようとしています:

別のソースからコマンドを取得するデータ ソースが 1 つあります。私が書いているコードは、これらのコマンドを rxcpp オブザーバブルに取得します。一定時間内にコマンドが受信されない場合、onNext の代わりにサブスクライバーの onError 関数が実行されるという特別な条件がありますが、タイムアウトは最初のコマンドを受信する前にのみ発生する可能性があります。最初のコマンドを受信した後、次のコマンドを受信するまでにどれだけ時間がかかっても、タイムアウトは発生しません。

私は次のようなものでこれを達成しようとしています:

auto timeout = rxcpp::observable<>::timer(std::chrono::steady_clock::now() + timeout,
                             rxcpp::observe_on_event_loop()).map([](int val) // Note, converts the value type of the timer observable and converts timeouts to errors
{
    std::cout << "TIMED OUT!" << std::endl;
    throw std::runtime_error("timeout");
    return command_type();
});
auto commands = timeout.amb(rxcpp::observe_on_event_loop(), createCommandSource(event_loop_scheduler, ...));

私が抱えている問題は、タイムアウトが発生するずっと前にコマンドが挿入されたとしても、コマンドが受信される前にタイムアウトが発生することです。1000 ミリ秒から 5000 ミリ秒までのタイムアウトを試しましたが、違いはありません。ただし、タイムアウト コードを削除すると、コマンドはすぐに受信されます。ただし、rxcpp でスケジューラを使用する方法を単に誤解している可能性が高いと思われるので、これをどのように達成できるか疑問に思っています。

4

2 に答える 2

2

簡単な createCommandSource を作成しました。これは私のために働いた:

#include "rxcpp/rx.hpp"
using namespace rxcpp;
using namespace rxcpp::sources;
using namespace rxcpp::util;

using namespace std;

struct command_type {};

int main()
{
    auto eventloop = rxcpp::observe_on_event_loop();
    auto createCommandSource = [=]() {
        return rxcpp::observable<>::interval(std::chrono::seconds(1), eventloop).map([](long) {return command_type(); });
    };
    auto timeout = rxcpp::observable<>::timer(eventloop.now() + std::chrono::seconds(2), eventloop).map([](long ) // Note, converts the value type of the timer observable and converts timeouts to errors
    {
        std::cout << "TIMED OUT!" << std::endl;
        throw std::runtime_error("timeout");
        return command_type();
    });
    auto commands = timeout.amb(eventloop, createCommandSource().take(5));

    commands
        .as_blocking().subscribe(
        [](command_type) {printf("command\n"); },
        [](std::exception_ptr) {printf("execption\n"); });

    std::this_thread::sleep_for(std::chrono::seconds(2));

    return 0;
}
于 2015-09-26T05:30:43.073 に答える