12

並列抽出をテストするためにこれを作成しました。

    public static async Task ExtractToDirectoryAsync(this FileInfo file, DirectoryInfo folder)
    {

        ActionBlock<ZipArchiveEntry> block = new ActionBlock<ZipArchiveEntry>((entry) =>
        {
            var path = Path.Combine(folder.FullName, entry.FullName);

            Directory.CreateDirectory(Path.GetDirectoryName(path));
            entry.ExtractToFile(path);

        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

        using (var archive = ZipFile.OpenRead(file.FullName))
        {
            foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
            {
                block.Post(entry);
            }
            block.Complete();
            await block.Completion;
        }

    }

テスト用の次の単体テスト:

    [TestMethod]
    public async Task ExtractTestAsync()
    {
        if (Resources.LocalExtractFolder.Exists)
            Resources.LocalExtractFolder.Delete(true);
        //  Resources.LocalExtractFolder.Create();
        await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
    }

MaxDegreeOfParallelism = 1 では機能しますが、2 では機能しません。

Test Name:  ExtractTestAsync
Test FullName:  Composite.Azure.Tests.ZipFileTests.ExtractTestAsync
Test Source:    c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs : line 21
Test Outcome:   Failed
Test Duration:  0:00:02.4138753

Result Message: 
Test method Composite.Azure.Tests.ZipFileTests.ExtractTestAsync threw exception: 
System.IO.InvalidDataException: Unknown block type. Stream might be corrupted.
Result StackTrace:  
at System.IO.Compression.Inflater.Decode()
   at System.IO.Compression.Inflater.Inflate(Byte[] bytes, Int32 offset, Int32 length)
   at System.IO.Compression.DeflateStream.Read(Byte[] array, Int32 offset, Int32 count)
   at System.IO.Stream.InternalCopyTo(Stream destination, Int32 bufferSize)
   at System.IO.Stream.CopyTo(Stream destination)
   at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source, String destinationFileName, Boolean overwrite)
   at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source, String destinationFileName)
   at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<>c__DisplayClass6.<ExtractToDirectoryAsync>b__3(ZipArchiveEntry entry) in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 37
   at System.Threading.Tasks.Dataflow.ActionBlock`1.ProcessMessage(Action`1 action, KeyValuePair`2 messageWithId)
   at System.Threading.Tasks.Dataflow.ActionBlock`1.<>c__DisplayClass5.<.ctor>b__0(KeyValuePair`2 messageWithId)
   at System.Threading.Tasks.Dataflow.Internal.TargetCore`1.ProcessMessagesLoopCore()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<ExtractToDirectoryAsync>d__8.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 48
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at Composite.Azure.Tests.ZipFileTests.<ExtractTestAsync>d__2.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs:line 25
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()

更新 2

これは私自身が並行して行っていることですが、どちらも機能しません:) continueWithで例外を処理することを忘れないでください。

public static void ExtractToDirectorySemaphore(this FileInfo file, DirectoryInfo folder)
        {

            int MaxDegreeOfParallelism = 2;
            using (var archive = ZipFile.OpenRead(file.FullName))
            {

                var semaphore = new Semaphore(MaxDegreeOfParallelism, MaxDegreeOfParallelism);

                foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
                {
                    semaphore.WaitOne();

                    var task = Task.Run(() =>
                    {
                        var path = Path.Combine(folder.FullName, entry.FullName);

                        Directory.CreateDirectory(Path.GetDirectoryName(path));
                        entry.ExtractToFile(path);
                    });
                    task.ContinueWith(handle =>
                    {
                        try
                        {
                            //do any cleanup/post processing
                        }
                        finally
                        {
                            // Release the semaphore so the next thing can be processed
                            semaphore.Release();
                        }
                    });
                }
                while(MaxDegreeOfParallelism-->0)
                    semaphore.WaitOne(); //Wait here until the last task completes.


            }

        }

そして、ここに非同期バージョンがあります:

public static Task ExtractToDirectorySemaphoreAsync(this FileInfo file, DirectoryInfo folder)
        {
            return Task.Factory.StartNew(() =>
            {
                int MaxDegreeOfParallelism = 50;
                using (var archive = ZipFile.OpenRead(file.FullName))
                {

                    var semaphore = new Semaphore(MaxDegreeOfParallelism, MaxDegreeOfParallelism);

                    foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
                    {
                        semaphore.WaitOne();

                        var task = Task.Run(() =>
                        {
                            var path = Path.Combine(folder.FullName, entry.FullName);

                            Directory.CreateDirectory(Path.GetDirectoryName(path));
                            entry.ExtractToFile(path);
                        });
                        task.ContinueWith(handle =>
                        {
                            try
                            {
                                //do any cleanup/post processing
                            }
                            finally
                            {
                                // Release the semaphore so the next thing can be processed
                                semaphore.Release();
                            }
                        },TaskContinuationOptions.AttachedToParent); // the outher task will wait for all.
                    }

                }
            });
        }

アップデート 3

次の例外は、handle.Exception でスローされます。

{"Block length does not match with its complement."}  
[0] = {"A local file header is corrupt."}

ZipFile がスレッドセーフかどうかを確認する必要があります。

4

5 に答える 5

4

免責事項: 概念実証にすぎません。

コード内のサンプルで ZipFile.OpenRead を ParallelZipFile.OpenRead に置き換えると、4 つのユニットテストすべてに合格します。

   public class ParallelZipFile
    {
        public static ParallelZipArchive OpenRead(string path)
        {

            return new ParallelZipArchive(ZipFile.OpenRead(path),path);
        }
    }
    public class ParallelZipArchive : IDisposable
    {
        internal ZipArchive _archive;
        internal string _path;
        internal ConcurrentQueue<ZipArchive> FreeReaders = new ConcurrentQueue<ZipArchive>();

        public ParallelZipArchive(ZipArchive zip,string path)
        {
            _path = path;
            _archive = zip;
            FreeReaders.Enqueue(zip);
        }

        public ReadOnlyCollection<ParallelZipArchiveEntry> Entries
        {
            get
            {
                var list = new List<ParallelZipArchiveEntry>(_archive.Entries.Count);
                int i = 0;
                foreach (var entry in _archive.Entries)
                    list.Add(new ParallelZipArchiveEntry(i++, entry, this));

                return  new ReadOnlyCollection<ParallelZipArchiveEntry>(list);
            }
        }


        public void Dispose()
        {
            foreach (var archive in FreeReaders)
                archive.Dispose();
        }
    }
    public class ParallelZipArchiveEntry
    {
        private ParallelZipArchive _parent;
        private int _entry;
        public string Name { get; set; }
        public string FullName { get; set; }

        public ParallelZipArchiveEntry(int entryNr, ZipArchiveEntry entry, ParallelZipArchive parent)
        {
            _entry = entryNr;
            _parent = parent;
            Name = entry.Name;
            FullName = entry.FullName;
        }

        public void ExtractToFile(string path)
        {
            ZipArchive value;
            Trace.TraceInformation(string.Format("Number of readers: {0}", _parent.FreeReaders.Count));

            if (!_parent.FreeReaders.TryDequeue(out value))
                value = ZipFile.OpenRead(_parent._path);

            value.Entries.Skip(_entry).First().ExtractToFile(path);



            _parent.FreeReaders.Enqueue(value);
        }
    }

単体テスト

[TestClass]
    public class ZipFileTests
    {
        [ClassInitialize()]
        public static void PreInitialize(TestContext context)
        {
            if (Resources.LocalExtractFolderTruth.Exists)
                Resources.LocalExtractFolderTruth.Delete(true);

            ZipFile.ExtractToDirectory(Resources.WebsiteZip.FullName, Resources.LocalExtractFolderTruth.FullName);
        }

        [TestInitialize()]
        public void InitializeTests()
        {
            if (Resources.LocalExtractFolder.Exists)
                Resources.LocalExtractFolder.Delete(true);

        }

        [TestMethod]
        public void ExtractTest()
        {

            Resources.WebsiteZip.ExtractToDirectory(Resources.LocalExtractFolder);

            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
                Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));

        }
        [TestMethod]
        public async Task ExtractAsyncTest()
        {

            await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);

            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
               Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
        }
        [TestMethod]
        public void ExtractSemaphoreTest()
        {

            Resources.WebsiteZip.ExtractToDirectorySemaphore(Resources.LocalExtractFolder);
            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
               Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
        }
        [TestMethod]
        public async Task ExtractSemaphoreAsyncTest()
        {

            await Resources.WebsiteZip.ExtractToDirectorySemaphoreAsync(Resources.LocalExtractFolder);
            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
               Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
        }

    }
于 2013-04-26T00:18:49.917 に答える
2

最近同じタスクに取り組みましたが、これが私の結果です。

Xeon 1socket、8cores、16cpu でDotNetZip.reduced (Ionic.Zip.Reduced.dll v1.9.1.8)を使用しています。32GB RAM; SSD ドライブ

zip ファイル | 梱包サイズ | 解凍されたファイル | 開梱サイズ

  • SmallFile1 | 778MB | 4,926 ファイル | 1.4GB
  • 大きいファイル 2 | 6GB | 29,557 ファイル | 10.0GB

私には 5 つのメソッドがあります。最初は 1 つのスレッドですべてを実行し、他の 4 つは PLINQ と TPL Parallel クラスを使用します。

勝者は、V1 よりも 6 倍速く動作するV4 と V5です。以下は詳細な結果とコードです。

  • V1 は ExtractAll を使用します
  • V2 並列でエントリを抽出します (スレッドセーフではありません)
  • V3 エントリごとに新しいファイル ハンドルを開くことにより、エントリを並行して抽出します
  • V4 N+1 ファイル ハンドルのみを使用してエントリを並列に抽出します。
  • V5最終版

業績成績表 zipファイル | V1、秒 | V2、秒 | V3、秒 | V4、秒 | V5、秒

  • SmallFile1 | 32 | 例外 | 8 | 8 | 5
  • 大きいファイル 2 | 200 | 例外 | 2000年 | 35 | 30

小さなファイルの処理 小さなファイルの処理

V1による大容量ファイル処理 V1による大容量ファイル処理

V4による大容量ファイル処理 V4による大容量ファイル処理

V5による大容量ファイル処理 V5による大容量ファイル処理

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Ionic.Zip;
using Ionic.Zlib;

namespace A1
{
    public static class Program
    {
        static void Main(string[] args)
        {
            Console.ReadKey();

            CancellationToken cancellationToken = CancellationToken.None;

            string path = @"e:\1\";
            string zf1 = Path.Combine(path, "1.zip");
            string zf2 = Path.Combine(path, "2.zip");
            Stopwatch sw = new Stopwatch();

            List<string> zipFiles = new List<string>
            {
                zf1,
                zf2,
            };
            List<Action<string, string, CancellationToken>> methods = new List<Action<string, string, CancellationToken>>
            {
                ExtractAllFilesFromZipFileV1,
                //ExtractAllFilesFromZipFileV2,
                //ExtractAllFilesFromZipFileV3,
                ExtractAllFilesFromZipFileV4,
                ExtractAllFilesFromZipFileV5,
                ExtractAllFilesFromZipFileV4,
                ExtractAllFilesFromZipFileV5,
                ExtractAllFilesFromZipFileV4,
                ExtractAllFilesFromZipFileV5,
            };

            zipFiles.Reverse();
            methods.Reverse();

            zipFiles.ForEach(f => methods.ForEach(m =>
            {
                string fileName = Path.GetFileName(f);
                string targetDirectory = path + Guid.NewGuid().ToString("N");
                sw.Restart();
                // Unzip
                try
                {
                    m(f, targetDirectory, cancellationToken);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
                sw.Stop();
                Console.WriteLine("{0} processed by {1} in {2} seconds", fileName, m.GetMethodInfo().Name, sw.Elapsed.TotalSeconds.ToString("F3"));
                Thread.Sleep(5 * 1000);
                Directory.Delete(targetDirectory, true);
                Thread.Sleep(5 * 1000);
            }));
        }

        private static void ExtractAllFilesFromZipFileV1(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
        {
            using (ZipFile zipFile = new ZipFile(zipFileName))
            {
                zipFile.ExtractAll(targetDirectory);
            }
        }

        private static void ExtractAllFilesFromZipFileV2(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
        {
            using (ZipFile zipFile = new ZipFile(zipFileName))
            {
                zipFile.Entries
                    .AsParallel()
                    .ForAll(v =>
                    {
                        v.Extract(targetDirectory);
                    });
            }
        }


        private static void ExtractAllFilesFromZipFileV3(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
        {
            using (ZipFile zipFile = new ZipFile(zipFileName))
            {
                int count = zipFile.Entries.Count;

                Enumerable.Range(0, count)
                    .AsParallel()
                    .ForAll(v =>
                    {
                        cancellationToken.ThrowIfCancellationRequested();

                        using (ZipFile zf = new ZipFile(zipFileName))
                        {
                            // Get the right entry to extract
                            zf.Entries
                                .Skip(v)
                                .First()
                                .Extract(targetDirectory);
                        }
                    });
            }
        }

        private static void ExtractAllFilesFromZipFileV4(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
        {
            using (ZipFile zipFile = new ZipFile(zipFileName))
            {
                // Get count of files, files and keep the lock on the file
                int count = zipFile.Entries.Count();

                // Cache instances of ZipFile used by threads
                // Make sure that we have only open zip file not more than N times, where N is maxDop.
                ConcurrentDictionary<int, ZipFile> dictionary = new ConcurrentDictionary<int, ZipFile>();

                try
                {
                    Parallel.For(0, count,
                        () =>
                        {
                            // GetOrAdd. Use existing open ZipFile or open a new one for this thread.
                            return dictionary.GetOrAdd(Thread.CurrentThread.ManagedThreadId, v =>
                            {
                                return new ZipFile(zipFileName);
                            });
                        },
                        (int i, ParallelLoopState loopState, ZipFile zf) =>
                        {
                            cancellationToken.ThrowIfCancellationRequested();

                            // Get the right entry to extract
                            ZipEntry entry = zf.Entries
                                .Skip(i)
                                .First();

                            // Extract to a file
                            entry.Extract(targetDirectory);

                            return zf;
                        },
                        zf =>
                        {
                        });
                }
                finally
                {
                    // Dispose cached ZipFiles
                    foreach (ZipFile zf in dictionary.Values)
                    {
                        zf.Dispose();
                    }
                }
            } // using
        }

        private static void ExtractAllFilesFromZipFileV5(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
        {
            using (ZipFile zipFile = new ZipFile(zipFileName))
            {
                // Get count of files, files and keep the lock on the file
                ICollection<ZipEntry> zipEntries = zipFile.Entries;
                int count = zipEntries.Where(v => !v.IsDirectory).Count();

                // Caclulate max DOP
                int maxDop = (int)1.5 * Math.Min(count, Environment.ProcessorCount);


                List<Tuple<int, long>> entries = zipEntries
                    .Select((v, i) => Tuple.Create(i, v))
                    .Where(v => !v.Item2.IsDirectory)
                    .Select(v => Tuple.Create(v.Item1, v.Item2.UncompressedSize))
                    .ToList();

                // Load balance between threads
                List<List<Tuple<int, long>>> groupedItems = entries.ToBuckets(maxDop, v => v.Item2 + 10 * 1024 * 1024).ToList();

                // Ensure seq reading from zip file.
                for (int i = 0; i < groupedItems.Count; ++i)
                {
                    groupedItems[i] = groupedItems[i].OrderBy(v => v.Item1).ToList();
                }

                // Cache instances of ZipFile used by threads
                // Make sure that we have open zip file not more than N times, where N is maxDop.
                ConcurrentDictionary<int, Tuple<ZipFile, List<ZipEntry>>> dictionary = new ConcurrentDictionary<int, Tuple<ZipFile, List<ZipEntry>>>(maxDop, maxDop);
                ParallelOptions parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = maxDop, };

                try
                {
                    Parallel.For(0, maxDop, parallelOptions,
                        () =>
                        {
                            // GetOrAdd. Re-use existing open ZipFile or open a new one for this thread.
                            return dictionary.GetOrAdd(Thread.CurrentThread.ManagedThreadId, v =>
                            {
                                ZipFile zf = new ZipFile(zipFileName) { ExtractExistingFile = ExtractExistingFileAction.Throw, FlattenFoldersOnExtract = false, ZipErrorAction = ZipErrorAction.Throw, };
                                zf.ExtractProgress += (sender, e) =>
                                {
                                    cancellationToken.ThrowIfCancellationRequested();
                                };
                                return Tuple.Create(zf, zf.Entries.ToList());
                            });
                        },
                        (int j, ParallelLoopState loopState, Tuple<ZipFile, List<ZipEntry>> zf) =>
                        {

                            List<Tuple<int, long>> list = groupedItems[j];
                            for (int n = 0; n < list.Count; ++n)
                            {
                                cancellationToken.ThrowIfCancellationRequested();

                                int i = list[n].Item1;

                                // Get the right entry to extract
                                ZipEntry entry = zf.Item2[i];
                                Debug.Assert(entry.UncompressedSize == list[n].Item2);

                                // Extract to a file
                                entry.Extract(targetDirectory);
                            }

                            return zf;
                        },
                        zf =>
                        {
                        });
                }
                finally
                {
                    // Dispose cached ZipFiles
                    foreach (Tuple<ZipFile, List<ZipEntry>> zf in dictionary.Values)
                    {
                        try
                        {
                            zf.Item2.Clear();
                            zf.Item1.Dispose();
                        }
                        catch (ZlibException)
                        {
                            // There is a well known defect in Ionic.ZLib
                            // This exception may happen when you read only part of file (not entire file)
                            // and close its handle.
                            // Ionic.Zlib.ZlibException: Bad CRC32 in GZIP trailer. (actual(D202EF8D)!=expected(A39D1010))
                        }
                    }
                }
            }
        }


        private static IEnumerable<List<T>> ToBuckets<T>(this IEnumerable<T> list, int bucketCount, Func<T, long> getWeight)
        {
            List<T> sortedList = list.OrderByDescending(v => getWeight(v)).ToList();

            List<long> runningTotals = Enumerable.Repeat(0L, bucketCount).ToList();
            List<List<T>> buckets = Enumerable.Range(0, bucketCount)
                .Select(v => new List<T>(sortedList.Count / bucketCount))
                .ToList();

            foreach (T item in sortedList)
            {
                // MinBy runningTotal
                int i = runningTotals.IndexOfMin();
                // Add to bucket
                runningTotals[i] += getWeight(item);
                buckets[i].Add(item);
            }

            return buckets;
        }

        public static int IndexOfMin<T>(this IEnumerable<T> source, IComparer<T> comparer = null)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (comparer == null)
            {
                comparer = Comparer<T>.Default;
            }

            using (IEnumerator<T> enumerator = source.GetEnumerator())
            {
                if (!enumerator.MoveNext())
                {
                    return -1; // or maybe throw InvalidOperationException
                }

                int minIndex = 0;
                T minValue = enumerator.Current;

                int index = 0;
                while (enumerator.MoveNext())
                {
                    ++index;
                    if (comparer.Compare(enumerator.Current, minValue) < 0)
                    {
                        minIndex = index;
                        minValue = enumerator.Current;
                    }
                }

                return minIndex;
            }
        }

        public static int IndexOfMinBy<TSource, TKey>(this IEnumerable<TSource> source, Func<TSource, TKey> selector, IComparer<TKey> comparer = null)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (comparer == null)
            {
                comparer = Comparer<TKey>.Default;
            }

            using (IEnumerator<TSource> enumerator = source.GetEnumerator())
            {
                if (!enumerator.MoveNext())
                {
                    return -1; // or maybe throw InvalidOperationException
                }

                int minIndex = 0;
                TKey minValue = selector(enumerator.Current);

                int index = 0;
                while (enumerator.MoveNext())
                {
                    ++index;
                    TKey value = selector(enumerator.Current);
                    if (comparer.Compare(value, minValue) < 0)
                    {
                        minIndex = index;
                        minValue = value;
                    }
                }

                return minIndex;
            }
        }
    }
}
于 2016-05-03T01:55:47.367 に答える
1

問題は、ファイルを 1 つのハンドルだけで 1 回だけ開くことです。1 つのハンドルには 1 つの読み取り位置があり、同じハンドルで並列読み取りを行うと、読み取り位置がめちゃくちゃになります。複数のハンドルでファイルを複数回開いても問題ありません。

于 2013-04-25T19:56:37.640 に答える