0

私のユースケースは、期待されるイベントが X 時間後にリアルタイムで受信されなかったエンティティを特定することです。

例: 時間 T に PaymentInitiated イベントを受信したが、T+X までに PaymentFailed / PaymentAborted / PaymentSucedded のいずれも受信しなかった場合、PaymentInitiated イベントの詳細と共に PaymentStuck というトリガーを発生させます。

一定の時間間隔ではなく、各イベントの期間 X をローリングするため、Apache Samza でそのようなユースケースをモデル化するにはどうすればよいでしょうか。

ありがとう、ハリッシュ

4

1 に答える 1

0

Samza でこれがネイティブにサポートされているかどうかはわかりませんが、WindowableTask を使用する回避策は想像できます。

public class PaymentEvent implements Comparable<PaymentEvent> {
    // if current time > timestamp, payment is stuck
    public long timestamp; 
    // we want a corresponding PaymentFailed... event with the same id
    public long interactionId; 
    // PaymentRequest, PaymentAborted, PaymentSucceeded...
    public enum type;
    ...

    @Override
    public int compareTo(PaymentEvent o){
        return timestamp - o.timestamp;
    }
}

あなたのプロセスメソッドでは、次のようなものになります:

PriorityQueue<PaymentEvent> pqueue;
Map<Long, PaymentEvent> responses;

public void process(...) {
    PaymentEvent e = new PaymentEvent(envelope.getMessage());
    if (e.enum == PAYMENT_REQUEST) {
        pqueue.add(e);
    } else {
        responses.put(e.interactionId, e);
    }
}

そして最後に、ウィンドウ中にすべての優先キューからポップオフtimestamp > current timeし、マップに対応するイベントがあるかどうかを確認します。

public void window(...) {
    while(pqueue.peek().timestamp <= currentTime) {
        if (!map.containsKey(pqueue.poll().interactionId) {
            // send the trigger via the collector
        } 
    }
}

最後に、構成でウィンドウ時間をポーリングしたい時間に設定します。構成はtask.window.ms.

于 2015-10-21T22:16:15.373 に答える