Samza でこれがネイティブにサポートされているかどうかはわかりませんが、WindowableTask を使用する回避策は想像できます。
public class PaymentEvent implements Comparable<PaymentEvent> {
// if current time > timestamp, payment is stuck
public long timestamp;
// we want a corresponding PaymentFailed... event with the same id
public long interactionId;
// PaymentRequest, PaymentAborted, PaymentSucceeded...
public enum type;
...
@Override
public int compareTo(PaymentEvent o){
return timestamp - o.timestamp;
}
}
あなたのプロセスメソッドでは、次のようなものになります:
PriorityQueue<PaymentEvent> pqueue;
Map<Long, PaymentEvent> responses;
public void process(...) {
PaymentEvent e = new PaymentEvent(envelope.getMessage());
if (e.enum == PAYMENT_REQUEST) {
pqueue.add(e);
} else {
responses.put(e.interactionId, e);
}
}
そして最後に、ウィンドウ中にすべての優先キューからポップオフtimestamp > current time
し、マップに対応するイベントがあるかどうかを確認します。
public void window(...) {
while(pqueue.peek().timestamp <= currentTime) {
if (!map.containsKey(pqueue.poll().interactionId) {
// send the trigger via the collector
}
}
}
最後に、構成でウィンドウ時間をポーリングしたい時間に設定します。構成はtask.window.ms
.