並列抽出をテストするためにこれを作成しました。
public static async Task ExtractToDirectoryAsync(this FileInfo file, DirectoryInfo folder)
{
ActionBlock<ZipArchiveEntry> block = new ActionBlock<ZipArchiveEntry>((entry) =>
{
var path = Path.Combine(folder.FullName, entry.FullName);
Directory.CreateDirectory(Path.GetDirectoryName(path));
entry.ExtractToFile(path);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
using (var archive = ZipFile.OpenRead(file.FullName))
{
foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
{
block.Post(entry);
}
block.Complete();
await block.Completion;
}
}
テスト用の次の単体テスト:
[TestMethod]
public async Task ExtractTestAsync()
{
if (Resources.LocalExtractFolder.Exists)
Resources.LocalExtractFolder.Delete(true);
// Resources.LocalExtractFolder.Create();
await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
}
MaxDegreeOfParallelism = 1 では機能しますが、2 では機能しません。
Test Name: ExtractTestAsync
Test FullName: Composite.Azure.Tests.ZipFileTests.ExtractTestAsync
Test Source: c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs : line 21
Test Outcome: Failed
Test Duration: 0:00:02.4138753
Result Message:
Test method Composite.Azure.Tests.ZipFileTests.ExtractTestAsync threw exception:
System.IO.InvalidDataException: Unknown block type. Stream might be corrupted.
Result StackTrace:
at System.IO.Compression.Inflater.Decode()
at System.IO.Compression.Inflater.Inflate(Byte[] bytes, Int32 offset, Int32 length)
at System.IO.Compression.DeflateStream.Read(Byte[] array, Int32 offset, Int32 count)
at System.IO.Stream.InternalCopyTo(Stream destination, Int32 bufferSize)
at System.IO.Stream.CopyTo(Stream destination)
at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source, String destinationFileName, Boolean overwrite)
at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source, String destinationFileName)
at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<>c__DisplayClass6.<ExtractToDirectoryAsync>b__3(ZipArchiveEntry entry) in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 37
at System.Threading.Tasks.Dataflow.ActionBlock`1.ProcessMessage(Action`1 action, KeyValuePair`2 messageWithId)
at System.Threading.Tasks.Dataflow.ActionBlock`1.<>c__DisplayClass5.<.ctor>b__0(KeyValuePair`2 messageWithId)
at System.Threading.Tasks.Dataflow.Internal.TargetCore`1.ProcessMessagesLoopCore()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<ExtractToDirectoryAsync>d__8.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 48
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Composite.Azure.Tests.ZipFileTests.<ExtractTestAsync>d__2.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs:line 25
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
更新 2
これは私自身が並行して行っていることですが、どちらも機能しません:) continueWithで例外を処理することを忘れないでください。
public static void ExtractToDirectorySemaphore(this FileInfo file, DirectoryInfo folder)
{
int MaxDegreeOfParallelism = 2;
using (var archive = ZipFile.OpenRead(file.FullName))
{
var semaphore = new Semaphore(MaxDegreeOfParallelism, MaxDegreeOfParallelism);
foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
{
semaphore.WaitOne();
var task = Task.Run(() =>
{
var path = Path.Combine(folder.FullName, entry.FullName);
Directory.CreateDirectory(Path.GetDirectoryName(path));
entry.ExtractToFile(path);
});
task.ContinueWith(handle =>
{
try
{
//do any cleanup/post processing
}
finally
{
// Release the semaphore so the next thing can be processed
semaphore.Release();
}
});
}
while(MaxDegreeOfParallelism-->0)
semaphore.WaitOne(); //Wait here until the last task completes.
}
}
そして、ここに非同期バージョンがあります:
public static Task ExtractToDirectorySemaphoreAsync(this FileInfo file, DirectoryInfo folder)
{
return Task.Factory.StartNew(() =>
{
int MaxDegreeOfParallelism = 50;
using (var archive = ZipFile.OpenRead(file.FullName))
{
var semaphore = new Semaphore(MaxDegreeOfParallelism, MaxDegreeOfParallelism);
foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
{
semaphore.WaitOne();
var task = Task.Run(() =>
{
var path = Path.Combine(folder.FullName, entry.FullName);
Directory.CreateDirectory(Path.GetDirectoryName(path));
entry.ExtractToFile(path);
});
task.ContinueWith(handle =>
{
try
{
//do any cleanup/post processing
}
finally
{
// Release the semaphore so the next thing can be processed
semaphore.Release();
}
},TaskContinuationOptions.AttachedToParent); // the outher task will wait for all.
}
}
});
}
アップデート 3
次の例外は、handle.Exception でスローされます。
{"Block length does not match with its complement."}
[0] = {"A local file header is corrupt."}
ZipFile がスレッドセーフかどうかを確認する必要があります。