3

2 つのイベント ストリームがあります。1 つはインダクタンス ループからのもので、もう 1 つは IP カメラです。車がループを越えてカメラに衝突します。イベントが互いに N ミリ秒以内にある場合 (車は常に最初にループに入る)、それらを結合したいのですが、各ストリームからの一致しないイベント (いずれかのハードウェアが故障する可能性があります) もすべて単一のストリームにマージしたいと考えています。このようなもの:

           ---> (only unmatched a's, None)
         /                                  \
stream_a (loop)                              \
         \                                    \
            --> (a, b) ---------------------------> (Maybe a, Maybe b)
         /                                    /
stream_b  (camera)                           /
         \                                  /
            --> (None, only unmatched b's)

これで確かに、古き良き Subject アンチパターンを実行することで、ハッキングすることができます。

unmatched_a = Subject()

def noop():
    pass

pending_as = [[]]

def handle_unmatched(a):
    if a in pending_as[0]:
        pending_as[0].remove(a)
        print("unmatched a!")
        unmatched_a.on_next((a, None))

def handle_a(a):
    pending_as[0].append(a)
    t = threading.Timer(some_timeout, handle_unmatched)
    t.start()
    return a

def handle_b(b):
    if len(pending_as[0]):
        a = pending_as[0].pop(0)
        return (a, b)

    else:
        print("unmatched b!")
        return (None, b)

stream_a.map(handle_a).subscribe(noop)
stream_b.map(handle_b).merge(unmatched_a).subscribe(print)

これはかなりハックであるだけでなく、私は観察していませんが、保留中のキューをthreading.Timer. 過剰な rx 演算子を考えると、それらのいくつかの組み合わせで を使用せずにこれを実行できると確信していますがSubject、それを理解することはできません。これをどのように達成しますか?

編集

組織上および運用上の理由から、私は Python に固執することを好みますが、JavaScript rxjs の回答を取得し、それを移植するか、ノード内のスクリプト全体を書き直すことさえできます。

4

2 に答える 2

2

auditTimeと を使用して問題を解決できるはずですbuffer。このような:

function matchWithinTime(a$, b$, N) {
  const merged$ = Rx.Observable.merge(a$, b$);
  // Use auditTime to compose a closing notifier for the buffer.
  const audited$ = merged$.auditTime(N);
  // Buffer emissions within an audit and filter out empty buffers.
  return merged$
    .buffer(audited$)
    .filter(x => x.length > 0);
}

const a$ = new Rx.Subject();
const b$ = new Rx.Subject();
matchWithinTime(a$, b$, 50).subscribe(x => console.log(JSON.stringify(x)));

setTimeout(() => a$.next("a"), 0);
setTimeout(() => b$.next("b"), 0);
setTimeout(() => a$.next("a"), 100);
setTimeout(() => b$.next("b"), 125);
setTimeout(() => a$.next("a"), 200);
setTimeout(() => b$.next("b"), 275);
setTimeout(() => a$.next("a"), 400);
setTimeout(() => b$.next("b"), 425);
setTimeout(() => a$.next("a"), 500);
setTimeout(() => b$.next("b"), 575);
setTimeout(() => b$.next("b"), 700);
setTimeout(() => b$.next("a"), 800);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

b値の直後に値が続く可能性がaあり、それらを一致させたくない場合は、次のように、より具体的な監査を使用できます。

const audited$ = merged$.audit(x => x === "a" ?
  // If an `a` was received, audit upcoming values for `N` milliseconds.
  Rx.Observable.timer(N) :
  // If a `b` was received, don't audit the upcoming values.
  Rx.Observable.of(0, Rx.Scheduler.asap)
);
于 2018-05-18T00:22:50.127 に答える