13

問題: .NET SDK を使用して AWS S3 から 100 個のファイルを並行してダウンロードしたいと考えています。ダウンロードしたコンテンツは 100 個のメモリ ストリームに保存する必要があります (ファイルは十分小さいので、そこから取得できます)。Task、IAsyncResult、Parallel.*、および .NET 4.0 の他のさまざまなアプローチの間で混乱しています。

自分で問題を解決しようとすると、頭のてっぺんから次の擬似コードのようなものを想像します: (いくつかの変数に型を追加するために編集されています)

using Amazon;
using Amazon.S3;
using Amazon.S3.Model;

AmazonS3 _s3 = ...;
IEnumerable<GetObjectRequest> requestObjects = ...;


// Prepare to launch requests
var asyncRequests = from rq in requestObjects 
    select _s3.BeginGetObject(rq,null,null);

// Launch requests
var asyncRequestsLaunched = asyncRequests.ToList();

// Prepare to finish requests
var responses = from rq in asyncRequestsLaunched 
    select _s3.EndGetRequest(rq);

// Finish requests
var actualResponses = responses.ToList();

// Fetch data
var data = actualResponses.Select(rp => {
    var ms = new MemoryStream(); 
    rp.ResponseStream.CopyTo(ms); 
    return ms;
});

このコードは、100 個のリクエストを並行して起動します。これは良いことです。ただし、次の 2 つの問題があります。

  1. 最後のステートメントは、ファイルを並列ではなくシリアルにダウンロードします。ストリームに BeginCopyTo()/EndCopyTo() メソッドがないようです...
  2. 前のステートメントは、すべての要求が応答するまで解放されません。つまり、すべてのファイルが開始されるまで、どのファイルもダウンロードを開始しません。

ここで、私は間違った道を進んでいると思い始めます...

ヘルプ?

4

1 に答える 1

22

操作を1つのリクエストを非同期で処理するメソッドに分割し、それを100回呼び出すと、おそらく簡単です。

まず、必要な最終結果を特定しましょう。作業するのはそれなので、メソッドからMemoryStreamを返したいということです。Task<MemoryStream>署名は次のようになります。

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request)

AmazonS3オブジェクトは非同期デザインパターンを実装しているため、クラスのFromAsyncメソッドを使用して、次のように非同期デザインパターンを実装するクラスTaskFactoryからを生成できます。Task<T>

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request)
{
    Task<GetObjectResponse> response = 
        Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
            s3.BeginGetObject, s3.EndGetObject, request, null);

    // But what goes here?

だから、あなたはすでに良い場所にいます、あなたはTask<T>あなたが待つことができるか、呼び出しが完了したときにコールバックを得ることができるものを持っています。GetObjectResponseただし、呼び出しから返されたをにTask<GetObjectResponse>変換する必要がありますMemoryStream

そのためには、クラスのContinueWithメソッドを使用する必要があります。これは、クラスのメソッドTask<T>の非同期バージョンと考えてください。これは、呼び出すたびに、コードのそのセクションを実行する新しいタスクを作成する可能性があることを除いて、別のメソッドへの単なる投影です。SelectEnumerableTask<T>ContinueWith

これで、メソッドは次のようになります。

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request)
{
    // Start the task of downloading.
    Task<GetObjectResponse> response = 
        Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
            s3.BeginGetObject, s3.EndGetObject, request, null
        );

    // Translate.
    Task<MemoryStream> translation = response.ContinueWith(t => {
        using (Task<GetObjectResponse> resp = t ){
            var ms = new MemoryStream(); 
            t.Result.ResponseStream.CopyTo(ms); 
            return ms;
        } 
    });

    // Return the full task chain.
    return translation;
}

上記では、最小限の作業を行っているように見えるため、合格の過負荷とContinueWithTaskContinuationOptions.ExecuteSynchronously呼ぶことができることに注意してください(私にはわかりませんが、応答が非常に大きい可能性があります)。作業を完了するために新しいタスクを開始することが有害となる非常に最小限の作業を行っている場合はTaskContinuationOptions.ExecuteSynchronously、最小限の操作で新しいタスクを作成する時間を無駄にしないように合格する必要があります。

1つのリクエストをに変換できるメソッドができたので、任意のTask<MemoryStream>数のリクエストを処理するラッパーを作成するのは簡単です。

static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3,
    IEnumerable<GetObjectRequest> requests)
{
    // Just call Select on the requests, passing our translation into
    // a Task<MemoryStream>.
    // Also, materialize here, so that the tasks are "hot" when
    // returned.
    return requests.Select(r => GetMemoryStreamAsync(s3, r)).
        ToArray();
}

上記では、GetObjectRequestインスタンスのシーケンスを取得するだけで、の配列が返されますTask<MemoryStream>。マテリアライズされたシーケンスを返すという事実は重要です。戻る前にそれを具体化しない場合、シーケンスが繰り返されるまでタスクは作成されません。

もちろん、この動作が必要な場合は、必ず、への呼び出しを削除し.ToArray()、メソッドを返すIEnumerable<Task<MemoryStream>>ようにしてください。そうすれば、タスクを繰り返すときにリクエストが作成されます。

そこから、一度に1つずつ処理するか(ループ内のTask.WaitAnyメソッドを使用)、すべてが完了するのを待つ(Task.WaitAllメソッドを呼び出す)ことができます。後者の例は次のとおりです。

static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3, 
    IEnumerable<GetObjectRequest> requests)
{
    Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests);
    Task.WaitAll(tasks);
    return tasks.Select(t => t.Result).ToList();
}

また、これは実装に非常に適しているため、これはReactiveExtensionsフレームワークに非常に適していることに注意してくださいIObservable<T>

于 2012-05-07T19:19:03.690 に答える