26

Aタイプとタイプのイベントを生成するスレッドがたくさんありBます。

私のプログラムはこれらのイベントを受け取り、メッセージでラップしてネットワーク経由で送信します。メッセージは、1つのAイベント、1つのBイベント、または1つのAイベントと1つのイベントのいずれかを保持できますB

SendMessage(new Message(a: 1,    b: null));
SendMessage(new Message(a: null, b: 2   ));
SendMessage(new Message(a: 3,    b: 4   ));

タイプのイベントはA非常に頻繁に発生しますが、タイプのイベントはBそれほど頻繁には発生しません。したがって、スレッドがBイベントを生成するとき、私のプログラムは、別のスレッドがイベントを生成するかどうかを確認するために少し待機し、可能であればイベントとAイベントを結合します。AB

これが私のコードです:

object gate = new object();
int? pendingB;

Message WrapA(int a, int millisecondsTimeout)
{
    int? b;

    lock (gate)
    {
        b = pendingB;
        pendingB = null;
        Monitor.Pulse(gate);
    }

    return new Message(a, b);
}

Message WrapB(int b, int millisecondsTimeout)
{
    lock (gate)
    {
        if (pendingB == null)
        {
            pendingB = b;
            Monitor.Wait(gate, millisecondsTimeout);
            if (pendingB != b) return null;
            pendingB = null;
        }
    }

    return new Message(null, b);
}

これは今のところ機能します。ただし、2つの問題があります。

  • イベントがたくさんあり、Aイベントがたくさんある場合B、アルゴリズムはあまり効率的ではありません。十分なイベントがある場合でも、特定の割合のBイベントのみがイベントに関連付けられます。AA

  • Aしばらくの間イベントが生成されない場合(まれですが、不可能ではありません)、アルゴリズムは完全に不公平です。Bイベントを生成する1つのスレッドは毎回待機する必要がありますが、他のすべてのスレッドはBイベントをすぐに送信できます。

アルゴリズムの効率と公平性をどのように改善できますか?

制約:
•  WrapAそしてWrapB、短い決定論的な時間内に終了する必要があります。
•  SendMessageロックの外部で呼び出す必要があります。
•以外に利用可能な同期メカニズムはありませんgate
•利用可能な追加のスレッド、タスク、タイマーなどはありません。
A通常の場合、タイプのイベントは非常に頻繁に発生するため、ビジーウェイトインWrapBは問題ありません。


ベンチマークとして使用できるテストプログラムは次のとおりです。

public static class Program
{
    static int counter0 = 0;
    static int counterA = 0;
    static int counterB = 0;
    static int counterAB = 0;

    static void SendMessage(Message m)
    {
        if (m != null)
            if (m.a != null)
                if (m.b != null)
                    Interlocked.Increment(ref counterAB);
                else
                    Interlocked.Increment(ref counterA);
            else
                if (m.b != null)
                    Interlocked.Increment(ref counterB);
                else
                    Interlocked.Increment(ref counter0);
    }

    static Thread[] Start(int threadCount, int eventCount,
        int eventInterval, int wrapTimeout, Func<int, int, Message> wrap)
    {
        Thread[] threads = new Thread[threadCount * eventCount];
        for (int i = 0; i < threadCount; i++)
        {
            for (int j = 0; j < eventCount; j++)
            {
                int k = i * 1000 + j;
                int l = j * eventInterval + i;
                threads[i * eventCount + j] = new Thread(() =>
                {
                    Thread.Sleep(l);
                    SendMessage(wrap(k, wrapTimeout));
                });
                threads[i * eventCount + j].Start();
            }
        }
        return threads;
    }

    static void Join(params Thread[] threads)
    {
        for (int i = 0; i < threads.Length; i++)
        {
            threads[i].Join();
        }
    }

    public static void Main(string[] args)
    {
        var wrapper = new MessageWrapper();
        var sw = Stopwatch.StartNew();

        // Only A events
        var t0 = Start(10, 40, 7, 1000, wrapper.WrapA);
        Join(t0);

        // A and B events
        var t1 = Start(10, 40, 7, 1000, wrapper.WrapA);
        var t2 = Start(10, 10, 19, 1000, wrapper.WrapB);
        Join(t1);
        Join(t2);

        // Only B events
        var t3 = Start(10, 20, 7, 1000, wrapper.WrapB);
        Join(t3);

        Console.WriteLine(sw.Elapsed);

        Console.WriteLine("0:  {0}", counter0);
        Console.WriteLine("A:  {0}", counterA);
        Console.WriteLine("B:  {0}", counterB);
        Console.WriteLine("AB: {0}", counterAB);

        Console.WriteLine("Generated A: {0}, Sent A: {1}",
            10 * 40 + 10 * 40, counterA + counterAB);
        Console.WriteLine("Generated B: {0}, Sent B: {1}",
            10 * 10 + 10 * 20, counterB + counterAB);
    }
}
4

14 に答える 14

7

それを楽しむために、ここにロックフリーの実装があります:

public sealed class MessageWrapper
{
    private int pendingB;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int b = Interlocked.Exchange(ref pendingB, -1);
        return new Message(a, b == -1 ? null : b);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        var sw = new SpinWait();
        while (Interlocked.CompareExchange(ref pendingB, b, -1) != -1)
        {
            // Spin
            sw.SpinOnce();

            if (sw.NextSpinWillYield)
            {
                // Let us make progress instead of yielding the processor
                // (avoid context switch)
                return new Message(null, b);
            }
        }

        return null;
    }
}

結果

元の実装:

00:00:02.0433298
0:  0
A:  733
B:  233
AB: 67
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

ロックフリーの実装:

00:00:01.2546310
0:  0
A:  717
B:  217
AB: 83
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

アップデート

残念ながら、上記の実装にはバグといくつかの欠点があります。改善されたバージョンは次のとおりです。

public class MessageWrapper
{
    private int pendingB = EMPTY;
    private const int EMPTY = -1;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int? b;
        int count = 0;
        while ((b = Interlocked.Exchange(ref pendingB, EMPTY)) == EMPTY)
        {
            if (count % 7 == 0)
            {
                Thread.Sleep(0);
            }
            else if (count % 23 == 0)
            {
                Thread.Sleep(1);
            }
            else
            {
                Thread.Yield();
            }
            if (++count == 480)
            {
                return new Message(a, null);
            }
        }
        return new Message(a, b);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        int count = 0;
        while (Interlocked.CompareExchange(ref pendingB, b, EMPTY) != EMPTY)
        {
            // Spin
            Thread.SpinWait((4 << count++));
            if (count > 10)
            {
                // We didn't manage to place our payload.
                // Let's send it ourselves:
                return new Message(null, b);
            }
        }

        // We placed our payload. 
        // Wait some more to see if some WrapA snatches it.
        while (Interlocked.CompareExchange(ref pendingB, EMPTY, EMPTY) == b)
        {
            Thread.SpinWait((4 << count++));
            if (count > 20)
            {
                // No WrapA came along. Pity, we will have to send it ourselves
                int payload = Interlocked.CompareExchange(ref pendingB, EMPTY, b);
                return payload == b ? new Message(null, b) : null;
            }
        }
        return null;
    }
}

結果:

OPの実装

00:00:02.1389474
0:  0
A:  722
B:  222
AB: 78
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

2番目の実装:

00:00:01.2752425
0:  0
A:  700
B:  200
AB: 100
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
于 2013-02-26T02:14:08.123 に答える
5

多様性のために、私は並行コレクションに基づくアプローチを試みました。私には、投稿された制約からそれが大丈夫かどうかは明らかではありませんが、とにかく私の答えを撃ちます:

これは、私のマシンの元のコードからの典型的な出力です。

00:00:01.7835426
0:  0
A:  723
B:  223
AB: 77
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

これは私の提案からの典型的な出力であり、元のコードよりも約20%遅くなりますが、より多くの「AB」メッセージをキャプチャします。

00:00:02.1322512
0:  0
A:  701
B:  201
AB: 99
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

MessageWrapperの実装:

public class MessageWrapper
{
    private BlockingCollection<int?> messageA = new BlockingCollection<int?>();
    private BlockingCollection<int?> messageB = new BlockingCollection<int?>();

    public Message WrapA(int a, int millisecondsTimeout)
    {
        messageA.Add(a);
        return CreateMessage(0);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        messageB.Add(b);
        return CreateMessage(millisecondsTimeout);
    }

    private Message CreateMessage(int timeout)
    {
        int? a, b;

        if (messageB.TryTake(out b) | messageA.TryTake(out a, timeout))
        {
            return new Message(a, b);
        }
        else
        {
            return null;
        }
    }
}
于 2013-02-24T17:45:12.050 に答える
1

さて、私は速いAとABを作成し、次に遅いBを作成しようとしました。これは、全体の時間が遅くなることを意味します(主に、bのみのストリームのため)が、時間とaのみの合計時間は速くなります。結果は次のとおりです。

A's only: 00:00:00.3975499
Combine: 00:00:00.4234934
B's only: 00:00:02.0079422
Total: 00:00:02.8314751
0:  0
A:  700
B:  200
AB: 100
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

コードは次のとおりです。

    class MessageWrapper
    {
        object bMessageLock = new object();
        object pendingBLock = new object();
        int? pendingB;

        ManualResetEvent gateOpen = new ManualResetEvent(true); // Gate is open initially.


        private bool IsGateOpen()
        {
            return gateOpen.WaitOne(0);
        }

        private void OpenGate()
        {
            gateOpen.Set();
        }

        private void CloseGate()
        {
            gateOpen.Reset();
        }


        public Message WrapA(int a, int millisecondsTimeout)
        {
            // check if the gate is open. Use WaitOne(0) to return immediately.
            if (IsGateOpen())
            {
                return new Message(a, null);
            }
            else
            {
                // This extra lock is to make sure that we don't get stale b's.
                lock (pendingBLock)
                {
                    // and reopen the gate.
                    OpenGate();

                    // there is a waiting b
                    // Send combined message
                    var message = new Message(a, pendingB);

                    pendingB = null;

                    return message;
                }
            }
        }

        public Message WrapB(int b, int millisecondsTimeout)
        {

            // Remove this if you don't have overlapping B's
            var timespentInLock = Stopwatch.StartNew();

            lock (bMessageLock) // Only one B message can be sent at a time.... may need to fix this.
            {
                pendingB = b;

                // Close gate
                CloseGate();


                // Wait for the gate to be opened again (meaning that the message has been sent)
                if (timespentInLock.ElapsedMilliseconds < millisecondsTimeout && 
                    gateOpen.WaitOne(millisecondsTimeout - (int)timespentInLock.ElapsedMilliseconds)) 
                // If you don't have overlapping b's use this clause instead.
                //if (gateOpen.WaitOne(millisecondsTimeout)) 
                {
                    lock (pendingBLock)
                    {
                        // Gate was opened, so combined message was sent.
                        return null;
                    }
                }
                else
                {
                    // Timeout expired, so send b-only message.
                    lock (pendingBLock)
                    {
                        // reopen gate.
                        OpenGate();
                        pendingB = null;
                        return new Message(null, b);
                    }
                }
            }
        }


    }

主な作業は、手動リセットイベントを使用して行われます。ゲートが開いていれば、Aを自由に送ることができるという考え方です。'b'が到着したら、ゲートを閉じてAに強制的に結合させます。pendingB単一のフィールドがあると、この操作がいくらか制限されると言わなければなりません。変数が1つしかないということは、そのbをpendingBに格納できるスレッドが1つだけであることを意味します。これが私が余分なものを持っている理由ですbMessageLock

また、この変数へのアクセスを制御する必要があるため、pendingBLock

このコードにはまだバグがある可能性がありますが、テストしている限り、100個のメッセージすべてが組み合わされています。

最後に、WrapBが待機していた時間に対するチェックを含めました。元々、WrapBは合計200秒でキューに入れられていました。重複する呼び出しがある場合は、チェックを追加できます。キューに入れてもかまわない場合は、代わりに単純なコードを使用してください。

于 2013-02-27T15:52:25.090 に答える
1

ReactiveExtesionsの完璧な候補のようです。Bufferメソッドを使用して、イベントまたは他の同様の拡張機能をグループ化し、イベントをフィルタリングおよび結合できます。

たぶん、この解決策はあなたの制約の1つと一致しませんが、私の意見ではそれが最良の解決策です。リアクティブエクステンションは非常に強力です。

于 2013-02-22T23:29:51.413 に答える
1

与えられた制約にもう少し厳密に従う別の提案をします。私のマシンでは、この実装は、テストプログラムの実行時に、97以上の「AB」メッセージを一貫してキャッチします。元のコードから約5%のパフォーマンス低下があります。

class MessageWrapper
{
    object gate = new object();
    int? pendingB;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        Message returnMessage = null;
        bool lockTaken = false;

        Monitor.TryEnter(gate, 100, ref lockTaken);

        if (lockTaken)
        {
            returnMessage = new Message(a, pendingB);

            pendingB = null;
            Monitor.Pulse(gate);

            Monitor.Exit(gate);
        }
        else
        {
            returnMessage = new Message(a, null);
        }

        return returnMessage;
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        Message returnMessage = null;
        bool lockTaken = false;

        Monitor.TryEnter(gate, 100, ref lockTaken);

        if (lockTaken)
        {
            if (pendingB != null)
            {
                Monitor.Wait(gate, 100);
            }

            if (pendingB != null)
            {
                returnMessage = new Message(null, b);
            }
            else
            {
                pendingB = b;

                if (!Monitor.Wait(gate, millisecondsTimeout))
                {
                    pendingB = null;
                    Monitor.Pulse(gate);
                    returnMessage = new Message(null, b);
                }
            }

            Monitor.Exit(gate);
        }
        else
        {
            returnMessage = new Message(null, b);
        }

        return returnMessage;
    }
}

ここで起こっていることは基本的に元のコードと同じですが、「B」メッセージを返すだけでなく、 pendingBオブジェクトがすでに存在する場合も待機しています。これにより、わずかなパフォーマンスコストで、検出できる「AB」メッセージの量が向上します。

少し面倒に見えますが、これは主に、生のロックの代わりに、よりリアルタイムに適した構成Monitor.TryTakeを使用することを選択したためです。また、単一のreturnステートメントを持つことは、 Monitor.Exitを呼び出す前にデッドロックが誤って戻るのを防ぐための巧妙なトリックです。

さまざまなタイムアウトをいじると、精度を犠牲にしてパフォーマンスを向上させることができます。その逆も可能です。100msは私の最初の推測でしたが、少なくとも私のマシンではまともなように見えます。


最後に、このWrapBの実装では、行を変更できます。

            if (pendingB != null)
            {
                Monitor.Wait(gate, 100);
            }

            while (pendingB != null)
            {
                Monitor.Wait(gate, 100);
            }

100%の精度を得るには、テストプログラムからのメトリックを大幅に混乱させます。これは、「B」メッセージのみのストリームがある場合に明らかにパフォーマンスが非常に低い「B」イベントを同期するためです。

t3テストを削除すると、100個の「AB」メッセージから100個を一貫して検出しながら、元のコードよりも約5%高速に実行されます。しかし、ループを何回スピンするかわからないため、ランタイムはもちろん決定論的ではなくなります。

編集:

まあ、私たちが次のようなことをしない限り

            int spinCount = 0;

            while (pendingB != null && spinCount < 5)
            {
                spinCount++;
                Monitor.Wait(gate, 100);
            }

これにより、待機時間の上限がわかります。'B'メッセージのみのストリームがある場合、パフォーマンスの問題を解決し、元のコードとほぼ同時に実行され、100個の'AB'メッセージから100個を一貫して検出します。

于 2013-02-25T20:08:53.077 に答える
0

それがあなたが望むことをするかどうかはわかりませんが、ここに私の提案があります。基本的に、可能な限りBメッセージをAに渡し、メッセージが送信されたことを確認します。

class MessageWrapper
{
    object gate = new object();
    int? pendingB;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int? b;

        lock (gate)
        {
            b = pendingB;
            pendingB = null;
            Thread.Sleep(1); // yield. 1 seems the best value after some testing
        }

        return new Message(a, b);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        int? bb = b;

        lock (gate)
        {
            if (pendingB == null)
            {
                pendingB = b;
                bb = null;
            }
        }

        Thread.Sleep(3);

        if (bb == null)
        {
            lock (gate)
            {
                if (pendingB != null)
                {
                    bb = pendingB;
                    pendingB = null;
                }
            }
        }
        return new Message(null, bb);
    }
}
于 2013-02-23T13:08:08.843 に答える
0

これが別の試みです。アプローチは、イベントがイベントにアタッチされるのを待つのではなく、イベントの生成がAイベントにアタッチされるのを待つことです。BBA

object gate = new object();
int? pendingA;

public Message WrapA(int a, int millisecondsTimeout)
{
    bool queued = false;

    lock (gate)
    {
        if (pendingA == null)
        {
            queued = true;
            pendingA = a;
            Monitor.Pulse(gate);
        }
    }

    if (queued)
    {
        Thread.Sleep(3);
        lock (gate)
        {
            if (pendingA == null)
                return null;

            a = pendingA.Value;
            pendingA = null;
        }
    }

    return new Message(a, null);
}

public Message WrapB(int b, int millisecondsTimeout)
{
    int? a;

    lock (gate)
    {
        if (pendingA == null)
            Monitor.Wait(gate, millisecondsTimeout);

        a = pendingA;
        pendingA = null;
    }

    return new Message(a, b);
}
于 2013-02-23T17:01:59.707 に答える
0

さて、私の最初のアイデアは、優先順位も処理するセマフォを持つことですが、おそらくこの投稿はあなたにもっと洞察を与えるでしょう.Net Mutex Question

基本的に、アイデアは、2つのタイプのイベントに優先順位を付けて、タイプのイベントが受信さBれない場合にタイプのイベントを可能な限り高速に実行できるようにすることです。A

Gate以外の同期メカニズムが利用できないという3番目の制約があるため、これは適切な解決策ではない可能性があることを認識していますが、正しい方向に向けることができれば幸いです。

于 2013-02-20T15:44:23.287 に答える
0

これは、公平性を向上させるアプローチのスケッチです。つまり、すべての B送信には最大100ミリ秒の遅延が発生します。しかし、それがあなたの制約に合うかどうかはわかりません。

  • グローバルコンテキストでは、タイプの単一のMessageSenderオブジェクトがありますIMessageSender
  • IMessageSender、、DefaultMessageSenderおよび(値BWrappingMessageSenderを格納する)の2つの実装がありますb

メッセージ送信者の動作は次のとおりです。

  • DefaultMessageSender送信を求められたらA:送信するだけです
  • DefaultMessageSender:を送信するように求められると、グローバルを、渡されたばかりの値を知っている新しいものにB切り替えますMessageSenderBWrappingMessageSenderb

  • BWrappingMessageSender:を送信するように求められるAと、渡されたABaとそれ自体のABを送信しb、グローバルMessageSenderDefaultMessageSender

  • BWrappingMessageSenderを送信するように求められると、B:を送信しB、グローバルを、渡されたばかりの値を知っている新しいものにb切り替えます。MessageSenderBWrappingMessageSenderb

私が特定していないのは、新しく作成されたものが、その間に他に何もするように指示されていない場合、作成されてから100ミリ秒後にBWrappingMessageSenderプレーンを送信することを知っている方法です。B

于 2013-02-20T17:12:10.790 に答える
0

これがいくつかの実験の後の私の解決策です:

  • 単一要素のキューが空の場合は、その場を利用します。
  • すでにスポットが取られている場合は、乗員を丁寧に動かして先に進み、少し待ってからもう一度やり直してください。
  • 誰かが失礼で、待っている間にその場所を乗っ取った場合、私たちは待ち行列を飛び越えて先に進みます。

コード:

Message WrapA(int a, int millisecondsTimeout)
{
    bool lockTaken = false;
    int? b = null;

    try
    {
        Monitor.TryEnter(gate, millisecondsTimeout, ref lockTaken);
        if (lockTaken)
        {
            if (pendingB != null)
            {
                b = pendingB;
                pendingB = null;
                Monitor.Pulse(gate);
            }
        }
    }
    finally
    {
        if (lockTaken)
        {
            Monitor.Exit(gate);
        }
    }

    return new Message(a, b);
}

Message WrapB(int b, int millisecondsTimeout)
{
    bool lockTaken = false;

    try
    {
        TimeoutHelper timeout = new TimeoutHelper(millisecondsTimeout);
        Monitor.TryEnter(gate, timeout.RemainingTime(), ref lockTaken);
        if (lockTaken)
        {
            if (pendingB == null)
            {
                pendingB = b;
                Monitor.Wait(gate, timeout.RemainingTime());
                if (pendingB == null) return null;
                pendingB = null;
            }
            else
            {
                Monitor.Pulse(gate);
                try { }
                finally { lockTaken = false; Monitor.Exit(gate); }
                Thread.Sleep(1);
                Monitor.TryEnter(gate, timeout.RemainingTime(), ref lockTaken);
                if (lockTaken)
                {
                    if (pendingB == null)
                    {
                        pendingB = b;
                        Monitor.Wait(gate, timeout.RemainingTime());
                        if (pendingB == null) return null;
                        pendingB = null;
                    }
                }
            }
        }
    }
    finally
    {
        if (lockTaken)
        {
            Monitor.Exit(gate);
        }
    }

    return new Message(null, b);
}
于 2013-02-20T19:52:04.440 に答える
0

3時間試した後、次の結果を得ることができました。

00:00:01.8577304
0:0
A:741
B:241
AB:59
生成されたA:800、送信されたA:800
生成されたB:300、送信されたB:300
合計:1100

私の方法:

(1)メッセージB(現在はBと呼ばれます)があり、まだBが待機していない場合は常に、メッセージを「キュー」に入れます。指定されたタイムアウト内に他のパケットがない場合、メッセージを送信します。(2)実際にキューにBがある場合、キューの最初のBにぶつかり、このメッセージを送信します。これは公平性を確保するためです。送信される新しいBは、状況1と同じ状況に従います(キューに入れられ、指定された時間内に送信されます)。(3)メッセージA(以降Aと呼びます)があり、保留中のBがない場合、Aはすぐに送信されます。実際の待機は実行されません。(4)Aを送信し、キューにBがある場合、キューから「盗む」ことになります。両方のメッセージがラップされ、一緒に送信されます。Bは他のスレッドで送信されるのを待っており、Aはそれを盗んだので、nullチェックが必要です。AはBに通知しますが、Bは送信するものがないことに気付きます。Bはnullを返します。

コードでこれを実現するには:

public class MessageWrapper
{
    readonly object _gate = new object();
    int? _pendingB;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int? currentB;

        lock (_gate)
        {
            currentB = _pendingB;
            _pendingB = null;

            Monitor.Pulse(_gate); // B stolen, get rid of waiting threads
        }

        return new Message(a, currentB);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        lock (_gate)
        {
            if (_pendingB != null)
            {
                var currentB = _pendingB;
                _pendingB = b;

                Monitor.Pulse(_gate); // release for fairness
                Monitor.Wait(_gate, millisecondsTimeout); // wait for fairness

                return new Message(null, currentB);
            }
            else
            {
                _pendingB = b;

                Monitor.Pulse(_gate); // release for fairness
                Monitor.Wait(_gate, millisecondsTimeout); // wait for A

                if (_pendingB == null) return null;

                var currentB = _pendingB;
                _pendingB = null;
                return new Message(null, currentB);
            }
        }
    }
}
于 2013-02-23T20:37:48.933 に答える
0

大きな問題。私はこれに時間を費やすことを本当に楽しんだ。私が使用したソリューションでは、元の問題が私のコンピューターハードウェアで発生したものの4倍の一致がありました。

おそらく、私がモニターとロックを使用しているよりも知識のある人がこれを改善できるでしょう。

  1. 最後にnullを返すためだけにそのスレッドに完全なスリープを行わせる代わりに、一致が行われたときに別のスレッドを解放します。おそらく、これは実際にはそれほどコストがかかりません。これを解決するために、AutoResetEventを導入しましたが、理解できない理由により、AutoResetEventが意図したとおりに動作せず、一致が100から70に減少します。

  2. スレッドの最終的なタイムアウトは、タイムアウトした後でも競合するロックを渡す必要があるため、改善できます。

要件に完全に適合します。

  1. すべてのプロセスは指定された期間内に終了します(最後のロックは、ロックの競合状況に応じて数サイクル追加される場合があります)。
  2. 送信はロックの範囲外です。
  3. ゲートを使用して同期します
  4. 追加のタイマーはありません
  5. プリファレンスとスレッドは同等に扱われます

元の質問の結果:

  1. 時間:4.5秒
  2. A:773
  3. B:273
  4. AB:27

このクラスの結果:

  1. 時間:5.4秒
  2. A:700
  3. B:300
  4. AB:100

    class MessageWrapper
    {
    object gate = new object();
    int EmptyThreadsToReleaseA = 0;
    int EmptyThreadsToReleaseB = 0;
    Queue<int> queueA = new Queue<int>();
    Queue<int> queueB = new Queue<int>();
    AutoResetEvent EmptyThreadEventA = new AutoResetEvent(false);
    AutoResetEvent EmptyThreadEventB = new AutoResetEvent(false);
    
    public Message WrapA(int a, int millisecondsTimeout)
    {
        lock (gate)
        {
            if (queueB.Count > 0)
            {
                Interlocked.Increment(ref EmptyThreadsToReleaseB);
                EmptyThreadEventB.Set();
                return new Message(a, queueB.Dequeue());
            }
            else
            {
                queueA.Enqueue(a);
            }
        }
    
        System.Threading.Thread.Sleep(millisecondsTimeout);
        //EmptyThreadEventA.WaitOne(millisecondsTimeout);
    
        lock (gate)
        {
            if (EmptyThreadsToReleaseA > 0)
            {
                Interlocked.Decrement(ref EmptyThreadsToReleaseA);
                return null;
            }
    
            return new Message(queueA.Dequeue(), null);
        }
    }
    
    public Message WrapB(int b, int millisecondsTimeout)
    {
        lock (gate)
        {
            if (queueA.Count > 0)
            {
                Interlocked.Increment(ref EmptyThreadsToReleaseA);
                EmptyThreadEventA.Set();
                return new Message(queueA.Dequeue(), b);
            }
            else
            {
                queueB.Enqueue(b);
            }
        }
    
        System.Threading.Thread.Sleep(millisecondsTimeout);
        //EmptyThreadEventB.WaitOne(millisecondsTimeout);
    
        lock (gate)
        {
            if (EmptyThreadsToReleaseB > 0)
            {
                Interlocked.Decrement(ref EmptyThreadsToReleaseB);
                return null;
            }
    
            return new Message(null, queueB.Dequeue());
        }
    }
    }
    
于 2013-02-24T01:08:38.417 に答える
0

特にタイプAのイベントでは、不要なロックを回避しようとしました。また、ラッパークラスのロジックにいくつかの変更を加えました。私の実装では、1回の呼び出しでSendB2つのBメッセージを送信できる可能性があるため、メッセージを返すだけでなく、このクラスから直接メッセージを送信する方が便利であることがわかりました。私はいくつかの説明コメントをコードに入れました

public class MessageWrapper
{
    private readonly object _gate = new object();
    private object _pendingB;

    public void SendA(int a, int millisecondsTimeout, Action<Message> send)
    {
        var b = Interlocked.Exchange<object>(ref _pendingB, null);

        send(new Message(a, (int?)b));

        // this code will just release any pending "assure that B was sent" threads.
        // but everything works fine even without it
        lock (_gate)
        {
            Monitor.PulseAll(_gate);
        }
    }

    public void SendB(int b, int millisecondsTimeout, Action<Message> send)
    {
        // needed for Interlocked to function properly and to be able to chack that exatly this b event was sent.
        var boxedB = (object)(int?)b;

        // excange currently pending B event with newly arrived one
        var enqueuedB = Interlocked.Exchange(ref _pendingB, boxedB);

        if (enqueuedB != null)
        {
            // if there was some pending B event then just send it.
            send(new Message(null, (int?)enqueuedB));
        }

        // now we have to wait up to millisecondsTimeout to ensure that our message B was sent
        lock (_gate)
        {
            // release any currently waiting threads.
            Monitor.PulseAll(_gate);

            if (Monitor.Wait(_gate, millisecondsTimeout))
            {
                // if we there pulsed, then we have nothing to do, as our event was already sent 
                return;
            }
        }

        // check whether our event is still pending 
        enqueuedB = Interlocked.CompareExchange(ref _pendingB, null, boxedB);

        if (ReferenceEquals(enqueuedB, boxedB))
        {
            // if so, then just send it.
            send(new Message(null, (int?)enqueuedB));
        }
    }
}

また、コメントで述べた理由の1つとして、テストクラスにいくつかの変更を加えました。AB句をテストする場合に備えて、すべてのテストスレッドに同期イベントを追加しました。また、同時に実行するスレッドの数を、ご使用のバージョンの500から20(すべてAB句の場合)に減らしました。それでも、これらすべてのスレッドでの呼び出しは、スレッドの数(スレッドのStartメソッドのパラメーターとして渡される)によってシフトされるため、テストが依然として非常に適切であることを願っています。

public static class Program
{
    private static int _counter0 = 0;
    private static int _counterA = 0;
    private static int _counterB = 0;
    private static int _counterAb = 0;
    private static object _lastA;
    private static object _lastB;

    private static object _firstA;
    private static object _firstB;

    public static void Main(string[] args)
    {
        var wrapper = new MessageWrapper();
        var sw = Stopwatch.StartNew();

        var threadsCount = 10;
        var a0called = 40;

        // Only A events
        var t0 = Start(threadsCount, a0called, 7, 1000, wrapper.SendA);
        Join(t0);

        var aJointCalled = 40;
        var bJointCalled = 10;

        var syncEvent = new CountdownEvent(threadsCount + threadsCount);
        _firstA = null;
        _firstB = null;
        // A and B events
        var t1 = Start(threadsCount, aJointCalled, 7, 1000, wrapper.SendA, syncEvent);
        var t2 = Start(threadsCount, bJointCalled, 19, 1000, wrapper.SendB, syncEvent);
        Join(t1);
        Join(t2);
        var lastA = _lastA;
        var lastB = _lastB;

        var b0called = 20;

        // Only B events
        var t3 = Start(threadsCount, b0called, 7, 1000, wrapper.SendB);
        Join(t3);

        Console.WriteLine(sw.Elapsed);

        Console.WriteLine("0:  {0}", _counter0);
        Console.WriteLine("A:  {0}", _counterA);
        Console.WriteLine("B:  {0}", _counterB);
        Console.WriteLine("AB: {0}", _counterAb);

        Console.WriteLine(
            "Generated A: {0}, Sent A: {1}",
            (threadsCount * a0called) + (threadsCount * aJointCalled),
            _counterA + _counterAb);
        Console.WriteLine(
            "Generated B: {0}, Sent B: {1}",
            (threadsCount * bJointCalled) + (threadsCount * b0called),
            _counterB + _counterAb);

        Console.WriteLine("First A was sent on {0: MM:hh:ss ffff}", _firstA);
        Console.WriteLine("Last A was sent on {0: MM:hh:ss ffff}", lastA);
        Console.WriteLine("First B was sent on {0: MM:hh:ss ffff}", _firstB);
        Console.WriteLine("Last B was sent on {0: MM:hh:ss ffff}", lastB);

        Console.ReadLine();
    }

    private static void SendMessage(Message m)
    {
        if (m != null)
        {
            if (m.A != null)
            {
                if (m.B != null)
                {
                    Interlocked.Increment(ref _counterAb);
                }
                else
                {
                    Interlocked.Increment(ref _counterA);
                    Interlocked.Exchange(ref _lastA, DateTime.Now);
                    Interlocked.CompareExchange(ref _firstA, DateTime.Now, null);
                }
            }
            else if (m.B != null)
            {
                Interlocked.Increment(ref _counterB);
                Interlocked.Exchange(ref _lastB, DateTime.Now);
                Interlocked.CompareExchange(ref _firstB, DateTime.Now, null);
            }
            else
            {
                Interlocked.Increment(ref _counter0);
            }
        }
    }

    private static Thread[] Start(
        int threadCount, 
        int eventCount, 
        int eventInterval, 
        int wrapTimeout, 
        Action<int, int, Action<Message>> wrap,
        CountdownEvent syncEvent = null)
    {
        var threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++)
        {
            threads[i] = new Thread(
                (p) =>
                    {
                        if (syncEvent != null)
                        {
                            syncEvent.Signal();
                            syncEvent.Wait();
                        }

                        Thread.Sleep((int)p);

                        for (int j = 0; j < eventCount; j++)
                        {
                            int k = (((int)p) * 1000) + j;
                            Thread.Sleep(eventInterval);
                            wrap(k, wrapTimeout, SendMessage);
                        }
                    });

            threads[i].Start(i);
        }

        return threads;
    }

    private static void Join(params Thread[] threads)
    {
        foreach (Thread t in threads)
        {
            t.Join();
        }
    }
}

PSその上、本当に興味深い質問をありがとう。

于 2013-02-26T14:21:10.343 に答える
0

これを制限する要因は、実際には制約、特にgate同期にのみ使用するという要件と、他のタイマー/スレッド/タスクなどを生成できないことです。これにより、最終的にプログラミングソリューションがMonitorオブジェクトを使用できるようになります。gateたとえば、Christofferのソリューションは、エレガントではありますが、の内部にラップされている場合を除いて、技術的には同期を使用しBlockingCollectionます。afrischkeによって以前にリストされた他の非常に革新的なソリューションも、以外の同期を使用しgateます。

多くの実験と読書と研究の結果、この問題が制約を正確に満たすより良い(より速い?)解決策を持っているとは思わないと言わざるを得ません。次のメカニズムを使用して、わずかなパフォーマンスの向上を得ることができました。きれいではありませんが、要件を満たし、少なくとも私のマシンでは平均して約1〜5%高速です。

object gate = new object();
ConcurrentDictionary<Guid, int> _bBag = new ConcurrentDictionary<Guid, int>();

public Message WrapA(int a, int millisecondsTimeout)
{
    Message message = null;
    int? b = null;
    lock (gate)
    {
        if (!_bBag.IsEmpty)
        {
            Guid key = _bBag.Keys.FirstOrDefault();
            int gotB = 0;
            if (_bBag.TryRemove(key, out gotB))
            {
                b = gotB;
                Monitor.PulseAll(gate);
            }
        }
    }

    message = new Message(a, b);
    return message;
}

public Message WrapB(int b, int millisecondsTimeout)
{
    Guid key = Guid.NewGuid();
    _bBag.TryAdd(key, b);
    lock (gate) { Monitor.Wait(gate, millisecondsTimeout); }
    int storedB = 0;
    if (_bBag.TryRemove(key, out storedB))
    {
        return new Message(null, b);
    }
    return null;    
}

要件を緩和するgateと、特にデバッグモードで実行していない場合に、速度がさらに少し向上します。

object gate = new object();
ManualResetEvent mre = new ManualResetEvent(false /*initialState*/);
ConcurrentDictionary<Guid, int> _bBag = new ConcurrentDictionary<Guid, int>();

public Message WrapA(int a, int millisecondsTimeout)
{
    Message message = null;
    int? b = null;
    lock (gate)
    {
        if (!_bBag.IsEmpty)
        {
            Guid key = _bBag.Keys.FirstOrDefault();
            int gotB = 0;
            if (_bBag.TryRemove(key, out gotB))
            {
                b = gotB;
                Monitor.PulseAll(gate);
            }
        }
    }

    message = new Message(a, b);
    return message;
}

public Message WrapB(int b, int millisecondsTimeout)
{
    Guid key = Guid.NewGuid();
    _bBag.TryAdd(key, b);
    mre.WaitOne(millisecondsTimeout);    // use a manual reset instead of Monitor
    int storedB = 0;
    if (_bBag.TryRemove(key, out storedB))
    {
        return new Message(null, b);
    }
    return null;
}

全体として、厳しい要件を考えると、すでに非常に微調整されたソリューションがあると思います。私は実際に私が間違っていて、誰かがより良い解決策を見つけてくれることを願っています-それは非常に有益です!

于 2013-02-27T02:27:59.247 に答える