1

私は C# のスレッド化と同時実行コレクションを学び始めたばかりで、質問を投げかける適切な用語がわからないので、私がやろうとしていることを簡単に説明します。現時点では、この件に関する私の把握はせいぜい初歩的なものです。以下の私のアプローチは、私が想像したように実現可能ですか?

  1. テストが必要な Concurrent コレクションに 100,000 個の URL がありますが、リンクは正常ですか? 最初は空の別の同時コレクションがあり、非同期リクエストによって移動されたと判断された URL のサブセット (400、404 などのエラー) が含まれます。

  2. PC と帯域幅が許す限り、これらの非同期リクエストを同時に生成したいと考えており、1 秒あたり 20 個の async-web-request-tasks から始めて、そこから先に進んでいくつもりでした。

1 つの非同期タスクが両方を処理する場合、それは機能しますか? 4xx エラーが発生した場合、非同期要求を作成し、BadUrls コレクションに URL を追加しますか? そのタスクの新しいインスタンスが 50 ミリ秒ごとに生成されます。

     class TestArgs args {
        ConcurrentBag<UrlInfo> myCollection  { get; set; }
        System.Uri currentUrl  { get; set; }
     }

      ConcurrentQueue<UrlInfo> Urls = new ConncurrentQueue<UrlInfo>();
        // populate the Urls queue
        <snip>

     // initialize the bad urls collection  
      ConcurrentBag<UrlInfo> BadUrls = new ConcurrentBag<UrlInfo>();


      // timer fires every 50ms, whereupon a new args object is created
      //  and the timer callback spawns a new task; an autoEvent would
      // reset the timer and dispose of it when the queue was empty


       void SpawnNewUrlTask(){
           // if queue is empty then reset the timer
           // otherwise:
           TestArgs args = {            
               myCollection = BadUrls,              
                currentUrl = getNextUrl()  // take an item from the queue
           };
           Task.Factory.StartNew( asyncWebRequestAndConcurrentCollectionUpdater, args);
       }



       public async Task asyncWebRequestAndConcurrentCollectionUpdater(TestArgs args) 
       {
           //make the async web request 
           // add the url to the bad collection if appropriate.  
       } 

実現可能?途中?

4

2 に答える 2

3

アプローチはうまくいくようですが、あなたが示した特定のコードにはいくつかの問題があります。

しかし、私がそれに到達する前に、タスクの並列処理が進むべき道であるという提案がコメントにありました。それは見当違いだと思います。多くの作業を並行して実行したい場合、必然的に多くのスレッドが必要になるという一般的な誤解があります。これは、作業がコンピューティング バウンドである場合にのみ当てはまります。しかし、実行している作業は IO バウンドになります。このコードは、応答を待つ時間の大部分を費やします。それはほとんど計算をしません。したがって、実際には、1 つのスレッドしか使用していなかったとしても、最初の目標である 1 秒あたり 20 リクエストは、1 つの CPU コアに負担をかけるほどのワークロードには見えません。

つまり、1 つのスレッドで非常に高いレベルの同時 IO を処理できます。コードの並列実行が必要な場合にのみ複数のスレッドが必要ですが、この特定のジョブでは CPU の作業がほとんどないため、ここではそうではないようです。

(この誤解はawait、何年も前から存在していますasync。実際、それは TPL よりも前から存在しています。少数のスレッドで数千の同時要求を処理できます. Windows ネットワーク IO は基本的に同じように機能するため、基本的な原則は今日でも適用されます.)

ここで複数のスレッドを使用することに特に問題があるというわけではありませんが、それが少し気を散らすものであることを指摘しているだけです。

とにかく、コードに戻ります。この行には問題があります:

Task.Factory.StartNew( asyncWebRequestAndConcurrentCollectionUpdater, args);

すべてのコードを提供していただいたわけではありませんが、どのようにコンパイルできるかわかりません。StartNew2 つの引数を受け入れるのオーバーロードでは、最初の引数が an Action、 an Action<object>、 a Func<TResult>、または aのいずれかである必要がありFunc<object,TResult>ます。つまり、引数を取らないか、型の単一の引数を受け入れるobject(値を返す場合と返さない場合がある) メソッドである必要があります。あなたの「asyncWebRequestAndConcurrentCollectionUpdater」は type の引数を取りますTestArgs

しかし、コンパイルできないという事実は、主な問題ではありません。それは簡単に修正できます。(たとえば、 に変更しますTask.Factory.StartNew(() => asyncWebRequestAndConcurrentCollectionUpdater(args));) 本当の問題は、あなたがしていることは少し奇妙です: を使用Task.StartNewして、既に . を返すメソッドを呼び出していますTask

Task.StartNewは、同期メソッド (つまり、 a を返さないTaskメソッド) を受け取り、それをノンブロッキングで実行する便利な方法です。(スレッドプールで実行されます。)しかし、すでに を返すメソッドがあるTask場合は、実際に を使用する必要はありませんでしたTask.StartNew。何が返されるかを見ると、奇妙なことがより明らかになりTask.StartNewます (コンパイルエラーを修正した後):

Task<Task> t = Task.Factory.StartNew(
    () => asyncWebRequestAndConcurrentCollectionUpdater(args));

それTask<Task>は何が起こっているかを明らかにします。非非同期メソッドを非同期にするために通常使用されるメカニズムを使用して、既に非同期になっているメソッドをラップすることにしました。これで、Taskを生成する が得られましたTask

これの少し驚くべき結果の 1 つは、 によって返されたタスクがStartNew完了するのを待っていた場合、基になる作業が必ずしも完了していないということです。

t.Wait(); // doesn't wait for asyncWebRequestAndConcurrentCollectionUpdater to finish!

実際に行うことは、asyncWebRequestAndConcurrentCollectionUpdaterが返されるのを待つことだけですTask。そして、asyncWebRequestAndConcurrentCollectionUpdaterすでに非同期メソッドであるため、多かれ少なかれすぐにタスクを返します。(具体的には、awaitすぐには完了しないタスクを実行した瞬間にタスクを返します。)

開始した作業が完了するまで待ちたい場合は、次のようにする必要があります。

t.Result.Wait();

または、潜在的により効率的に、これ:

t.Unwrap().Wait();

Taskつまり、非同期メソッドが返した を取得してから、それを待ちます。これは、次のはるかに単純なコードとあまり変わらないかもしれません。

Task t = asyncWebRequestAndConcurrentCollectionUpdater("foo");
... maybe queue up some other tasks ...
t.Wait();

`Task.Factory.StartNew' を導入しても、何も役に立たなかったかもしれません。

私が「可能性がある」と言ったのは、重要な条件があるからです。それは、作業を開始する状況によって異なります。C# は、デフォルトで、asyncメソッドが の後に継続する場合に、が最初に実行さawaitれたのと同じコンテキストで継続することを保証しようとするコードを生成します。awaitたとえば、WPF アプリを使用awaitしていて UI スレッドを使用している場合、コードが続行されると、UI スレッドで実行するように調整されます。(これは で無効にできますConfigureAwait。)

したがって、コンテキストが本質的にシリアル化されている状況にある場合 (GUI アプリの場合のようにシングルスレッドであるため、または特定の ASP のコンテキストなど、レンタル モデルに似たものを使用するため) .NET リクエスト) を介して非同期タスクを開始するTask.Factory.StartNewと、元のコンテキストをエスケープできるため、実際に役立つ場合があります。ただし、あなたの人生をより困難にしただけです。タスクを完了まで追跡することは、やや複雑です。ConfigureAwaitまた、メソッド内で を使用するだけで、同じ効果を得ることができたかもしれませんasync

とにかく問題ではないかもしれません - 1 秒間に 20 リクエストしか処理しようとしていない場合、それを行うために必要な最小限の CPU 負荷は、おそらく 1 つのスレッドで完全に適切に処理できることを意味します。(また、これがコンソール アプリの場合、スレッド プールを使用する既定のコンテキストが有効になるため、どのような場合でもタスクをマルチスレッドで実行できます。)

しかし、あなたの質問に戻ると、単一のものを持つことは私にとって完全に合理的ですasyncキューから URL を選択し、要求を作成し、応答を調べ、必要に応じて不正な URL コレクションにエントリを追加するメソッド。また、タイマーから物事を開始することも合理的です。これにより、遅い応答で行き詰まることなく、接続が試行される速度が抑制されます (たとえば、大量の要求がオフラインのサーバーと通信しようとする場合)。何万もの URL が連続して応答しないサーバーを指しているという病理学的なケースに遭遇した場合は、処理中のリクエストの最大数に上限を設ける必要があるかもしれません。(関連する注意事項として、使用している HTTP API に関係なく、クライアントごとの接続制限に達しないことを確認する必要があります。これにより、有効なスループットが抑制される可能性があります。)

なんらかの完了処理を追加する必要があります。非同期操作を開始するだけで、結果を処理するために何もしないのは悪い習慣です。行き場のない例外が発生する可能性があるからです。(.NET 4.0 では、これらはプロセスを終了するために使用されていましたが、.NET 4.5 の時点では、非同期操作からの未処理の例外はデフォルトで単純に無視されます! Task.Factory.StartNew)ラッピングの余分な層になってしまったのでmyTask.Unwrap().ContinueWith(...)、正しく処理するために何かをする必要があります.

于 2013-11-29T10:42:06.467 に答える
0

もちろんできます。並行コレクションは、複数のスレッドで同時に使用できるため、「並行」と呼ばれ、その動作についていくつかの保証があります。

ConcurrentQueue は、挿入された各要素が 1 回だけ抽出されるようにします (同時スレッドが誤って同じアイテムを抽出することはありません。キューが空になると、すべてのアイテムがスレッドによって抽出されます)。

編集: 失敗する可能性のある唯一のことは、50 ミリ秒では要求を完了するのに十分ではないため、ますます多くのタスクがタスク キューに蓄積されることです。その場合、メモリがいっぱいになる可能性がありますが、とにかく機能します。はい、それは実現可能です。

とにかく、タスクはスレッドではないという事実を強調したいと思います。100 個のタスクを作成したとしても、実際に同時に実行されるタスクの数はフレームワークによって決定されます。

並列処理のレベルをより細かく制御したい場合は、非同期リクエストを使用する必要があります。あなたのコメントでは「async web request」と書きましたが、別のスレッドにあるという理由だけで async を書いたのか、async API を使用するつもりで書いたのかわかりません。非同期 API を使用している場合は、完了イベントに何らかのハンドラーが関連付けられていることを期待しますが、それを確認できなかったため、非同期タスクから発行された同期要求を使用していると想定しました。非同期リクエストを使用している場合、タスクを使用しても意味がありません。タイマーを使用して非同期リクエストを発行するだけです。これらはすでに非同期になっているためです。

「非同期リクエスト」と言うときは、WebRequest.GetResponseAsync や WebRequest.BeginGetResponse などのメソッドを指しています。

EDIT2: 非同期リクエストを使用する場合は、タイマー ハンドラからリクエストを作成できます。このBeginGetResponseメソッドは 2 つの引数を取ります。1 つ目はコールバック プロシージャで、リクエストのステータスを報告するために呼び出されます。すべてのリクエストに対して同じ手順を渡すことができます。2 つ目はユーザー提供のオブジェクトで、リクエストに関するステータスを保存します。この引数を使用して、さまざまなリクエストを区別できます。タイマーなしでもできます。何かのようなもの:

private readonly int desiredConcurrency = 20;

struct RequestData
{
  public UrlInfo url;
  public HttpWebRequest request;
}

/// Handles the completion of an asynchronous request
/// When a request has been completed,
/// tries to issue a new request to another url.
private void AsyncRequestHandler(IAsyncResult ar)
{
  if (ar.IsCompleted)
  {
    RequestData data = (RequestData)ar.AsyncState;
    HttpWebResponse resp = data.request.EndGetResponse(ar);
    if (resp.StatusCode != 200)
    {
      BadUrls.Add(data.url);
    }

    //A request has been completed, try to start a new one
    TryIssueRequest();
  }
}

/// If urls is not empty, dequeues a url from it
/// and issues a new request to the extracted url.
private bool TryIssueRequest()
{
  RequestData rd;
  if (urls.TryDequeue(out rd.url))
  {
    rd.request = CreateRequestTo(rd.url); //TODO implement
    rd.request.BeginGetResponse(AsyncRequestHandler, rd);
    return true;
  }
  else
  {
    return false;
  }
}

//Called by a button handler, or something like that
void StartTheRequests()
{
  for (int requestCount = 0; requestCount < desiredConcurrency; ++requestCount)
  {
    if (!TryIssueRequest()) break;
  }
}
于 2013-08-05T14:27:31.613 に答える