各行が Azure Table Storage とは異なるパーティションに存在する 60 行の取得を並列化する最速の方法は何ですか?
次のオプションを試しましたが、非同期呼び出し (1 番から 4 番) はすべて、60 行を取得するのに 2 秒から 3 秒かかりました。
- 非同期 Query.Execute で複数のタスクを使用する
- Interlocked.Increment および非同期 Query.Execute で 1 つの ManualResetEvent を使用する
- 非同期 Query.Execute での ManualResetEvents のリストの使用
- ManualResetEvents のリストと非同期 Query.Execute を使用する Parallel.ForEach ループ
- ブロッキング Query.Execute を使用した Parallel.ForEach
参考までに、Azure SDK 1.7 と .Net 4 を使用しています。
各オプションに使用しているサンプル コード:
非同期 Query.Execute で複数のタスクを使用する
public List<string> Get100RowsUsingTaskWaitAllWithAsyncExecute() { var result = new ConcurrentBag<LogEntry>(); var tasks = new Queue<Task>(); foreach (var eachId in _rowKeysToGet.Take(60)) { var cloudStorageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["LoggingConnectionString"]); var ctx = new LogServiceContext( cloudStorageAccount.TableEndpoint.AbsoluteUri, cloudStorageAccount.Credentials); var query = (from e in ctx.LogEntries where e.RowKey == eachId && e.PartitionKey == _partitionKey select e).AsTableServiceQuery<LogEntry>(); tasks.Enqueue( Task<ResultSegment<LogEntry>>.Factory.FromAsync( query.BeginExecuteSegmented, query.EndExecuteSegmented, TaskCreationOptions.None) .ContinueWith(t => { var r = t.Result.Results.FirstOrDefault(); result.Add(r); })); } Task.WaitAll(tasks.ToArray()); var output = result.Select(r => r.RowKey).ToList(); return output; }
Interlocked.Increment および非同期 Query.Execute で 1 つの ManualResetEvent を使用する
public List<string> Get100RowsUsingOneManualResetEventWithAsyncExecute() { var result = new ConcurrentBag<LogEntry>(); var cloudStorageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["LoggingConnectionString"]); var ctx = new LogServiceContext(cloudStorageAccount.TableEndpoint.AbsoluteUri, cloudStorageAccount.Credentials); var threadCount = 0; using (var finished = new ManualResetEvent(false)) { foreach (var eachId in _rowKeysToGet.Take(60)) { Interlocked.Increment(ref threadCount); var query = (from e in ctx.LogEntries where e.RowKey == eachId && e.PartitionKey == _partitionKey select e).AsTableServiceQuery<LogEntry>(); var startTimeInLoop = DateTime.UtcNow; query.BeginExecuteSegmented((ar) => { var response = (ar.AsyncState as CloudTableQuery<LogEntry>) .EndExecuteSegmented(ar); var eachLogEntry = response.Results.FirstOrDefault(); result.Add(eachLogEntry); if (Interlocked.Decrement(ref threadCount) == 0) { finished.Set(); } }, query); } finished.WaitOne(); } return result.Select(r => r.RowKey).ToList(); }
非同期 Query.Execute での ManualResetEvents のリストの使用
public List<string> Get100RowsUsingListOfManualResetEventsWithAsyncExecute() { var result = new ConcurrentBag<LogEntry>(); var cloudStorageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["LoggingConnectionString"]); var ctx = new LogServiceContext(cloudStorageAccount.TableEndpoint.AbsoluteUri, cloudStorageAccount.Credentials); var events = new List<ManualResetEvent>(); foreach (var eachId in _rowKeysToGet.Take(60)) { var query = (from e in ctx.LogEntries where e.RowKey == eachId && e.PartitionKey == _partitionKey select e).AsTableServiceQuery<LogEntry>(); var evt = new ManualResetEvent(false); LogEntry eachLogEntry = null; query.BeginExecuteSegmented((ar) => { var response = (ar.AsyncState as CloudTableQuery<LogEntry>) .EndExecuteSegmented(ar); eachLogEntry = response.Results.FirstOrDefault(); result.Add(eachLogEntry); evt.Set(); }, query); events.Add(evt); }; WaitHandle.WaitAll(events.ToArray()); events.ForEach(e => e.Dispose()); return result.Select(r => r.RowKey).ToList(); }
ManualResetEvents のリストと非同期 Query.Execute を使用する Parallel.ForEach ループ
public List<string> Get100RowsUsingParallelForEachAndManualResetEventWithAsyncExecute() { var result = new ConcurrentBag<LogEntry>(); var cloudStorageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["LoggingConnectionString"]); var ctx = new LogServiceContext(cloudStorageAccount.TableEndpoint.AbsoluteUri, cloudStorageAccount.Credentials); var events = new List<ManualResetEvent>(); Parallel.ForEach(_rowKeysToGet.Take(60), eachId => { var query = (from e in ctx.LogEntries where e.RowKey == eachId && e.PartitionKey == _partitionKey select e).AsTableServiceQuery<LogEntry>(); var evt = new ManualResetEvent(false); LogEntry eachLogEntry = null; query.BeginExecuteSegmented((ar) => { var response = (ar.AsyncState as CloudTableQuery<LogEntry>) .EndExecuteSegmented(ar); eachLogEntry = response.Results.FirstOrDefault(); result.Add(eachLogEntry); evt.Set(); }, query); events.Add(evt); }); WaitHandle.WaitAll(events.ToArray()); events.ForEach(e => e.Dispose()); return result.Select(r => r.RowKey).ToList(); }
ブロッキング Query.Execute を使用した Parallel.ForEach
public List<string> Get100RowsUsingParallelForEachWithBlockingExecute() { var result = new ConcurrentBag<LogEntry>(); var cloudStorageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["LoggingConnectionString"]); var ctx = new LogServiceContext(cloudStorageAccount.TableEndpoint.AbsoluteUri, cloudStorageAccount.Credentials); Parallel.ForEach(_rowKeysToGet.Take(60), eachId => { var query = (from e in ctx.LogEntries where e.RowKey == eachId && e.PartitionKey == _partitionKey select e).AsTableServiceQuery<LogEntry>(); var eachLogEntry = query.Execute().FirstOrDefault(); result.Add(eachLogEntry); }); return result.Select(r => r.RowKey).ToList(); }