2 つのイベント ストリームがあります。1 つはインダクタンス ループからのもので、もう 1 つは IP カメラです。車がループを越えてカメラに衝突します。イベントが互いに N ミリ秒以内にある場合 (車は常に最初にループに入る)、それらを結合したいのですが、各ストリームからの一致しないイベント (いずれかのハードウェアが故障する可能性があります) もすべて単一のストリームにマージしたいと考えています。このようなもの:
---> (only unmatched a's, None)
/ \
stream_a (loop) \
\ \
--> (a, b) ---------------------------> (Maybe a, Maybe b)
/ /
stream_b (camera) /
\ /
--> (None, only unmatched b's)
これで確かに、古き良き Subject アンチパターンを実行することで、ハッキングすることができます。
unmatched_a = Subject()
def noop():
pass
pending_as = [[]]
def handle_unmatched(a):
if a in pending_as[0]:
pending_as[0].remove(a)
print("unmatched a!")
unmatched_a.on_next((a, None))
def handle_a(a):
pending_as[0].append(a)
t = threading.Timer(some_timeout, handle_unmatched)
t.start()
return a
def handle_b(b):
if len(pending_as[0]):
a = pending_as[0].pop(0)
return (a, b)
else:
print("unmatched b!")
return (None, b)
stream_a.map(handle_a).subscribe(noop)
stream_b.map(handle_b).merge(unmatched_a).subscribe(print)
これはかなりハックであるだけでなく、私は観察していませんが、保留中のキューをthreading.Timer
. 過剰な rx 演算子を考えると、それらのいくつかの組み合わせで を使用せずにこれを実行できると確信していますがSubject
、それを理解することはできません。これをどのように達成しますか?
編集
組織上および運用上の理由から、私は Python に固執することを好みますが、JavaScript rxjs の回答を取得し、それを移植するか、ノード内のスクリプト全体を書き直すことさえできます。