3

気づいてくれた Matthew Watson に感謝します。また、自分のコードを c++-linux に移植する予定であるため、「プラットフォームに依存しない」コードを好みます。

私の取引アプリケーションはほとんどロックフリーです。以下のコードは、ロックを使用する唯一の場所です。コードから始めましょう。かなり長いですが、繰り返し部分がたくさんあるので心配はいりません。簡単です。私の物事がどのように機能するかをよりよく示すために、すべての「繰り返し」部分を追加することを好みます。

Task.Factory.StartNew(() =>
{
    while (true)
    {
        Iterate();
    }
}, TaskCreationOptions.LongRunning);

private void Iterate()
{
    bool marketDataUpdated = false;

    lock (ordersToRegisterLock)
    {
        if (ordersToRegister.Count > 0)
        {
            marketDataUpdated = true;
            while (ordersToRegister.Count > 0)
            {
                Order order = ordersToRegister.Dequeue();
                // Stage1, Process
            }
        }
    }

    lock (aggrUpdatesLock)
    {
        if (aggrUpdates.Count > 0)
        {
            marketDataUpdated = true;
            while (!aggrUpdates.IsNullOrEmpty())
            {
                var entry = aggrUpdates.Dequeue();
                // Stage1, Process
            }
        }
    }

    lock (commonUpdatesLock)
    {
        if (commonUpdates.Count > 0)
        {
            marketDataUpdated = true;
            while (!commonUpdates.IsNullOrEmpty())
            {
                var entry = commonUpdates.Dequeue();
                // Stage1, Process
            }
        }
    }

    lock (infoUpdatesLock)
    {
        if (infoUpdates.Count > 0)
        {
            marketDataUpdated = true;
            while (!infoUpdates.IsNullOrEmpty())
            {
                var entry = infoUpdates.Dequeue();
                // Stage1, Process
            }
        }
    }

    lock (tradeUpdatesLock)
    {
        if (tradeUpdates.Count > 0)
        {
            marketDataUpdated = true;
            while (!tradeUpdates.IsNullOrEmpty())
            {
                var entry = tradeUpdates.Dequeue();
                // Stage1, Process
            }    

        }
    }

    if (marketDataUpdated)
    {
        // Stage2 !
        // make a lot of work. expensive operation. recalculate strategies, place orders etc.
    }
}

private readonly Queue<Order> ordersToRegister = new Queue<Order>();
private readonly object ordersToRegisterLock = new object();

private readonly Queue<AggrEntry> aggrUpdates = new Queue<AggrEntry>();
private readonly object aggrUpdatesLock = new object();

private readonly Queue<CommonEntry> commonUpdates = new Queue<CommonEntry>();
private readonly object commonUpdatesLock = new object();

private readonly Queue<InfoEntry> infoUpdates = new Queue<InfoEntry>();
private readonly object infoUpdatesLock = new object();

private readonly Queue<TradeEntry> tradeUpdates = new Queue<TradeEntry>();
private readonly object tradeUpdatesLock = new object();


    public void RegistorOrder(object sender, Gate.RegisterOrderArgs e)
    {
        lock (ordersToRegisterLock)
        {
            ordersToRegister.Enqueue(e.order);
        }
    }

    public void TradeUpdated(object sender, Gate.TradeArgs e)
    {
        lock (tradeUpdatesLock)
        {
            foreach (var entry in e.entries)
            {
                tradeUpdates.Enqueue(entry);
            }
        }
    }

    public void InfoUpdated(object sender, Gate.InfoArgs e)
    {
        lock (infoUpdatesLock)
        {
            foreach (var entry in e.entries)
            {
                infoUpdates.Enqueue(entry);
            }
        }
    }

    public void CommonUpdated(object sender, Gate.CommonArgs e)
    {
        lock (commonUpdatesLock)
        {
            foreach (var entry in e.entries)
            {
                commonUpdates.Enqueue(entry);
            }
        }
    }

    public void AggrUpdated(object sender, Gate.AggrArgs e)
    {
        lock (aggrUpdatesLock)
        {
            foreach (var entry in e.entries)
            {
                aggrUpdates.Enqueue(entry);
            }
        }
    }

私のコードでは、2 つの段階があります。Stage1更新段階であり、Stage2作業段階です。次のように、これら 2 つの段階をできるだけ早く切り替える必要があります。

  • アップデートはありますか?番号
  • アップデートはありますか?番号
  • 更新されましたか?はい、注文が更新されました!アップデートを適用するStage2
  • アップデートはありますか?番号
  • アップデートはありますか?はい、注文を登録する必要があります! アップデートを適用するStage2
  • アップデートはありますか?はい、取引が発生しました。更新を適用してください。Stage2

更新する必要はStage2ありませんが、後で適用できるように更新を「収集」し続ける必要があります。

そして重要なこと - これは非常にレイテンシが重要なコードであるため、最小限のレイテンシを実現するために 1 つのコアを「使用」することに同意します! したがって、更新が発生した場合は、できるだけ早く処理して実行する必要がありますStage2

ですから、何を達成する必要があるのか​​ 、それをどのように実装したのかが明確になったことを願っています. ここで、私のコードがどれほど優れているかについて説明します。潜在的な問題がいくつか見られます。

  • ロックがいっぱい!「ロックフリー」コードに置き換えることはできますか? CASか何かでスピンロック?
  • CPU コアの 100% を占有している場合、レイテンシーに影響を与えずにCPU リソースを節約できますか?
  • 余分な「スイッチ」を避けるために、.NET に「専用」コアを使用する (タスク アフィニティを設定する) ように指示できますか?
  • あるスレッドからキューに追加し、別のスレッドからキューを読み取ります。それは問題になるでしょうか?キューへの追加と読み取りが不安定な場合は? キャッシュ更新の問題が原因で、読み取りスレッドがキューからの更新を認識しない可能性はありますか?

私が書いたものを改善する方法についての提案は大歓迎です、ありがとう!

upd は部分的に解決されました - 私が理解しているように、クエリをロックフリー (リング バッファ ベースの可能性が高いですか?) クエリに置き換える方が良いと思います..後で C++ バージョンのディスラプターを使用すると思います。また、この記事http://www.umbraworks.net/bl0g/rebuildall/2010/03/08/Running_NET_threads_on_selected_processor_coresを使用し、タスクを「固定」コアで実行されているスレッドに置き換えましたが、まだ「ビジー」を使用しています-spin"、おそらくもっとスマートなものを使用する必要がありますか?

4

2 に答える 2

0

これは、より移植性の高いアプローチです。それが役に立てば幸い。

public class SafeQueue<T> : Queue<T>
{
    public T SafeDequeue()
    {
        lock (this)
        {
            return (Count > 0) ? Dequeue() : null;
        }
    }

    public void SafeEnqueue(T entry)
    {
        lock (this)
        {
            Enqueue(entry);
        }
    }
}

Task.Factory.StartNew(() =>
{
    while (true)
    {
        Iterate();
    }
}, TaskCreationOptions.LongRunning);


private void Iterate()
{
    bool marketDataUpdated = false;

    while ((Order order = ordersToRegister.SafeDequeue()) != null)
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    while ((var entry = aggrUpdates.SafeDequeue()) != null)
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    while ((var entry = commonUpdates.SafeDequeue()) != null)
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    while ((var entry = infoUpdates.SafeDequeue()) != null)
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    while ((var entry = tradeUpdates.SafeDequeue()) != null)
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    if (marketDataUpdated)
    {
        // Stage2 !
        // make a lot of work. expensive operation. recalculate strategies, place orders etc.
    }
}

private readonly SafeQueue<Order> ordersToRegister = new SafeQueue<Order>();

private readonly SafeQueue<AggrEntry> aggrUpdates = new SafeQueue<AggrEntry>();

private readonly SafeQueue<CommonEntry> commonUpdates = new SafeQueue<CommonEntry>();

private readonly SafeQueue<InfoEntry> infoUpdates = new SafeQueue<InfoEntry>();

private readonly SafeQueue<TradeEntry> tradeUpdates = new SafeQueue<TradeEntry>();

    public void RegistorOrder(object sender, Gate.RegisterOrderArgs e)
    {
        ordersToRegister.SafeEnqueue(e.order);
    }

    public void TradeUpdated(object sender, Gate.TradeArgs e)
    {
        foreach (var entry in e.entries)
        {
            tradeUpdates.SafeEnqueue(entry);
        }
    }

    public void InfoUpdated(object sender, Gate.InfoArgs e)
    {
        foreach (var entry in e.entries)
        {
            infoUpdates.SafeEnqueue(entry);
        }
    }

    public void CommonUpdated(object sender, Gate.CommonArgs e)
    {
        foreach (var entry in e.entries)
        {
            commonUpdates.SafeEnqueue(entry);
        }
    }

    public void AggrUpdated(object sender, Gate.AggrArgs e)
    {
        foreach (var entry in e.entries)
        {
            aggrUpdates.SafeEnqueue(entry);
        }
    }
于 2013-03-04T14:27:25.640 に答える
0

以下のコードを使用すると、「ステージ 1」の処理中にロックされなくなります。

Task.Factory.StartNew(() =>
{
    while (true)
    {
        Iterate();
    }
}, TaskCreationOptions.LongRunning);


private void Iterate()
{
    bool marketDataUpdated = false;

    foreach (Order order in ordersToRegister)
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    foreach (var entry in aggrUpdates)
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    foreach (var entry in commonUpdates)
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    foreach (var entry in infoUpdates)
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    foreach (var entry in tradeUpdates)
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    if (marketDataUpdated)
    {
        // Stage2 !
        // make a lot of work. expensive operation. recalculate strategies, place orders etc.
    }
}

private readonly ConcurrentQueue<Order> ordersToRegister = new ConcurrentQueue<Order>();

private readonly ConcurrentQueue<AggrEntry> aggrUpdates = new ConcurrentQueue<AggrEntry>();

private readonly ConcurrentQueue<CommonEntry> commonUpdates = new ConcurrentQueue<CommonEntry>();

private readonly ConcurrentQueue<InfoEntry> infoUpdates = new ConcurrentQueue<InfoEntry>();

private readonly ConcurrentQueue<TradeEntry> tradeUpdates = new ConcurrentQueue<TradeEntry>();

    public void RegistorOrder(object sender, Gate.RegisterOrderArgs e)
    {
        ordersToRegister.Enqueue(e.order);
    }

    public void TradeUpdated(object sender, Gate.TradeArgs e)
    {
        foreach (var entry in e.entries)
        {
            tradeUpdates.Enqueue(entry);
        }
    }

    public void InfoUpdated(object sender, Gate.InfoArgs e)
    {
        foreach (var entry in e.entries)
        {
            infoUpdates.Enqueue(entry);
        }
    }

    public void CommonUpdated(object sender, Gate.CommonArgs e)
    {
        foreach (var entry in e.entries)
        {
            commonUpdates.Enqueue(entry);
        }
    }

    public void AggrUpdated(object sender, Gate.AggrArgs e)
    {
        foreach (var entry in e.entries)
        {
            aggrUpdates.Enqueue(entry);
        }
    }
于 2013-03-01T23:08:48.287 に答える