6

私は完全に管理された Mercurial ライブラリ (完全に管理されたWindows 用の Mercurial Server で使用される予定で、近日公開予定) を作成していますが、私が直面している最も深刻なパフォーマンスの問題の 1 つは、奇妙なことに、配列を部分的に分割することです。

アイデアは次のとおりです。サイズが数百バイトから最大1メガバイトのバイト配列があり、それを使用する必要があるのは、私の特定のケースでは文字で区切られた部分に分割することだけです\n

dotTrace が私に示しているのは、私の「最適化された」バージョン(Splitコード正しいです。これが私が始めた単純なバージョンです) は、2,300 回の呼び出しで 11 秒かかります (dotTrace 自体によって導入された明らかなパフォーマンス ヒットがありますが、すべてが規模)。

数字は次のとおりです。

  • unsafeバージョン:呼び出し11 297のミリ秒2 312
  • マネージド (「ナイーブ」) バージョン:呼び出し20 001のミリ秒2 312

C# で配列を分割する最速の (できれば移植可能で、x86 と x64 の両方をサポートする) 方法は次のとおりです。

4

3 に答える 3

4

問題は、ループで多くの複雑な操作を行っていることだと思います。このコードは、ループ内の単一の加算と比較を除くすべての操作を削除します。その他の複雑なことは、分割が検出されたとき、または配列の最後でのみ発生します。

また、テストを実行するデータの種類を特定するのは難しいため、これは当て推量にすぎません。

public static unsafe Segment[] Split2(byte[] _src, byte value)
{
    var _ln = _src.Length;

    if (_ln == 0) return new Segment[] { };

    fixed (byte* src = _src)
    {
        var segments = new LinkedList<Segment>(); // Segment[c];

        byte* last = src;
        byte* end = src + _ln - 1;
        byte lastValue = *end;
        *end = value; // value-termination

        var cur = src;
        while (true)
        {
            if (*cur == value)
            {
                int begin = (int) (last - src);
                int length = (int) (cur - last + 1);
                segments.AddLast(new Segment(_src, begin, length));

                last = cur + 1;

                if (cur == end)
                {
                    if (lastValue != value)
                    {
                        *end = lastValue;
                    }
                    break;
                }
            }
            cur++;
        }

        return segments.ToArray();
    }
}

編集:コードを修正したため、正しい結果が返されます。

于 2012-08-26T20:39:43.210 に答える
3

Split の場合、32 ビット マシンでの ulong の処理は非常に遅いため、必ず uint に減らしてください。本当に ulong が必要な場合は、32 ビット用と 64 ビット用の 2 つのバージョンを実装してください。

一度にバイトを処理する方が速いかどうかも測定する必要があります。

メモリ割り当てのコストをプロファイリングする必要があります。十分に大きい場合は、複数の呼び出しでメモリを再利用してみてください。

他の:

ToString: "(" + Offset.ToString() + ", " + Length.ToString() + ")" を使用する方が高速です。

GetHashCode: fixed(byte * b = & buffer[offset]) を試す


このバージョンは、複数回使用すると非常に高速になるはずです。重要なポイント: 内部配列が適切なサイズに拡張された後、新しいメモリ割り当てはなく、最小限のデータ コピーです。

class ArraySplitter
{
    private byte[] m_data;
    private int    m_count;
    private int[]  m_stops;

    private void AddRange(int start, int stop)
    {
        // Skip empty range
        if (start > stop)
        {
            return;
        }

        // Grow array if needed
        if ((m_stops == null) || (m_stops.Length < (m_count + 2)))
        {
            int[] old = m_stops;

            m_stops = new int[m_count * 3 / 2 + 4];

            if (old != null)
            {
                old.CopyTo(m_stops, 0);
            }
        }

        m_stops[m_count++] = start;
        m_stops[m_count++] = stop;
    }

    public int Split(byte[] data, byte sep)
    {
        m_data  = data;
        m_count = 0;      // reuse m_stops

        int last = 0;

        for (int i = 0; i < data.Length; i ++)
        {
            if (data[i] == sep)
            {
                AddRange(last, i - 1);
                last = i + 1;
            }
        }

        AddRange(last, data.Length - 1);

        return m_count / 2;
    }

    public ArraySegment<byte> this[int index]
    {
        get
        {
            index *= 2;
            int start = m_stops[index];

            return new ArraySegment<byte>(m_data, start, m_stops[index + 1] - start + 1);
        }
    }
}

テストプログラム:

    static void Main(string[] args)
    {
        int count = 1000 * 1000;

        byte[] data = new byte[count];

        for (int i = 0; i < count; i++)
        {
            data[i] = (byte) i;
        }

        Stopwatch watch = new Stopwatch();

        for (int r = 0; r < 10; r++)
        {
            watch.Reset();
            watch.Start();

            int len = 0;

            foreach (var seg in data.MySplit(13))
            {
                len += seg.Count;
            }

            watch.Stop();

            Console.WriteLine("MySplit      : {0} {1,8:N3} ms", len, watch.Elapsed.TotalMilliseconds);

            watch.Reset();
            watch.Start();

            ArraySplitter splitter = new ArraySplitter();

            int parts = splitter.Split(data, 13);

            len = 0;

            for (int i = 0; i < parts; i++)
            {
                len += splitter[i].Count;
            }

            watch.Stop();
            Console.WriteLine("ArraySplitter: {0} {1,8:N3} ms", len, watch.Elapsed.TotalMilliseconds);
        }
    }

結果:

MySplit      : 996093    9.514 ms
ArraySplitter: 996093    4.754 ms
MySplit      : 996093    7.760 ms
ArraySplitter: 996093    2.710 ms
MySplit      : 996093    8.391 ms
ArraySplitter: 996093    3.510 ms
MySplit      : 996093    9.677 ms
ArraySplitter: 996093    3.468 ms
MySplit      : 996093    9.685 ms
ArraySplitter: 996093    3.370 ms
MySplit      : 996093    9.700 ms
ArraySplitter: 996093    3.425 ms
MySplit      : 996093    9.669 ms
ArraySplitter: 996093    3.519 ms
MySplit      : 996093    9.844 ms
ArraySplitter: 996093    3.416 ms
MySplit      : 996093    9.721 ms
ArraySplitter: 996093    3.685 ms
MySplit      : 996093    9.703 ms
ArraySplitter: 996093    3.470 ms
于 2012-08-26T19:58:23.203 に答える
2

アントン、

このスレッドはかなり古いため、これを最適化することにまだ興味があるかどうかはわかりませんが、オンライン リポジトリでコードがほとんど同じであることがわかったので、試してみようと思いました。HgLab アプリケーションを評価する際に、bitbucket.org で HgSharp コードを調べました。大幅に簡素化されたネイティブ構成を使用して関数を書き直しました。私のテストでは、元のルーチンよりも半分以上の時間が改善されました。数メガバイトのソースファイルをロードしてテストし、元のルーチンを使用して同じ操作のパフォーマンスとタイミングを比較しました。

基本的なロジックを書き直すことに加えてArraySegment<>、カスタム実装の代わりにフレームワークに組み込まれているネイティブを使用することにしました。唯一の大きな違いは、プロパティではなくプロパティをArraySegment<>公開することです。以下のコードは、ポインターを使用していないため、unsafe キーワードを必要としませんが、そうするように変更すると、パフォーマンスがわずかに向上するようです。CountLength


    public static ArraySegment<byte>[] SplitEx(this byte[] source, byte value) {
        var _previousIndex = -1;
        var _segments = new List<ArraySegment<byte>>();
        var _length = source.Length;

        if (_length > 0) {
            int _index;

            for (_index = 0; _index < _length; _index++) {
                var _value = source[_index];
                if (_value == value) {
                    _segments.Add(new ArraySegment<byte>(source, _previousIndex + 1, _index - _previousIndex));
                    _previousIndex = _index;
                }
            }

            if (--_index != _previousIndex) {
                _segments.Add(new ArraySegment<byte>(source, _previousIndex + 1, _index - _previousIndex));
            }
        }

        return _segments.ToArray();
    }
于 2012-12-18T01:01:04.000 に答える