1

直接 REST API または提供する WCF サービスのいずれかによって入力される 4 つの Azure キューがあります。

  1. これら 4 つのキューすべてを監視する 1 つのワーカー ロールが必要です。
  2. configからキュー名などを読み込んでprocessメソッド(キューからメッセージを読み込んで処理を行う)をスピンするマルチスレッドを使おうと考えています。

ワーカーの役割でこれを達成する方法について、誰かが例またはガイダンスを提供してくれませんか?

私はマルチスレッドが初めてなので、マルチスレッドなしで上記を達成できるかどうかはよくわかりません。

ありがとうございました

4

3 に答える 3

2

さまざまなタスクに対してさまざまなスレッドを起動できますが、スレッド化されていないアプローチも検討してください (メッセージをどう処理するかによってパフォーマンスが向上または低下する可能性があります)。

while (true)
{
    var msg = queue1.GetMessage();
    if (msg != null)
    {
        didSomething = true;
        // do something with it
        queue1.DeleteMessage(msg);
    }
    msg = queue2.GetMessage();
    if (msg != null)
    {
        didSomething = true;
        // do something with it
        queue2.DeleteMessage(msg);
    }
    // ...
    if (!didSomething) Thread.Sleep(TimeSpan.FromSeconds(1)); // so I don't enter a tight loop with nothing to do
}
于 2011-08-28T00:47:40.030 に答える
1

これは、あなたが要求していることを正確に実行するための現在の実装ですが、より良い方法で (またはそう考えています)。とは言っても、このコードにはまだかなりのクリーンアップが必要です。ただし、これはこれの機能バージョン 0.1 です。

public class WorkerRole : RoleEntryPoint
{
    public override void Run()
    {
        var logic = new WorkerAgent();
        logic.Go(false);
    }

    public override bool OnStart()
    {
        // Initialize our Cloud Storage Configuration.
        AzureStorageObject.Initialize(AzureConfigurationLocation.AzureProjectConfiguration);

        return base.OnStart();
    }
}

public class WorkerAgent
{
    private const int _resistance_to_scaling_larger_queues = 9;
    private Dictionary<Type, int> _queueWeights = new Dictionary<Type, int>
                                                       {
                                                           {typeof (Queue1.Processor), 1},
                                                           {typeof (Queue2.Processor), 1},
                                                           {typeof (Queue3.Processor), 1},
                                                           {typeof (Queue4.Processor), 1},
                                                       };

    private readonly TimeSpan _minDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MinDelay")));
    private readonly TimeSpan _maxDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MaxDelay")));
    protected TimeSpan CurrentDelay { get; set; }

    public Func<string> GetSpecificQueueTypeToProcess { get; set; }

    /// <summary>
    /// This is a superset collection of all Queues that this WorkerAgent knows how to process, and the weight of focus it should receive.
    /// </summary>
    public Dictionary<Type, int> QueueWeights
    {
        get
        {
            return _queueWeights;
        }
        set
        {
            _queueWeights = value;
        }
    }

    public static TimeSpan QueueWeightCalibrationDelay
    {
        get { return TimeSpan.FromMinutes(15); }
    }


    protected Dictionary<Type, DateTime> QueueDelays = new Dictionary<Type, DateTime>();


    protected Dictionary<Type, AzureQueueMetaData> QueueMetaData { get; set; }

    public WorkerAgent(Func<string> getSpecificQueueTypeToProcess = null)
    {
        CurrentDelay = _minDelay;
        GetSpecificQueueTypeToProcess = getSpecificQueueTypeToProcess;
    }

    protected IProcessQueues CurrentProcessor { get; set; }

    /// <summary>
    /// Processes queue request(s).
    /// </summary>
    /// <param name="onlyProcessOnce">True to only process one time. False to process infinitely.</param>
    public void Go(bool onlyProcessOnce)
    {
        if (onlyProcessOnce)
        {
            ProcessOnce(false);
        }
        else
        {
            ProcessContinuously();
        }
    }

    public void ProcessContinuously()
    {
        while (true)
        {
            // temporary hack to get this started.
            ProcessOnce(true);
        }
    }

    /// <summary>
    /// Attempts to fetch and process a single queued request.
    /// </summary>
    public void ProcessOnce(bool shouldDelay)
    {
        PopulateQueueMetaData(QueueWeightCalibrationDelay);

        if (shouldDelay)
        {
            Thread.Sleep(CurrentDelay);
        }

        var typesToPickFrom = new List<Type>();
        foreach(var item in QueueWeights)
        {
            for (var i = 0; i < item.Value; i++)
            {
                typesToPickFrom.Add(item.Key);
            }
        }

        var randomIndex = (new Random()).Next()%typesToPickFrom.Count;
        var typeToTryAndProcess = typesToPickFrom[randomIndex];

        CurrentProcessor = ObjectFactory.GetInstance(typeToTryAndProcess) as IProcessQueues;
        CleanQueueDelays();

        if (CurrentProcessor != null && !QueueDelays.ContainsKey(typeToTryAndProcess))
        {
            var errors = CurrentProcessor.Go();

            var amountToDelay = CurrentProcessor.NumberProcessed == 0 && !errors.Any()
                               ? _maxDelay // the queue was empty
                               : _minDelay; // else

            QueueDelays[CurrentProcessor.GetType()] = DateTime.Now + amountToDelay;
        }
        else
        {
            ProcessOnce(true);
        }
    }

    /// <summary>
    /// This method populates/refreshes the QueueMetaData collection.
    /// </summary>
    /// <param name="queueMetaDataCacheLimit">Specifies the length of time to cache the MetaData before refreshing it.</param>
    private void PopulateQueueMetaData(TimeSpan queueMetaDataCacheLimit)
    {
        if (QueueMetaData == null)
        {
            QueueMetaData = new Dictionary<Type, AzureQueueMetaData>();
        }

        var queuesWithoutMetaData = QueueWeights.Keys.Except(QueueMetaData.Keys).ToList();
        var expiredQueueMetaData = QueueMetaData.Where(qmd => qmd.Value.TimeMetaDataWasPopulated < (DateTime.Now - queueMetaDataCacheLimit)).Select(qmd => qmd.Key).ToList();
        var validQueueData = QueueMetaData.Where(x => !expiredQueueMetaData.Contains(x.Key)).ToList();
        var results = new Dictionary<Type, AzureQueueMetaData>();

        foreach (var queueProcessorType in queuesWithoutMetaData)
        {
            if (!results.ContainsKey(queueProcessorType))
            {
                var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
                if (queueProcessor != null)
                {
                    var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
                    var metaData = queue.GetMetaData();
                    results.Add(queueProcessorType, metaData);

                    QueueWeights[queueProcessorType] = (metaData.ApproximateMessageCount) == 0
                                                  ? 1
                                                  : (int)Math.Log(metaData.ApproximateMessageCount, _resistance_to_scaling_larger_queues) + 1;
                }
            }
        }

        foreach (var queueProcessorType in expiredQueueMetaData)
        {
            if (!results.ContainsKey(queueProcessorType))
            {
                var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
                if (queueProcessor != null)
                {
                    var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
                    var metaData = queue.GetMetaData();
                    results.Add(queueProcessorType, metaData);
                }
            }
        }

        QueueMetaData = results.Union(validQueueData).ToDictionary(data => data.Key, data => data.Value);
    }

    private void CleanQueueDelays()
    {
        QueueDelays = QueueDelays.Except(QueueDelays.Where(x => x.Value < DateTime.Now)).ToDictionary(x => x.Key, x => x.Value);
    }
}

これにより、各キューの処理方法を認識し、IProcessQueues を実装する別のクラスができました。_queueWeights処理したいタイプごとにコレクションをロードします。_resistance_to_scaling_larger_queuesこれをどのようにスケーリングするかを制御する定数を設定します。これは対数的にスケーリングされることに注意してください (PopulateQueueMetaDataメソッドを参照)。アイテムが 0 であっても、重みが 1 未満のキューはありません。に設定PopulateQueueMetaDataした場合10、次に、大きさが 10 ずつ増加するたびに、そのタイプの「重み」が 1 ずつ増加します。たとえば、アイテムが 0 の QueueA、アイテムが 0 の QueueB、アイテムが 10 の QueueC がある場合、それぞれの重みは次のようになります。 1、1、および 2。これは、QueueC が次に処理される可能性が 50% であるのに対し、QueueA と QueueB はそれぞれ 25% の可能性しか処理されないことを意味します。QueueC に 100 個のアイテムがある場合、重みは 1、1、3 であり、処理される可能性は 20%、20%、60% です。これにより、空のキューが忘れられなくなります。

これが行うもう 1 つのことは、 と があること_minDelayです_maxDelay。このコードが、キューに少なくとも 1 つのアイテムがあると判断した場合、その_minDelay速度でできるだけ速く処理を続けます。_maxDelayただし、最後にアイテムが 0 だった場合、レートよりも速く処理することはできません。したがって、これは、乱数ジェネレーターが (重みに関係なく) アイテムが 0 のキューをプルアップした場合、その処理をスキップして次の反復に進むことを意味します。(ストレージ トランザクションの効率を向上させるために、この部分にいくつかの追加の最適化を加えることができますが、これはちょっとした追加です。)

ここにはいくつかのカスタム クラスがあります ( と などAzureQueue) AzureQueueMetaData- 1 つは基本的に のラッパーでCloudQueueあり、もう 1 つは Queue のおおよその数などの情報を格納します - そこには興味深いものはありません (コードを単純化する方法にすぎません)。 .

繰り返しますが、私はこのコードを「きれいな」コードとは呼びませんが、いくつかのかなり巧妙な概念がこのコードに実装され、機能しています。お好きな理由でご利用ください。:)

最後に、このコードをこのように書くと、1 つのプロジェクトでさらに多くのキューを処理できるようになります。これが単に追いついていないことがわかった場合は、より多くのインスタンスに簡単にスケールでき、すべてのキューにスケールアップできます。最小限のシナリオでは、これの 1 つのインスタンスをデプロイして 3 つのキューを監視できます。ただし、4 番目のキューがパフォーマンスに影響を与え始めた場合 (またはより高い可用性が必要な場合) は、これを最大 2 つのインスタンスに増やします。15 キューに達したら、3 分の 1 を追加します。25 キュー、4 番目のインスタンスを追加します。新しい顧客を獲得し、システム全体で多数のキュー リクエストを処理する必要がありますが、それで問題ありません。この 1 つのロールを最大 20 のインスタンスまでスピンして、完了するまでスピンダウンします。特に厄介なキューがありますか?キューから外れたコメント_queueWeightsコレクション、展開して残りのキューを管理し、コレクションからコメントアウトされたこのキューを除く他のすべてのキュー_queueWeightsで再度展開し、別のインスタンスのセットに再度展開してデバッグを行います。デバッグに干渉し、b) デバッグが他の QueueProcessor に干渉します。最終的に、これにより多くの柔軟性と効率が得られます。

于 2011-09-19T20:44:51.853 に答える
0

worker ロールの while ループ内で、マルチスレッド C# アプリケーションを記述しているかのように、4 つのスレッドを開始します。もちろん、4 つの異なるスレッド関数を定義する必要があり、それらの関数には、キューをポーリングするための個別の while ループが必要です。ワーカーの while ループの最後で、スレッドが終了するのを待ちます。

于 2011-08-28T05:40:40.720 に答える