11

マルチスレッド CPU で 80 バイトのデータを毎秒 5k から 10k レコードのレートで受信する効率的な C# アプリケーションがあります。

次に、重複レコードを検出してフィルター処理するためにメモリ内キャッシュを設定する必要があります。これにより、重複レコードがパイプラインをさらに進むのを抑制することができます。

キャッシュの仕様 (最大しきい値)

  • 80バイトのデータ
  • 10,000 レコード/秒
  • 60 秒のキャッシュ = キー数 = 60,000
  • (小計 48000000 バイト = 48Mb )
  • 理想的なキャッシュ サイズ = 5 分 (または 240Mb)
  • 許容可能なランタイム キャッシュ サイズの膨張 = 1 GB

質問

最も効率的なルックアップ、古いキャッシュ データのパージ、およびヒットしたデータの有効期限切れを防止できるメモリ内キャッシュ、ディクショナリ、ハッシュ テーブル、配列などを設定する最善の方法は何ですか。

ASP.Net CacheSystem.Runtime.MemoryCacheを見ましたが、正しいスループットを達成するには、より軽量でカスタマイズされたものが必要だと思います。また、代わりにSystem.Collections.Concurrentと、これに関連するホワイトペーパーも検討しています。

最善のアプローチが何であるかについて誰か提案がありますか?

4

3 に答える 3

10

時期尚早に最適化しないでください。

アンマネージ コードやポインターなどに頼らずに、これを行う合理的に簡潔な方法があるかもしれません。

私の古い普通のラップトップでの簡単なテストでは、1,000,000 エントリを に追加し、HashSet100,000 エントリを約 100 ミリ秒で削除できることがわかりました。その後、同じ 1,000,000 の値で 60 ミリ秒以内に繰り返すことができます。これは long だけを扱うためのものです。80 バイトのデータ構造は明らかに大きくなりますが、簡単なベンチマークが必要です。

私の推奨事項:

  • 「ルックアップ」と「重複検出」を として実装しますHashSet。これは、挿入、削除、検索が非常に高速です。

  • 実際のバッファー (新しいイベントを受け取り、古いイベントを期限切れにする) を適切な大きさの循環/リング バッファーとして実装します。これにより、メモリの割り当てと割り当て解除が回避され、エントリを前面に追加して背面から削除できます。キャッシュ内のアイテムを期限切れにするためのアルゴリズムを説明する 1 つ (2 つ目) を含むいくつかの役立つリンクを次に示します。

.NET の循環バッファ

入力数値の最小値、最大値、平均値の高速計算

ジェネリック C# RingBuffer

Java または C# で効率的な循環バッファーをどのようにコーディングしますか?

  • イベントの時間 (最後の 5 分間など) ではなく要素数 (100,000 など) によってキャッシュを制限したい場合は、循環バッファーがさらに優れていることに注意してください。

  • 項目がバッファー (最初に最後から検索する) から削除されると、それらはまたからHashSetも削除できます。両方のデータ構造を同じにする必要はありません。

  • 必要になるまでマルチスレッドは避けてください。自然に「シリアル」ワークロードがあります。CPU スレッドの 1 つが速度を処理できないことがわかっている場合を除き、単一のスレッドに保ちます。これにより、競合、ロック、CPU キャッシュ ミス、その他のマルチスレッドの問題が回避されます。これらのマルチスレッドの問題は、非常に並列化されていないワークロードの速度を低下させる傾向があります。ここでの私の主な注意点は、イベントの「受信」をそれらの処理から別のスレッドにオフロードしたい場合があるということです。

  • 上記の推奨事項は、高性能で安定した動作のイベント駆動型システム (メッセージング キューなど) の基礎として使用されるStaged event-driven Architecture (SEDA) の背後にある主なアイデアです。

上記の設計はきれいにラップでき、最小限の複雑さで必要な生のパフォーマンスを達成しようとします。これは、効率を抽出して測定できる適切なベースラインを提供するだけです。

(: キャッシュの永続性が必要な場合は、Kyoto Cabinetを参照してください。キャッシュを他のユーザーに表示したり、分散させたりする必要がある場合は、Redisを参照してください。

于 2012-05-12T22:19:24.850 に答える
0

これは、単一のスレッドで動作するサンプルです。このコードは、2 つの辞書を使用してデータを追跡します。1 つのディクショナリは間隔ごとにレコードを追跡するために使用されhashDuplicateTracker、2 つ目のディクショナリはディクショナリの特定の値を期限切れにするために使用されます。HashesByDate

バグ: CheckDataFreshness には、ElementAt() に関連するいくつかのバグがあります... 私はこれに取り組んでいます。

私がしなければならないいくつかの改善

  • Linq 演算子 ElementAt(x) を別のものに置き換えます
  • CheckDataFreshness が間隔ごとに 1 回以上実行されないようにしてください

これをマルチスレッド化するには

  • FrequencyOfMatchedHash、DecrementRecordHash、DecrementRecordHash の Dictionary を ConcurrentDictionary に置き換えます。
  • ConcurrentDictionary のソートされたバージョンを取得するか、ロックを使用しますHashesByDate

 public class FrequencyOfMatchedHash : Dictionary<int,int>
{ 
    public void AddRecordHash(int hashCode)
    {
        if (this.ContainsKey(hashCode))
        {
            this[hashCode]++;
        }
        else
        {
            this.Add(hashCode, 1);
        }
    }
    public void DecrementRecordHash(int hashCode)
    {
        if (this.ContainsKey(hashCode))
        {
            var val = this[hashCode];
            if (val <= 1)
            {
                this.Remove(hashCode);
            }
        } 
    }

    public override string ToString()
    {
        return this.Count + " records";
    }
}

public class HashDuplicateTracker : Dictionary<int, int >
{

    internal void AddRecord(int recordHash)
    {
        if (this.ContainsKey(recordHash))
        {
            this[recordHash]++;
        }
        else
        {
            this.Add(recordHash, 1);
        }
    }
}


public class HashesByDate : SortedDictionary<DateTime, FrequencyOfMatchedHash>
{
    internal void AddRecord(DateTime dt, int recordHash)
    {
        if (this.ContainsKey(dt))
        {
            this[dt].AddRecordHash(recordHash);
        }
        else
        {

            var tmp = new FrequencyOfMatchedHash();
            tmp.AddRecordHash(recordHash);

            var tmp2 = new FrequencyOfMatchedHash();
            tmp2.AddRecordHash(recordHash);
            this.Add(dt, tmp);
        }
    }
}
public class DuplicateTracker
{
    HashDuplicateTracker hashDuplicateTracker = new HashDuplicateTracker();

    // track all the hashes by date
    HashesByDate hashesByDate = new HashesByDate();


    private TimeSpan maxRange;
    private int average;

    public DuplicateTracker(TimeSpan range)
    {
        this.maxRange = range;
    }

    public void AddRecordHash(DateTime dt, int recordHash)
    {
        if (hashesByDate.Count == 0)
        {
            hashDuplicateTracker.AddRecord(recordHash);
            hashesByDate.AddRecord(dt, recordHash);

            return;
        }
        else
        {
            // Cleanup old data
            DateTime maxDate = hashesByDate.ElementAt(hashesByDate.Count - 1).Key;
            DateTime oldestPermittedEntry = maxDate - maxRange;

            if (dt >= oldestPermittedEntry)
                try
                {
                    hashDuplicateTracker.AddRecord(recordHash);
                    hashesByDate.AddRecord(dt, recordHash);

                    CheckDataFreshness(oldestPermittedEntry);
                }
                catch (ArgumentException e)
                {
                    // An entry with the same key already exists.
                    // Increment count/freshness
                    hashesByDate[dt].AddRecordHash(recordHash);
                    hashDuplicateTracker[recordHash]++;
                    CheckDataFreshness(oldestPermittedEntry);
                }
        }
    }


    /// <summary>
    /// This should be called anytime data is added to the collection
    /// 
    /// If threading issues are addressed, a more optimal solution would be to run this on an independent thread.
    /// </summary>
    /// <param name="oldestEntry"></param>
    private void CheckDataFreshness(DateTime oldestEntry)
    {
        while (hashesByDate.Count > 0)
        {
            DateTime currentDate = hashesByDate.ElementAt(0).Key;

            if (currentDate < oldestEntry)
            {
                var hashesToDecrement = hashesByDate.ElementAt(0).Value;

                for (int i = 0; i < hashesToDecrement.Count; i++)
                {
                    int currentHash = hashesToDecrement.ElementAt(0).Key;
                    int currentValue = hashesToDecrement.ElementAt(0).Value;

                    // decrement counter for hash
                    int tmpResult = hashDuplicateTracker[currentHash] - currentValue;
                    if (tmpResult == 0)
                    {
                        // prevent endless memory growth.
                        // For performance this might be deferred 
                        hashDuplicateTracker.Remove(tmpResult);
                    }
                    else
                    {
                        hashDuplicateTracker[currentHash] = tmpResult;
                    }

                    // remove item
                    continue;
                }

                hashesByDate.Remove(currentDate);

            }
            else
                break;
        }
    }

 }
于 2012-05-13T19:39:40.530 に答える
0

これを裏付けるものは何もありませんが、週末の練習は好きです :)

パージを解決するには、最新の値が最も古い値を上書きする循環キャッシュを使用できます (もちろん、そのように正確なn 分のキャッシュはありません)。そのため、最後のレコードが移動したオフセットを覚えておくだけで済みます。キャッシュの初期化されていないデータと 0 のみのレコードが一致するのを防ぐために、最初のレコードのコピーでキャッシュを埋めることでキャッシュを初期化できます。

次に、単純に最初のバイトから照合を開始できます。レコードが一致しない場合は、そのレコードの残りのバイトをスキップして、キャッシュの最後まで次のバイトとの照合を試みます。

レコードにヘッダーとそれに続くデータが含まれている場合は、逆方向に照合して、一致しないデータを見つける速度を上げることができます。

于 2012-05-12T14:58:50.440 に答える