0

私は現在 IRC ボットを書いています。過剰なフラッドを避けたいので、X ミリ秒ごとに次のメッセージを送信するメッセージ キューを作成することにしましたが、失敗しました。43行目:

unset.Add((string)de.Key);

OutOfMemory 例外をスローします。私は何が間違っているのか全くわかりません。

おそらく、そのような (おそらく複雑な) キューイング方法の背後にある一般的な考え方についても説明する必要があります。

まず、メッセージのターゲットがキーとして機能するメインHashtable queuehtストアタイプです。ConcurrentQueue<string>ボットがハッシュテーブルを反復処理し、各キューから 1 つのメッセージを送信するようにします (キューが空になった場合はキーを削除します)。ハッシュテーブル自体を処理する適切な方法が思いつかなかったのでConcurrentQueue<string> queue、キューを空にするときにキーとその使用順序を格納する別のキュー を作成することにしました。

キューに数百のアイテムがあるという仮説的な状況を想定すると (これは可能かもしれません)、新しいリクエストはどれくらいの時間 (メッセージ間の組み込みの遅延と遅延) を知っているかによって遅延されるため、メソッド Add() を再構築しqueueます。のディープ コピーを作成しqueueht(そう願っています)、queueこの使い捨てコピーに基づいて新しいコピーを生成し、その過程でそれを取り除きます。

私はスレッド化、単純な配列よりも複雑なコレクション、および OOP の習慣/慣習をまったく経験していないため、私の一連の思考やコードはひどく間違っていると思います。説明で私の問題を解決していただければ幸いです。前もって感謝します!

編集: クラス全体を投稿します。

class SendQueue
{
    Hashtable queueht;
    ConcurrentQueue<string> queue;
    Timer tim;
    IRCBot host;
    public SendQueue(IRCBot host)
    {
        this.host = host;
        this.tim = new Timer();
        this.tim.Elapsed += new ElapsedEventHandler(this.SendNewMsg);
        this.queueht = new Hashtable();
        this.queue = new ConcurrentQueue<string>();
    }
    public void Add(string target, string msg)
    {
        try
        {
            this.queueht.Add(target, new ConcurrentQueue<string>());
        }
        finally
        {
            ((ConcurrentQueue<string>)this.queueht[target]).Enqueue(msg);
        }
        Hashtable ht = new Hashtable(queueht);
        List<string> unset = new List<string>();
        while (ht.Count > 0)
        {
            foreach (DictionaryEntry de in ht)
            {
                ConcurrentQueue<string> cq = (ConcurrentQueue<string>)de.Value;
                string res;
                if (cq.TryDequeue(out res))
                    this.queue.Enqueue((string)de.Key);
                else
                    unset.Add((string)de.Key);
            }
        }
        if (unset.Count > 0)
            foreach (string item in unset)
                ht.Remove(item);
    }
    private void SendNewMsg(object sender, ElapsedEventArgs e)
    {
        string target;
        if (queue.TryDequeue(out target))
        {
            string message;
            if (((ConcurrentQueue<string>)queueht[target]).TryDequeue(out message))
                this.host.Say(target, message);
        }
    }
}

EDIT2:while (ht.Count > 0)無期限に実行されることを認識しています。それは、次のように見えた以前のバージョンからの残りの部分です。

while (ht.Count > 0)
{
    foreach (DictionaryEntry de in ht)
    {
        ConcurrentQueue<string> cq = (ConcurrentQueue<string>)de.Value;
        string res;
        if (cq.TryDequeue(out res))
            this.queue.Enqueue((string)de.Key);
        else
            ht.Remove((string)de.Key);
    }
}

しかし、評価時にコレクションを変更することはできません (そして、私はそれが難しい方法であることがわかりました)。の条件を変更するのを忘れていましたwhile

私は自由にTheThingのソリューションを試しました。目的を果たしているように見えますが、メッセージは送信されません...最終的な形式は次のとおりです。

class User
{
    public User(string username)
    {
        this.Username = username;
        this.RequestQueue = new Queue<string>();
    }
    public User(string username, string message)
        : this(username)
    {
        this.RequestQueue.Enqueue(message);
    }
    public string Username { get; set; }
    public Queue<string> RequestQueue { get; private set; }
}
class SendQueue
{
    Timer tim;
    IRCBot host;
    public bool shouldRun = false;
    public Dictionary<string, User> Users;  //Dictionary of users currently being processed
    public ConcurrentQueue<User> UserQueue; //List of order for which users should be processed
    public SendQueue(IRCBot launcher)
    {
        this.Users = new Dictionary<string, User>();
        this.UserQueue = new ConcurrentQueue<User>();
        this.tim = new Timer(WorkerTick, null, Timeout.Infinite, 450);
        this.host = launcher;
    }
    public void Add(string username, string request)
    {
        lock (this.UserQueue) //For threadsafety
        {
            if (this.Users.ContainsKey(username))
            {
                //The user is in the user list. That means he has previously sent request that are awaiting to be processed.
                //As such, we can safely add his new message at the end of HIS request list.

                this.Users[username].RequestQueue.Enqueue(request); //Add users new message at the end of the list
                return;
            }
            //User is not in the user list. Means it's his first request. Create him in the user list and add his message
            var user = new User(username, request);
            this.Users.Add(username, user); //Create the user and his message
            this.UserQueue.Enqueue(user); //Add the user to the last of the precessing users.
        }
    }
    public void WorkerTick(object sender)
    {
        if (shouldRun)
        {
            //This tick runs every 400ms and processes next message to be sent.
            lock (this.UserQueue) //For threadsafety
            {
                User user;
                if (this.UserQueue.TryDequeue(out user))            //Pop the next user to be processed.
                {
                    string message = user.RequestQueue.Dequeue();   //Pop his request
                    this.host.Say(user.Username, message);
                    if (user.RequestQueue.Count > 0)                //If user has more messages waiting to be processed
                    {
                        this.UserQueue.Enqueue(user);               //Add him at the end of the userqueue
                    }
                    else
                    {
                        this.Users.Remove(user.Username);           //User has no more messages, we can safely remove him from the user list
                    }
                }
            }
        }
    }
}

に切り替えてみましたがConcurrentQueue、これも同様に機能するはずです(ただし、よりスレッドセーフな方法で、スレッドセーフについて何も知らないというわけではありません)。に切り替えてみましたSystem.Threading.Timerが、それも役に立ちません。私はずっと前にアイデアを使い果たしました。

編集: 完全でまったくのばかであるため、タイマーの開始時間を設定しませんでした。bool 部分を、タイマーの dueTime と interval を変更する Start() メソッドに変更すると、機能するようになりました。問題が解決しました。

4

3 に答える 3

0

これを試して:

class User
{
    public User(string username)
    {
        this.Username = username;
        this.RequestQueue = new Queue<string>();
    }

    private static readonly TimeSpan _minPostThreshold = new TimeSpan(0,0,5); //five seconds

    public void PostMessage(string message)
    {
        var lastMsgTime = _lastMessageTime;
        _lastMessageTime = DateTime.Now;
        if (lastMsgTime != default(DateTime))
        {
            if ((_lastMessageTime - lastMsgTime) < _minPostThreshold)
            {
                return;
            }
        }

        _requestQueue.Enqueue(message);     
    }

    public string NextMessage
    {
        get
        {
            if (!HasMessages)
            {
                return null;
            }

            return _requestQueue.Dequeue();
        }
    }

    public bool HasMessages
    {
        get{return _requestQueue.Count > 0;}
    }

    public string Username { get; set; }
    private Queue<string> _requestQueue { get; private set; }
    private DateTime _lastMessageTime;
}

class SendQueue
{
    Timer tim;
    IRCBot host;
    public bool shouldRun = false;
    public Dictionary<string, User> Users;  //Dictionary of users currently being processed
    private Queue<User> _postQueue = new Queue<User>();

    public SendQueue(IRCBot launcher)
    {
        this.Users = new Dictionary<string, User>();
        this.tim = new Timer(WorkerTick, null, Timeout.Infinite, 450);
        this.host = launcher;
    }

    public void Add(string username, string request)
    {
        User targetUser;
        lock (Users) //For threadsafety
        {
            if (!Users.TryGetValue(username, out targetUser))
            {
                //User is not in the user list. Means it's his first request. Create him in the user list and add his message
                targetUser = new User(username);
                Users.Add(username, targetUser); //Create the user and his message
            }

            targetUser.PostMessage(request);
        }

        lock(_postQueue)
        {
            _postQueue.Enqueue(targetUser);
        }
    }

    public void WorkerTick(object sender)
    {
        if (shouldRun)
        {
            User nextUser = null;

            lock(_postQueue)
            {
                if (_postQueue.Count > 0)
                {
                    nextUser = _PostQueue.Dequeue();
                }
            }

            if (nextUser != null)
            {                
                host.Say(nextUser.Username, nextUser.NextMessage);
            }
        }
    }
}

更新: 要件をよりよく理解した後に変更されました。

これにより、ユーザーごとのフラッド制御と全体的なスロットリングの両方が提供されます。また、はるかに簡単です。

これはオンザフライで書かれており、コンパイルさえされていないことに注意してください。おそらく User インスタンスの周りにスレッドの問題がいくつかあるので、考慮する必要がありますが、うまくいくはずです。

于 2012-08-03T19:04:29.033 に答える
0

私が見る限りwhile、一時ハッシュテーブルからアイテムを削除することは決してないので、そこから逃げることはありませんht。したがって、カウントは常に になります> 0

于 2012-07-30T12:30:23.990 に答える
0

私が最もよく理解していることから、ユーザーを順番にキューに入れ、それぞれのリクエストをキューに入れたいと考えています。

つまり、1000 リクエストのように 1 人のユーザーがリクエストした場合でも、他のユーザーは自分のリクエストを送信でき、ボットは各ユーザーから 1 つのリクエストを FIFO 方式で処理します。

もしそうなら、あなたが必要とするのは、この機能に似た方法です:

class User
{
    public User(string username)
    {
        this.Username = username;
        this.RequestQueue = new Queue<string>();
    }

    public User(string username, string message)
        : this(username)
    {
        this.RequestQueue.Enqueue(message);
    }

    public string Username { get; set; }
    public Queue<string> RequestQueue { get; private set; }
}


///......................

public class MyClass
{
    public MyClass()
    {
        this.Users = new Dictionary<string, User>();
        this.UserQueue = new Queue<User>();
    }

    public Dictionary<string, User> Users; //Dictionary of users currently being processed
    public Queue<User> UserQueue; //List of order for which users should be processed

    public void OnMessageRecievedFromIrcChannel(string username, string request)
    {
        lock (this.UserQueue) //For threadsafety
        {
            if (this.Users.ContainsKey(username))
            {
                //The user is in the user list. That means he has previously sent request that are awaiting to be processed.
                //As such, we can safely add his new message at the end of HIS request list.

                this.Users[username].RequestQueue.Enqueue(request); //Add users new message at the end of the list
                return;
            }

            //User is not in the user list. Means it's his first request. Create him in the user list and add his message
            var user = new User(username, request);
            this.Users.Add(username, user); //Create the user and his message
            this.UserQueue.Enqueue(user); //Add the user to the last of the precessing users.
        }
    }

    //**********************************

    public void WorkerTick()
    {
        //This tick runs every 400ms and processes next message to be sent.
        lock (this.UserQueue) //For threadsafety
        {
            var user = this.UserQueue.Dequeue(); //Pop the next user to be processed.
            var message = user.RequestQueue.Dequeue(); //Pop his request

            /////PROCESSING MESSAGE GOES HERE

            if (user.RequestQueue.Count > 0) //If user has more messages waiting to be processed
            {
                this.UserQueue.Enqueue(user); //Add him at the end of the userqueue
            }
            else
            {
                this.Users.Remove(user.Username); //User has no more messages, we can safely remove him from the user list
            }
        }
    }
}

基本的に、ユーザーのキューがあります。次のユーザーをポップし、最初のリクエストを処理し、処理待ちのリクエストがさらにある場合は、ユーザー リストの最後に追加します。

これにより、いくつかの機能がクリアされることを願っています。記録として、上記のコードは機能コードというより疑似コードです xD

于 2012-07-30T12:09:15.917 に答える