3

私はWebサイトのマルチスレッドスクレーパーに取り組んでおり、別の質問に従って、QueueUserWorkItem()でThreadPoolを使用することにしました。

一度にすべてをキューに入れずに、作業項目を継続的にキューに入れるにはどうすればよいですか?300kを超えるアイテム(userIDごとに1つ)をキューに入れる必要があり、それらをすべてキューに入れるためにループすると、メモリが不足します。

だから、私が欲しいのは:

// 1 = startUserID, 300000 = endUserID, 25 = MaxThreads  
Scraper webScraper = new Scraper(1, 300000, 25); 

webScraper.Start();  
// return immediately while webScraper runs in the background

この間、スレッドが使用可能になると、webScraperは300000個のworkItemをすべて継続的に追加します。

これが私がこれまでに持っているものです:

public class Scraper
    {
        private int MaxUserID { get; set; }
        private int MaxThreads { get; set; }
        private static int CurrentUserID { get; set; }
        private bool Running { get; set; }
        private Parser StatsParser = new Parser();


        public Scraper()
            : this(0, Int32.MaxValue, 25)
        {
        }

        public Scraper(int CurrentUserID, int MaxUserID, int MaxThreads)
        {
            this.CurrentUserID = CurrentUserID;
            this.MaxUserID = MaxUserID;
            this.MaxThreads = MaxThreads;
            this.Running = false;

            ThreadPool.SetMaxThreads(MaxThreads, MaxThreads);
        }

        public void Start()
        {
            int availableThreads;

            // Need to start a new thread to spawn the new WorkItems so Start() will return right away?
            while (Running)
            {

                // if (!CurrentUserID >= MaxUserID)
                // {
                //     while (availableThreads > 0)
                //     {
                //         ThreadPool.QueueUserWorkItem(new WaitCallBack(Process));
                //     }
                // }
                // else
                // { Running = false; }
            }
        }

        public void Stop()
        {
            Running = false;
        }

        public static void process(object state)
        {
             var userID = Interlocked.Increment(ref CurrentUserID);
             ... Fetch Stats for userID
        }
    }

これは正しいアプローチですか?

Start()が呼び出された後、バックグラウンドですべての作業項目を一度に作成するのではなく、作業項目の作成を処理するための正しい方向を誰かに教えてもらえますか?

4

5 に答える 5

2

これは、ワーク キューからワークを盗むワーク アイテムを少なくして実装したほうがよいでしょうか? 300,000 個の仕事があるからといって、それを行うために 300,000 人の労働者が必要だというわけではありません。少数のコアしかないことは明らかなので、これらの作業の一部しか並行して実行できないため、作業のチャンクをはるかに少ないワーカーに分配しないのはなぜでしょうか?

各作業にかかる時間がどれだけ一定であるかに応じて、すべてを各ワーカーに均等に分割するか、中央のキュー (ロックする必要があります) を用意して、各ワーカーが不足したときにいくつかの作業を取得できます。 .

編集:

Joe Duffy は、ここに Work Stealing Queue の作成に関するシリーズを持っているようです: http://www.bluebytesoftware.com/blog/2008/08/12/BuildingACustomThreadPoolSeriesPart2AWorkStealingQueue.aspx。また、.Net 4 の Threadpool が少し賢くなるようです。しかし、このシナリオには特に複雑なものは必要ないと思います。

于 2009-09-10T09:15:20.010 に答える
0

別のスレッドプールを使用できます。これが1つです:http://www.codeplex.com/smartthreadpool これにより、すべてのアイテムを一度にキューに入れることができます。作成するスレッドの最大数を割り当てることができます。1000個の作業項目があり、100個のスレッドを割り当てたとします。それはすぐに最初の100アイテムを取り、残りが待つ間それらを動かします。これらのアイテムの1つが完了し、スレッドが解放されるとすぐに、次のキューに入れられたアイテムが開始されます。すべての作業を管理しますが、スレッドとメモリを飽和させることはありません。また、.netスレッドプールのスレッドは使用しません。

于 2009-10-28T19:54:39.980 に答える
0

起動しているワーカーの数を管理し、キューをいっぱいに保つマスタープロセス制御クラスが必要なようです。

次に、2つのキューを操作できます。

  1. あなたがこする必要があるすべてのアイテムを保持するための1つ
  2. 仕事をする2番目

このマスター/ガバナーオブジェクトは、キュー#1のすべてのアイテムがなくなるまでループを維持し、使用可能なサイクルがある場合はキュー#2に追加し続けます。

于 2009-10-27T20:15:23.233 に答える
0

キューに入れられたアイテムのキューを作成するのはどうも適切ではないように思われるので、WorkItems 自体が終了した後に再度キューを作成するのはどうですか?

Start メソッドは、たとえば MaxThreads アイテムの 3 倍 (この例では 75) をキューに入れることができ、それが完了すると Process メソッド自体がキューに入れられます。そうすれば、Start メソッドはすぐに戻りますが、いくつかの作業項目を開始します。


    public class Scraper
    {
        private int MaxUserID { get; set; }
        private int MaxThreads { get; set; }
        private int currentUserID;
        private bool Running { get; set; }
        private Parser StatsParser = new Parser();

        private int Multiplier { get; set; }

        public Scraper()
            : this(0, Int32.MaxValue, 25)
        {
        }

        public Scraper(int currentUserID, int maxUserID, int maxThreads)
        {
            this.currentUserID = currentUserID;
            this.MaxUserID = maxUserID;
            this.MaxThreads = maxThreads;
            this.Running = false;

            ThreadPool.SetMaxThreads(maxThreads, maxThreads);
            Multiplier = 3;
        }

        public void Start()
        {
            Running = true;
            for (int i = 0; i < MaxThreads * Multiplier; i++)
            {
                ThreadPool.QueueUserWorkItem(Process);
            }
        }

        public void Stop()
        {
            Running = false;
        }

        public void Process(object state)
        {
            if (Running == false)
            {
                return;
            }
            if (currentUserID < MaxUserID)
            {
                Interlocked.Increment(ref currentUserID);
                //Parse stats for currentUserID
                ThreadPool.QueueUserWorkItem(Process);
            }
            else
            { Running = false; }
        }
    }

安全のために、Interlocked を使用して Running フラグを設定する必要があります。乗数をコンストラクターに渡すことができるプロパティにしました。これらの統計の解析にかかる時間に応じて、パフォーマンスを微調整するために調整できると確信しています。

于 2009-10-26T17:02:02.223 に答える