5

Web サービス リクエストをオンライン API に送信する必要があり、Parallel Extensions が私のニーズに適していると考えました。

問題の Web サービスは繰り返し呼び出されるように設計されていますが、1 秒間に一定の呼び出し回数を超えると料金が発生する仕組みになっています。私は明らかに料金を最小限に抑えたいと考えているため、次の要件に対応できる TaskScheduler を見た人がいるかどうか疑問に思っていました。

  1. タイムスパンごとにスケジュールされるタスクの数を制限します。リクエストの数がこの制限を超えた場合、タスクを破棄するか、ブロックする必要があると思いますか? (タスクのバックログを停止するため)
  2. 同じリクエストがスケジューラに既に実行されていて、まだ実行されていないかどうかを検出し、そうである場合は 2 番目のタスクをキューに入れず、代わりに最初のタスクを返します。

これらはタスク スケジューラが対処すべき責任の種類であると人々は感じているのでしょうか、それとも私が間違ったツリーを吠えているのでしょうか? 代替案がある場合は、提案を受け付けています。

4

4 に答える 4

8

私は、TPLデータフローがこれに対する良い解決策のように聞こえることに同意します。

処理を制限するために、TransformBlock実際にはデータを変換しないを作成できます。前のデータの直後に到着した場合は、データを遅延させるだけです。

static IPropagatorBlock<T, T> CreateDelayBlock<T>(TimeSpan delay)
{
    DateTime lastItem = DateTime.MinValue;
    return new TransformBlock<T, T>(
        async x =>
                {
                    var waitTime = lastItem + delay - DateTime.UtcNow;
                    if (waitTime > TimeSpan.Zero)
                        await Task.Delay(waitTime);

                    lastItem = DateTime.UtcNow;

                    return x;
                },
        new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}

次に、データ(たとえば、0から始まる整数)を生成するメソッドを作成します。

static async Task Producer(ITargetBlock<int> target)
{
    int i = 0;
    while (await target.SendAsync(i))
        i++;
}

非同期で書き込まれるため、ターゲットブロックが現在アイテムを処理できない場合は、待機します。

次に、コンシューマーメソッドを記述します。

static void Consumer(int i)
{
    Console.WriteLine(i);
}

そして最後に、それをすべてリンクして起動します。

var delayBlock = CreateDelayBlock<int>(TimeSpan.FromMilliseconds(500));

var consumerBlock = new ActionBlock<int>(
    (Action<int>)Consumer,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

delayBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true });

Task.WaitAll(Producer(delayBlock), consumerBlock.Completion);

ここでdelayBlockは、500ミリ秒ごとに最大1つのアイテムを受け入れ、Consumer()メソッドは複数回並行して実行できます。処理を終了するには、を呼び出しますdelayBlock.Complete()

#2ごとにキャッシュを追加する場合は、別のキャッシュを作成TransformBlockしてそこで作業を行い、他のブロックにリンクすることができます。

于 2012-03-21T17:06:34.417 に答える
3

正直なところ、私はより高いレベルの抽象化で作業し、これには TPL Dataflow API を使用します。唯一の問題は、デフォルトではブロックが「貪欲」であり、可能な限り高速に処理されるため、必要な速度でリクエストを調整するカスタム ブロックを作成する必要があることです。実装は次のようになります。

  1. 投稿先の論理ブロックである a から始めBufferBlock<T>ます。
  2. BufferBlock<T>を、1 秒あたりのリクエスト数とスロットリング ロジックの知識を持つカスタム ブロックにリンクします。
  3. カスタム ブロックを 2 から にリンクしますActionBlock<T>

#2 のカスタム ブロックを今すぐ書く時間はありませんが、後で確認して、まだ理解していない場合は実装を埋めようとします。

于 2012-03-20T22:31:14.690 に答える
2

RXはあまり使用していませんが、AFAICTのObservable.Windowメソッドで問題なく動作します。

http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.window(VS.103).aspx

要素を捨てるように見えるスロットルよりも適しているように思われますが、これはあなたが望むものではないと私は推測しています

于 2012-03-21T03:41:20.983 に答える
0

時間で調整する必要がある場合は、Quartz.netを確認してください。これにより、一貫したポーリングが容易になります。すべてのリクエストに関心がある場合は、何らかのキューイング メカニズムの使用を検討する必要があります。MSMQ はおそらく適切なソリューションですが、より大きくしてNServiceBusRabbitMQなどの ESB を使用したい場合は、多くの特定の実装があります。

アップデート:

その場合、CTP を利用できる場合は、TPL Dataflow が推奨されるソリューションです。調整された BufferBlock が解決策です。

この例は、Microsoft が提供するドキュメントからのものです。

// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> m_buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 });

// Producer
private static async void Producer()
{
    while(true)
    {
        await m_buffer.SendAsync(Produce());
    }
}

// Consumer
private static async Task Consumer()
{
    while(true)
    {
        Process(await m_buffer.ReceiveAsync());
    }
}

// Start the Producer and Consumer
private static async Task Run()
{
    await Task.WhenAll(Producer(), Consumer());
}

アップデート:

RX のObservable.Throttleを確認してください。

于 2012-03-20T22:03:30.410 に答える