0

これが私のシナリオです。外部データ ソースから大量のデータを取得しており、ローカルの 2 か所に書き込む必要があります。宛先の1つは書き込みが非常に遅いですが、もう1つは超高速です(ただし、低速の宛先への読み取りと書き込みには依存できません)。これを実現するために、私は Producer-Consumer パターン (BlockingCollection を使用) を使用しています。

私が今抱えている問題は、2 つの BlockingCollection でデータをキューに入れる必要があり、それがあまりにも多くのメモリを消費することです。私のコードは以下の例と非常によく似ていますが、1 つのキューから 2 つのタスクを実行したいと考えています。それを行う適切な方法を知っている人はいますか?以下のコードに非効率性はありますか?

class Program
{
    const int MaxNumberOfWorkItems = 15;
    static BlockingCollection<int> slowBC = new BlockingCollection<int>(MaxNumberOfWorkItems);
    static BlockingCollection<int> fastBC = new BlockingCollection<int>(MaxNumberOfWorkItems);

    static void Main(string[] args)
    {
        Task slowTask = Task.Factory.StartNew(() =>
        {
            foreach (var item in slowBC.GetConsumingEnumerable())
            {
                Console.WriteLine("SLOW -> " + item);
                Thread.Sleep(25);
            }
        });

        Task fastTask = Task.Factory.StartNew(() =>
        {
            foreach (var item in fastBC.GetConsumingEnumerable())
            {
                Console.WriteLine("FAST -> " + item);
            }
        });

        // Population two BlockingCollections with the same data. How can I have a single collection?
        for (int i = 0; i < 100; i++)
        {
            while (slowBC.TryAdd(i) == false)
            {
                Console.WriteLine("Wait for slowBC...");
            }

            while (fastBC.TryAdd(i) == false)
            {
                Console.WriteLine("Wait for 2...");
            }
        }

        slowBC.CompleteAdding();
        fastBC.CompleteAdding();

        Task.WaitAll(slowTask, fastTask);

        Console.ReadLine();
    }
}
4

1 に答える 1

0
  1. 生産者/消費者キューを使用して単一のintを転送することは、非常に非効率的です。あなたはチャンクでそれをrxしているので、キューを「* chunk」と入力してチャンク全体を送信し、同じ参照で新しいチャンクをすぐに作成/デプールしてみませんか。rxに可変。次の大量のデータ?これは、PCキューが通常、重要な量のデータ(実際のデータではなく、参照/ポインターのキューイング)に使用される方法です。スレッドには共有メモリスペースがあるため(一部の開発者は問題を引き起こすと考えているようです)、それを使用します-ポインタ/参照をキューに入れ、MBのデータを1つのポインタとして安全に転送します。コードの次の行で、古いスレッドをキューに入れた後、常に新しいスレッドを作成/デプールする限り、プロデューサースレッドとコンシューマースレッドが同じチャンクで動作することはありません。

    *チャンクのキューイングは、大きなチャンクに対して10の累乗の効率です。

  2. *チャンクを高速リンクに送信し、そこから低速リンクに「転送」します。

  3. 低速リンクがシステムをブロックして最終的にOOMエラーを引き起こさないようにするためには、フロー制御全体が必要になる場合があります。私が通常行うことは、合計バッファーサイズの「全体的な」クォータを修正し、起動時にチャンクのプールを作成することです(プールは、起動時に* new(chunks)が設定された別のBlockingCollectionです)。プロデューサースレッドはチャンクをデキューし、データで埋め、FASTスレッドにキューに入れます。FASTスレッドは、受信したチャンクを処理してから、*チャンクをSLOWスレッドにキューイングします。SLOWスレッドは同じデータを処理してから、プロデューサースレッドで再利用するために「使用済み」チャンクを再プールします。これにより、フロー制御システムが形成されます。SLOWスレッドが遅すぎる場合は、プロデューサーは最終的に空のプールから*チャンクをデプールしようとし、SLOWスレッドが使用済みの*チャンクを再プールしてプロデューサースレッドに再度実行するように通知するまでそこでブロックします。低速スレッドで操作をタイムアウトして*チャンクを早期にダンプするためにポリシーが必要になる場合があるため、データをドロップする-全体的な要件を考慮してそのポリシーを決定する必要があります-データを高速に継続的にキューに入れることは明らかに不可能です遅いコンシューマーが一部のデータをダンプしない限り、メモリオーバーフローなしで永久に遅いコンシューマー。

編集-ああ、そうです、プールを使用すると、使用されたチャンクのGCが排除され、パフォーマンスがさらに向上します。

全体的なフローポリシーの1つは、低速スレッドにデータをダンプしないことです。データフローが継続的に高くなると、*チャンクはすべて高速スレッドと低速スレッドの間のキューに配置され、プロデューサースレッドは実際に空のプールでブロックされます。次に、ネットワーク接続は独自のフロー制御を適用して、ネットワークピアがTCPを介してこれ以上データを送信しないようにします。これにより、フロー制御が低速スレッドからピアまで拡張されます。

于 2012-06-29T08:36:42.833 に答える