2

各行が Azure Table Storage とは異なるパーティションに存在する 60 行の取得を並列化する最速の方法は何ですか?

次のオプションを試しましたが、非同期呼び出し (1 番から 4 番) はすべて、60 行を取得するのに 2 秒から 3 秒かかりました。

  1. 非同期 Query.Execute で複数のタスクを使用する
  2. Interlocked.Increment および非同期 Query.Execute で 1 つの ManualResetEvent を使用する
  3. 非同期 Query.Execute での ManualResetEvents のリストの使用
  4. ManualResetEvents のリストと非同期 Query.Execute を使用する Parallel.ForEach ループ
  5. ブロッキング Query.Execute を使用した Parallel.ForEach

参考までに、Azure SDK 1.7 と .Net 4 を使用しています。

各オプションに使用しているサンプル コード:

  1. 非同期 Query.Execute で複数のタスクを使用する

    public List<string> Get100RowsUsingTaskWaitAllWithAsyncExecute()
    {
        var result = new ConcurrentBag<LogEntry>();
    
        var tasks = new Queue<Task>();
    
        foreach (var eachId in _rowKeysToGet.Take(60))
        {
            var cloudStorageAccount =
                CloudStorageAccount.Parse(ConfigurationManager.AppSettings["LoggingConnectionString"]);
            var ctx = new LogServiceContext(
                cloudStorageAccount.TableEndpoint.AbsoluteUri,
                cloudStorageAccount.Credentials);
    
            var query = (from e in ctx.LogEntries where e.RowKey == eachId
                             && e.PartitionKey == _partitionKey
                         select e).AsTableServiceQuery<LogEntry>();
    
            tasks.Enqueue(
                Task<ResultSegment<LogEntry>>.Factory.FromAsync(
                    query.BeginExecuteSegmented,
                    query.EndExecuteSegmented,
                    TaskCreationOptions.None)
                .ContinueWith(t => {
                    var r = t.Result.Results.FirstOrDefault();
                    result.Add(r);
                }));
        }
    
        Task.WaitAll(tasks.ToArray());
    
        var output = result.Select(r => r.RowKey).ToList();
    
        return output;
    }
    
  2. Interlocked.Increment および非同期 Query.Execute で 1 つの ManualResetEvent を使用する

    public List<string> Get100RowsUsingOneManualResetEventWithAsyncExecute()
    {
        var result = new ConcurrentBag<LogEntry>();
    
        var cloudStorageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["LoggingConnectionString"]);
        var ctx = new LogServiceContext(cloudStorageAccount.TableEndpoint.AbsoluteUri, cloudStorageAccount.Credentials);
    
        var threadCount = 0;
        using (var finished = new ManualResetEvent(false))
        {
            foreach (var eachId in _rowKeysToGet.Take(60))
            {
                Interlocked.Increment(ref threadCount);
    
                var query = (from e in ctx.LogEntries
                             where e.RowKey == eachId
                                   && e.PartitionKey == _partitionKey
                             select e).AsTableServiceQuery<LogEntry>();
    
                var startTimeInLoop = DateTime.UtcNow;
    
                query.BeginExecuteSegmented((ar) =>
                {
                    var response = (ar.AsyncState as CloudTableQuery<LogEntry>)
                        .EndExecuteSegmented(ar);
    
                    var eachLogEntry = response.Results.FirstOrDefault();
    
                    result.Add(eachLogEntry);
    
                    if (Interlocked.Decrement(ref threadCount) == 0)
                    {
                        finished.Set();
                    }
    
                }, query);
            }
    
            finished.WaitOne();
        }
    
        return result.Select(r => r.RowKey).ToList();
    }
    
  3. 非同期 Query.Execute での ManualResetEvents のリストの使用

    public List<string> Get100RowsUsingListOfManualResetEventsWithAsyncExecute()    
    {
        var result = new ConcurrentBag<LogEntry>();
    
        var cloudStorageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["LoggingConnectionString"]);
        var ctx = new LogServiceContext(cloudStorageAccount.TableEndpoint.AbsoluteUri, cloudStorageAccount.Credentials);
    
        var events = new List<ManualResetEvent>();
    
        foreach (var eachId in _rowKeysToGet.Take(60))
        {
            var query = (from e in ctx.LogEntries
                         where e.RowKey == eachId
                             && e.PartitionKey == _partitionKey
                         select e).AsTableServiceQuery<LogEntry>();
    
            var evt = new ManualResetEvent(false);
    
            LogEntry eachLogEntry = null;
            query.BeginExecuteSegmented((ar) => {
                var response = (ar.AsyncState as CloudTableQuery<LogEntry>)
                    .EndExecuteSegmented(ar);
    
                eachLogEntry = response.Results.FirstOrDefault();
    
                result.Add(eachLogEntry);
    
                evt.Set();
    
            }, query);
    
            events.Add(evt);
        };
    
        WaitHandle.WaitAll(events.ToArray());
    
        events.ForEach(e => e.Dispose());
    
        return result.Select(r => r.RowKey).ToList();
    }
    
  4. ManualResetEvents のリストと非同期 Query.Execute を使用する Parallel.ForEach ループ

    public List<string> Get100RowsUsingParallelForEachAndManualResetEventWithAsyncExecute()
    {
        var result = new ConcurrentBag<LogEntry>();
    
        var cloudStorageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["LoggingConnectionString"]);
        var ctx = new LogServiceContext(cloudStorageAccount.TableEndpoint.AbsoluteUri, cloudStorageAccount.Credentials);
    
        var events = new List<ManualResetEvent>();
    
        Parallel.ForEach(_rowKeysToGet.Take(60), eachId =>
        {
            var query = (from e in ctx.LogEntries
                          where e.RowKey == eachId
                              && e.PartitionKey == _partitionKey
                          select e).AsTableServiceQuery<LogEntry>();
    
            var evt = new ManualResetEvent(false);
    
            LogEntry eachLogEntry = null;
            query.BeginExecuteSegmented((ar) =>
            {
                var response = (ar.AsyncState as CloudTableQuery<LogEntry>)
                    .EndExecuteSegmented(ar);
    
                eachLogEntry = response.Results.FirstOrDefault();
    
                result.Add(eachLogEntry);
    
                evt.Set();
    
            }, query);
    
            events.Add(evt);
        });
    
        WaitHandle.WaitAll(events.ToArray());
    
        events.ForEach(e => e.Dispose());
    
        return result.Select(r => r.RowKey).ToList();
    }
    
  5. ブロッキング Query.Execute を使用した Parallel.ForEach

    public List<string> Get100RowsUsingParallelForEachWithBlockingExecute()
    {
        var result = new ConcurrentBag<LogEntry>();
    
        var cloudStorageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["LoggingConnectionString"]);
        var ctx = new LogServiceContext(cloudStorageAccount.TableEndpoint.AbsoluteUri, cloudStorageAccount.Credentials);
    
        Parallel.ForEach(_rowKeysToGet.Take(60), eachId =>
        {
            var query = (from e in ctx.LogEntries
                         where e.RowKey == eachId
                             && e.PartitionKey == _partitionKey
                         select e).AsTableServiceQuery<LogEntry>();
    
            var eachLogEntry = query.Execute().FirstOrDefault();
    
            result.Add(eachLogEntry);
        });
    
        return result.Select(r => r.RowKey).ToList();
    }
    
4

0 に答える 0