4

問題

C# を使用して非常に大きなデータ構造を保存および読み取ることができる必要があります。構造自体はかなり単純です。これは、一定サイズの単純な構造体の非常に長い配列です。

わかりやすくするための例:

struct st {
UInt32 a;
UInt16 b;
//etc.
} completion ports

st[] data = new st[1024*1024*100]

これらを可能な限り高速かつ効率的にファイルに保存およびロードできるようにしたいと考えています。

一般的な方向性

これまでの私の考えは、データをセグメントに分割し、概念的にはもちろん、それらのセグメントをタスクに割り当て、非同期でファイルに書き込むことです。FileStream.WriteAsync はこれに最適なようです。

私の問題は読み取りに関するものです。FileStream.ReadAsync API から、実際にはプリミティブの途中で、結果が各構造の途中でカットされる可能性があることは完全に合理的です。もちろん、これを回避することはできますが、何が最善の方法なのか、OS のバッファリング メカニズムにどの程度干渉するのかはわかりません。

最終的には、各バッファから MemoryStream を作成し、MemoryStream.MemoryStream(byte[])それぞれをバイナリ リーダーで構造体に読み込む予定です。

質問

では、これを解決する最善の方法は何でしょうか? 私の方向は良いですか?より良い解決策はありますか?コード例とリンクをいただければ幸いです...

結論

パフォーマンス テストを行った後、BinaryReader でファイルを読み取るか、FileStream.ReadAsync で複数のリーダーを使用すると、ほぼ同じパフォーマンスが得られることがわかりました。

スー……その質問は無意味です。

4

3 に答える 3

3

最大のボトルネックは、ファイルへの排他的アクセスで実行する必要がある IO になります。このための実際のバイトクランチは高速です-ファイルに直接書き込むのと同じくらいうまくいきます(FileStreamそれ自体にバッファがあることに注意してください、または で余分なレイヤーを追加できますBufferedStream)、さまざまな部分をシリアル化するよりもメモリ内の各部分を個別にストリームにコピーします。

私のアドバイス: データを 1 つのスレッドに書き込むだけです。率直に言って、特にバッファが追いついている場合は、async(ヒント:非同期コードがオーバーヘッドを追加します)気にするかどうかさえわかりません。BiaryWriter/も使用しませんBinaryReader- 生で書くだけです。あなたができる1つのトリッキーな方法は、いくつかのunsafeコードを使用してデータをブロックにコピーし、個々のオブジェクトを見る必要さえなくすことですが、それは物事の難しいところです...私は例をやってみます.

以下は、読み取り/書き込みの例です。最初にパフォーマンスに注意してください。

Write: 2012ms
Read: 1089ms
File: 838,860,804 bytes

コード:

[DllImport("msvcrt.dll", EntryPoint = "memcpy", CallingConvention = CallingConvention.Cdecl, SetLastError = false)]
public static extern IntPtr memcpy(IntPtr dest, IntPtr src, UIntPtr count);

unsafe static st[] Read(string path)
{
    using (var file = File.OpenRead(path))
    {
        int size = sizeof(st);
        const int BLOCK_SIZE = 512; // process at a time
        byte[] buffer = new byte[BLOCK_SIZE * size];

        UIntPtr bufferLen = new UIntPtr((uint)buffer.Length);
        fixed (byte* bufferPtr = buffer)
        {
            Fill(file, buffer, 0, 4);
            int len = ((int*)bufferPtr)[0];

            st[] result = new st[len];
            fixed (st* dataPtr = result)
            {
                st* rawPtr = dataPtr;
                IntPtr source= new IntPtr(bufferPtr);
                while (len >= BLOCK_SIZE)
                {
                    Fill(file, buffer, 0, buffer.Length);
                    memcpy(new IntPtr(rawPtr), source, bufferLen);
                    len -= BLOCK_SIZE;
                    rawPtr += BLOCK_SIZE;
                }
                if (len > 0)
                {
                    Fill(file, buffer, 0, len * size);
                    memcpy(new IntPtr(rawPtr), source, new UIntPtr((uint)(len * size)));
                }
            }
            return result;
        }
    }


}
static void Fill(Stream source, byte[] buffer, int offset, int count)
{
    int read;
    while (count > 0 && (read = source.Read(buffer, offset, count)) > 0)
    {
        offset += read;
        count -= read;
    }
    if (count > 0) throw new EndOfStreamException();
}

unsafe static void Write(st[] data, string path)
{
    using (var file = File.Create(path))
    {
        int size = sizeof(st);
        const int BLOCK_SIZE = 512; // process at a time
        byte[] buffer = new byte[BLOCK_SIZE * size];

        int len = data.Length;
        UIntPtr bufferLen = new UIntPtr((uint)buffer.Length);
        fixed (st* dataPtr = data)
        fixed (byte* bufferPtr = buffer)
        {
            // write the number of elements
            ((int*)bufferPtr)[0] = data.Length;
            file.Write(buffer, 0, 4);

            st* rawPtr = dataPtr;
            IntPtr destination = new IntPtr(bufferPtr);
            // write complete blocks of BLOCK_SIZE
            while (len >= BLOCK_SIZE)
            {
                memcpy(destination, new IntPtr(rawPtr), bufferLen);
                len -= BLOCK_SIZE;
                rawPtr += BLOCK_SIZE;
                file.Write(buffer, 0, buffer.Length);
            }
            if (len > 0)
            {   // write an incomplete block, if necessary
                memcpy(destination, new IntPtr(rawPtr), new UIntPtr((uint)(len * size)));
                file.Write(buffer, 0, len * size);
            }
        }
    }
}
于 2013-04-26T09:34:24.540 に答える
3

[編集] この投稿を更新して、コンパイル可能な完全なサンプルを含め、以下のコメントで @Daniel が提起した問題に対処しました。その結果、このコードは「危険な」メソッドを使用しなくなり、コード分析の警告も表示されなくなりました。[/編集]

構造体にblittable 型のみが含まれている場合、少し高速化する方法があります。

次のように、マーシャリングを使用して、追加のコピーを作成せずにデータを配列に直接読み込むことができます (完全なコンパイル可能な例)。

using System;
using System.ComponentModel;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using Microsoft.Win32.SafeHandles;

namespace ConsoleApplication1
{
    internal class Program
    {
        struct TestStruct // Mutable for brevity; real structs should be immutable.
        {
            public byte   ByteValue;
            public short  ShortValue;
            public int    IntValue;
            public long   LongValue;
            public float  FloatValue;
            public double DoubleValue;
        }

        static void Main()
        {
            var array = new TestStruct[10];

            for (byte i = 0; i < array.Length; ++i)
            {
                array[i].ByteValue   = i;
                array[i].ShortValue  = i;
                array[i].IntValue    = i;
                array[i].LongValue   = i;
                array[i].FloatValue  = i;
                array[i].DoubleValue = i;
            }

            Directory.CreateDirectory("C:\\TEST");

            using (var output = new FileStream(@"C:\TEST\TEST.BIN", FileMode.Create))
                FastWrite(output, array, 0, array.Length);

            using (var input = new FileStream(@"C:\TEST\TEST.BIN", FileMode.Open))
                array = FastRead<TestStruct>(input, array.Length);

            for (byte i = 0; i < array.Length; ++i)
            {
                Trace.Assert(array[i].ByteValue   == i);
                Trace.Assert(array[i].ShortValue  == i);
                Trace.Assert(array[i].IntValue    == i);
                Trace.Assert(array[i].LongValue   == i);
                Trace.Assert(array[i].FloatValue  == i);
                Trace.Assert(array[i].DoubleValue == i);
            }
        }

        /// <summary>
        /// Writes a part of an array to a file stream as quickly as possible,
        /// without making any additional copies of the data.
        /// </summary>
        /// <typeparam name="T">The type of the array elements.</typeparam>
        /// <param name="fs">The file stream to which to write.</param>
        /// <param name="array">The array containing the data to write.</param>
        /// <param name="offset">The offset of the start of the data in the array to write.</param>
        /// <param name="count">The number of array elements to write.</param>
        /// <exception cref="IOException">Thrown on error. See inner exception for <see cref="Win32Exception"/></exception>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2004:RemoveCallsToGCKeepAlive")]

        public static void FastWrite<T>(FileStream fs, T[] array, int offset, int count) where T: struct
        {
            int sizeOfT = Marshal.SizeOf(typeof(T));
            GCHandle gcHandle = GCHandle.Alloc(array, GCHandleType.Pinned);

            try
            {
                uint bytesWritten;
                uint bytesToWrite = (uint)(count * sizeOfT);

                if
                (
                    !WriteFile
                    (
                        fs.SafeFileHandle,
                        new IntPtr(gcHandle.AddrOfPinnedObject().ToInt64() + (offset*sizeOfT)),
                        bytesToWrite,
                        out bytesWritten,
                        IntPtr.Zero
                    )
                )
                {
                    throw new IOException("Unable to write file.", new Win32Exception(Marshal.GetLastWin32Error()));
                }

                Debug.Assert(bytesWritten == bytesToWrite);
            }

            finally
            {
                gcHandle.Free();
            }
        }

        /// <summary>
        /// Reads array data from a file stream as quickly as possible,
        /// without making any additional copies of the data.
        /// </summary>
        /// <typeparam name="T">The type of the array elements.</typeparam>
        /// <param name="fs">The file stream from which to read.</param>
        /// <param name="count">The number of elements to read.</param>
        /// <returns>
        /// The array of elements that was read. This may be less than the number that was
        /// requested if the end of the file was reached. It may even be empty.
        /// NOTE: There may still be data left in the file, even if not all the requested
        /// elements were returned - this happens if the number of bytes remaining in the
        /// file is less than the size of the array elements.
        /// </returns>
        /// <exception cref="IOException">Thrown on error. See inner exception for <see cref="Win32Exception"/></exception>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2004:RemoveCallsToGCKeepAlive")]

        public static T[] FastRead<T>(FileStream fs, int count) where T: struct
        {
            int sizeOfT = Marshal.SizeOf(typeof(T));

            long bytesRemaining  = fs.Length - fs.Position;
            long wantedBytes     = count * sizeOfT;
            long bytesAvailable  = Math.Min(bytesRemaining, wantedBytes);
            long availableValues = bytesAvailable / sizeOfT;
            long bytesToRead     = (availableValues * sizeOfT);

            if ((bytesRemaining < wantedBytes) && ((bytesRemaining - bytesToRead) > 0))
            {
                Debug.WriteLine("Requested data exceeds available data and partial data remains in the file.", "Dmr.Common.IO.Arrays.FastRead(fs,count)");
            }

            T[] result = new T[availableValues];

            if (availableValues == 0)
                return result;

            GCHandle gcHandle = GCHandle.Alloc(result, GCHandleType.Pinned);

            try
            {
                uint bytesRead;

                if
                (
                    !ReadFile
                    (
                        fs.SafeFileHandle,
                        gcHandle.AddrOfPinnedObject(),
                        (uint)bytesToRead,
                        out bytesRead,
                        IntPtr.Zero
                    )
                )
                {
                    throw new IOException("Unable to read file.", new Win32Exception(Marshal.GetLastWin32Error()));
                }

                Debug.Assert(bytesRead == bytesToRead);
            }

            finally
            {
                gcHandle.Free();
            }

            return result;
        }

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Interoperability", "CA1415:DeclarePInvokesCorrectly")]
        [DllImport("kernel32.dll", SetLastError=true)]
        [return: MarshalAs(UnmanagedType.Bool)]

        private static extern bool WriteFile
        (
            SafeFileHandle       hFile,
            IntPtr               lpBuffer,
            uint                 nNumberOfBytesToWrite,
            out uint             lpNumberOfBytesWritten,
            IntPtr               lpOverlapped
        );

        /// <summary>See the Windows API documentation for details.</summary>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Interoperability", "CA1415:DeclarePInvokesCorrectly")]
        [DllImport("kernel32.dll", SetLastError=true)]
        [return: MarshalAs(UnmanagedType.Bool)]

        private static extern bool ReadFile
        (
            SafeFileHandle       hFile,
            IntPtr               lpBuffer,
            uint                 nNumberOfBytesToRead,
            out uint             lpNumberOfBytesRead,
            IntPtr               lpOverlapped
        );
    }
}

次にBlockingCollection、受信データを格納する を作成し、1 つのスレッドを使用してデータを入力し、別のスレッドを使用してそれを消費することができます。

データをキューに読み込むスレッドは次のようになります。

public void ReadIntoQueue<T>(FileStream fs, BlockingCollection<T[]> queue, int blockSize) where T: struct
{
    while (true)
    {
        var data = FastRead<T>(fs, blockSize);

        if (data.Length == 0)
        {
            queue.CompleteAdding();
            break;
        }

        queue.Add(data);
    }
}

そして、消費スレッドは次のようにキューからものを削除します:

public void ProcessDataFromQueue<T>(BlockingCollection<T[]> queue) where T : struct
{
    foreach (var array in queue.GetConsumingEnumerable())
    {
        // Do something with 'array'
    }
}
于 2013-04-26T09:42:01.670 に答える
1

私の知る限り、ファイルを読み書きする最速の方法は、単一の転送のみのプロセスです。そうしないと、必須の読み取り/書き込みに加えて、ディスクをファイル内で前後に移動する必要があります。

もちろん、これは複数の同時スレッドでデータを処理できないという意味ではありません。

セグメントが十分に大きい場合、ディスク移動のオーバーヘッドはおそらく気にならないでしょう。

于 2013-04-26T09:34:48.057 に答える