実際に必要なのは、ある種のピリオド バッファーです。これは、一定期間の値をバッファーに保持し、バッファーが容量に達したときにのみ送信を開始します (以下のコードは、takeLast オペレーターにインスパイアされています)。
extension SignalType {
func periodBuffer(period:Int) -> Signal<[Value], Error> {
return Signal { observer in
var buffer: [Value] = []
buffer.reserveCapacity(period)
return self.observe { event in
switch event {
case let .Next(value):
// To avoid exceeding the reserved capacity of the buffer, we remove then add.
// Remove elements until we have room to add one more.
while (buffer.count + 1) > period {
buffer.removeAtIndex(0)
}
buffer.append(value)
if buffer.count == period {
observer.sendNext(buffer)
}
case let .Failed(error):
observer.sendFailed(error)
case .Completed:
observer.sendCompleted()
case .Interrupted:
observer.sendInterrupted()
}
}
}
}
}
それに基づいて、必要なアルゴリズムにマップできます
let pipe = Signal<Int,NoError>.pipe()
pipe.0
.periodBuffer(3)
.map { Double($0.reduce(0, combine: +))/Double($0.count) } // simple moving average
.observeNext { print($0) }
pipe.1.sendNext(10) // does nothing
pipe.1.sendNext(11) // does nothing
pipe.1.sendNext(15) // prints 12
pipe.1.sendNext(7) // prints 11
pipe.1.sendNext(9) // prints 10.3333
pipe.1.sendNext(6) // prints 7.3333