10

期間deltaT中に許可されるイベント数 nを制限する必要があります。私が考えることができる任意のアプローチ、スペースはO(m)です。ここで、mはdeltaTごとに送信されるイベント要求の最大数、またはO(deltaT/r)です。ここで、rは許容される解像度です。

編集: deltaT は、タイムスタンプに相対的なスライド時間ウィンドウです。

例: イベントのタイムスタンプの循環バッファーを保持します。イベント時にt-deltaTより前のすべてのタイムスタンプを切り取ります。タイムスタンプの数がnを超える場合、イベントを拒否します。タイムスタンプをバッファに追加します。

または、解像度rの現在に対する時間でインデックス付けされたサイズdeltaT/rの整数の循環バケット バッファを初期化します。ポインタiを維持します。イベントが発生すると、最後のイベントからの時間iをrで割った値だけ増やします。元のiと新しいiの間のバッファーをゼロにします。iでインクリメントします。盗聴者の合計がnを超える場合は拒否します。

より良い方法は何ですか?


上記の 2 番目の提案を 1 秒の固定デルタ Tと 10 ミリ秒の固定解像度でc# に実装しました。

public class EventCap
{
    private const int RES = 10; //resolution in ms

    private int _max;
    private readonly int[] _tsBuffer;
    private int p = 0;
    private DateTime? _lastEventTime;
    private int _length = 1000 / RES;

    public EventCap(int max)
    {
        _max = max;

        _tsBuffer = new int[_length];
    }

    public EventCap()
    {
    }

    public bool Request(DateTime timeStamp)
    {
        if (_max <= 0)
            return true;

        if (!_lastEventTime.HasValue)
        {
            _lastEventTime = timeStamp;
            _tsBuffer[0] = 1;
            return true;
        }

        //A
        //Mutually redundant with B
        if (timeStamp - _lastEventTime >= TimeSpan.FromSeconds(1))
        {
            _lastEventTime = timeStamp;
            Array.Clear(_tsBuffer, 0, _length);
            _tsBuffer[0] = 1;
            p = 0;
            return true;
        }

        var newP = (timeStamp - _lastEventTime.Value).Milliseconds / RES + p;

        if (newP < _length)
            Array.Clear(_tsBuffer, p + 1, newP - p);

        else if (newP > p + _length)
        {
            //B
            //Mutually redundant with A
            Array.Clear(_tsBuffer, 0, _length);
        }
        else
        {
            Array.Clear(_tsBuffer, p + 1, _length - p - 1);
            Array.Clear(_tsBuffer, 0, newP % _length);
        }

        p = newP % _length;
        _tsBuffer[p]++;
        _lastEventTime = timeStamp;

        var sum = _tsBuffer.Sum();

        return sum <= 10;
    }
}
4

4 に答える 4

14

これらの変数を持つことについてはどうですか: num_events_allowed、time_before、time_now、time_passed

初期化時に次のことを行います。time_before = system.timer(), num_events_allowed = n

イベントを受信したら、次のことを行います。

  time_now = system.timer()
  time_passed = time_now - time_before
  time_before = time_now

  num_events_allowed += time_passed * (n / deltaT);

  if num_events_allowed > n 
      num_events_allowed = n

  if num_events_allowed >= 1
      let event through, num_events_allowed -= 1
  else
      ignore event

このアルゴリズムの優れている点は、num_events_allowed が、最後のイベントから経過した時間とイベントを受信できるレートによって実際にインクリメントされることです。これにより、time_passed ごとに送信できるイベント数のインクリメントを順番に取得できます。 n の限界にとどまる。したがって、イベントを取得するのが早すぎる場合は 1 未満ずつ増加し、時間がかかりすぎる場合は 1 より多く増加します。もちろん、イベントが通過した場合は、イベントを取得したばかりなので、許容値を 1 減らします。許可が n である最大イベントを通過する場合、任意の時間フェーズで n を超えることを許可できないため、許可を n に戻します。許可が 1 未満の場合、イベント全体を送信することはできません。通過させないでください。

これはリーキー バケット アルゴリズムです: https://en.wikipedia.org/wiki/Leaky_bucket

于 2012-09-21T14:02:24.973 に答える
3

スライディング ウィンドウを保持し、受信リクエストごとに O(1) + 非常に小さい O(n) を保持する 1 つの方法は、適切なサイズの int の配列を作成し、それを循環バッファーとして保持し、受信リクエストを離散化することです (リクエストはA/D コンバーターのようにサンプリングされたレベルと統合されるか、統計学者の場合はヒストグラムとして統合されます)、このように循環バッファーの合計を追跡します。

assumptions: 
"there can be no more than 1000 request per minute" and 
"we discretize on every second"

int[] buffer = new zeroed int-array with 60 zeroes
int request-integrator = 0 (transactional)
int discretizer-integrator = 0 (transactional)

for each request:
    check if request-integrator < 1000 then
         // the following incs could be placed outside 
         // the if statement for saturation on to many
         // requests (punishment)
         request-integrator++                     // important
         discretizer-integrator++
         proceed with request

once every second:                    // in a transactional memory transaction, for God's saké 
    buffer-index++
    if (buffer-index = 60) then buffer-index=0    // for that circular buffer feeling!
    request-integrator -= buffer[buffer-index]    // clean for things happening one minute ago
    buffer[buffer-index] = discretizer-integrator // save this times value
    discretizer-integrator = 0                    // resetting for next sampling period

リクエスト インテグレータの増加は 1 秒に 1 回だけ "可能" であることに注意してください。ただし、1 秒間に 1000 件以上のリクエストで飽和させる穴が開いたままになることに注意してください。

于 2012-09-26T22:37:39.227 に答える
2

問題に対するさまざまな可能な解決策について読みながら。トークン バケット アルゴリズム ( http://en.wikipedia.org/wiki/Token_bucket ) に出会いました。私があなたの質問を完全に理解していれば、実際に N 個のトークンを持つバケットを持たずにトークン バケット アルゴリズムを実装できます。代わりに、それに応じて増減できるカウンターを使用します。お気に入り

syncronized def get_token = 
    if count>0 
       { --count, return true }
    else return false

syncronized def add_token = 
    if count==N
       return;
    else ++count

残りの部分は、add_token を deltaT/r 時間で繰り返し呼び出すことです。

完全にスレッドセーフにするには、カウントするアトミック参照が必要です。しかし、上記のコードは、O(1) メモリでそれを行う基本的な考え方を示すためのものです。

于 2012-09-21T14:19:56.463 に答える
1

関数呼び出しの頻度を制限するために、以下のクラス (ActionQueue) を作成しました。良い点の 1 つは、タイマーを使用してキューから項目をポップすることです...そのため、CPU は最小限に使用されます (キューが空の場合はまったく使用されません)... ポーリング タイプの手法とは対照的です.

例...

    // limit to two call every five seconds
    ActionQueue _actionQueue = new ActionQueue(TimeSpan.FromSeconds(5), 2);
    public void Test()
    {
        for (var i = 0; i < 10; i++)
        {
            _actionQueue.Enqueue((i2) =>
            {
                Console.WriteLineAction " + i2 + ": " + DateTime.UtcNow);
            }, i);
        }
    }

実世界の例...

    ActionQueue _actionQueue = new ActionQueue(TimeSpan.FromSeconds(1), 10);

    public override void SendOrderCancelRequest(Order order, SessionID sessionID)
    {
        _actionQueue.Enqueue((state) =>
        {
            var parms = (Tuple<Order, SessionID>)state;
            base.SendOrderCancelRequest(parms.Item1, parms.Item2);
        }, new Tuple<Order, SessionID>(order, sessionID));
    }
    public override void SendOrderMassStatusRequest(SessionID sessionID)
    {
        _actionQueue.Enqueue((state) =>
        {
            var sessionID2 = (SessionID)state;
            base.SendOrderMassStatusRequest(sessionID2);
        }, sessionID);
    }

実際のクラス...

public class ActionQueue
{
    private class ActionState
    {
        public Action<object> Action;
        public object State;
        public ActionState(Action<object> action, object state)
        {
            Action = action;
            State = state;
        }
    }
    Queue<ActionState> _actions = new Queue<ActionState>();
    Queue<DateTime> _times = new Queue<DateTime>();

    TimeSpan _timeSpan;
    int _maxActions;
    public ActionQueue(TimeSpan timeSpan, int maxActions)
    {
        _timeSpan = timeSpan;
        _maxActions = maxActions;           
    }
    public void Enqueue(Action<object> action, object state)
    {
        lock (_times)
        {
            _times.Enqueue(DateTime.UtcNow + _timeSpan);

            if (_times.Count <= _maxActions)
                action(state);
            else
                _actions.Enqueue(new ActionState(action, state));

            CreateDequeueTimerIfNeeded();
        }
    }

    System.Threading.Timer _dequeueTimer;
    protected void CreateDequeueTimerIfNeeded()
    {
        // if we have no timer and we do have times, create a timer
        if (_dequeueTimer == null && _times.Count > 0) 
        {
            var timeSpan = _times.Peek() - DateTime.UtcNow;
            if (timeSpan.TotalSeconds <= 0)
            {
                HandleTimesQueueChange();
            }
            else
            {
                _dequeueTimer = new System.Threading.Timer((obj) =>
                {
                    lock (_times)
                    {
                        _dequeueTimer = null;
                        HandleTimesQueueChange();
                    }
                }, null, timeSpan, System.Threading.Timeout.InfiniteTimeSpan);
            }
        }
    }

    private void HandleTimesQueueChange()
    {
        _times.Dequeue();
        while (_times.Count > 0 && _times.Peek() < DateTime.UtcNow)
            _times.Dequeue();

        while (_actions.Count > 0 && _times.Count < _maxActions)
        {
            _times.Enqueue(DateTime.UtcNow + _timeSpan);
            var actionState = _actions.Dequeue();
            actionState.Action(actionState.State);
        }

        CreateDequeueTimerIfNeeded();
    }
}
于 2016-12-28T22:50:03.553 に答える