一定数のメッセージを非同期で取得できると想像してみましょう (1 つのリクエストで N 要素を含む)。
func fetchMessages(max: UInt, from: Offset) -> SignalProducer<Message,NoError>
ここで、これを、前のストリームが完了したときにSignalProducer
遅延して呼び出すunbounded に変えたいと思います。fetchMessages
func stream(from: Offset) -> SignalProducer<Message, NoError> {
// challenge is to implement this function
}
機能する可能性はあるが、すべての範囲を事前に計算する必要がある最初のアイデアは、次のコードを一般化することです
func lazyFetchFrom(from: Offset) -> SignalProducer<Message,NoError> {
return SignalProducer<Message,NoError> { (observer, disposable) in
fetchMessages(from).start(observer)
}
}
let lazyStream =
fetchMessages(1000, from)
.concat(lazyFetchFrom(from + 1000))
.concat(lazyFetchFrom(from + 2000))
.... // could probably be done generically using a flatMap
ここで、さらに一歩進んで、前の値が消費されたら、lazyFetchFrom への次の呼び出しを評価したいと思います。それは可能ですか?
ありがとう
PS: 明確にするために、私の主な関心事は、消費者と比較して生産者があまりにも速く生産しないように、ある種の背圧を提供することです
編集:これは、バックプレッシャーを実装するための私の最新の試みです。ただし、信号を観察すると、背圧がなくなり、すべてがメモリにキューイングされます