2

歴史家のデータベースから約50Hzのデータを受信して​​います。これをもう一方の端から約10Hzでストリーミングしたいと思います。これを実現するために、2つのタイマーを生成します。1つは履歴データベースからデータを取得するためのものです(これは送信タイマーの2倍の速度で実行します)。200ms(10Hz)の2番目のタイマー。最初のタイマーは、取得したすべての値をBlockingCollectionに格納します(ConcurrentQueueも試しました)。コレクションに対して読み取り/書き込みを行う2つの異なるスレッドが原因で、通常のリスト/キューを使用できません。これに伴う課題は、50Hzも得られないことです。呼び出しごとに約5つの値を取得することを期待していますが、データはバーストで取得されます(つまり、何もない場合もあれば、15の場合もあります)。したがって、送信タイマーがデータを取得するバッファーとしてBlockingCollectionを使用します。送信タイマーは再帰的な方法を使用して、リストの中央にあるデータを取得します。値が送信されているかどうかを確認し、送信されていない場合は次の5番目の値を取得します(50Hz-> 10Hz)。今問題に。オブジェクトをロックしても、再帰メソッドが同じ値を2回送信することがあります。ロックが意図したとおりに機能していないという問題を特定しましたが、その理由や解決方法がわかりません。

助言がありますか?添付されているコードは実際のコードではありませんが、問題を示しており、実際には重複した値を示していますが、それほど頻繁ではありません。

class Program {

    private readonly ConcurrentDictionary<int, BlockingCollection<TestObject>> _pipes = new ConcurrentDictionary<int, BlockingCollection<TestObject>>();
    private const int Interval = 5;

    private Timer inTimer;
    private Timer outTimer;

    static void Main() {
        Program program = new Program();
        program.Run();
        Console.ReadLine();
    }

    private void Run() {
        _pipes[100] = new BlockingCollection<TestObject>();
        _pipes[200] = new BlockingCollection<TestObject>();
        _pipes[300] = new BlockingCollection<TestObject>();

        inTimer = new Timer(InTimer, null, 0, 100);
        Thread.Sleep(1000);
        outTimer = new Timer(OutTimer, null, 0, 200);
    }

    private void OutTimer(object state) {
        foreach (TestObject testObject in GetNotSentTestObjects()) {
            Console.WriteLine("{0};{1};{2}", testObject.PipeId, testObject.Timestamp.ToString("o"), testObject.Value);
        }
    }

    private IEnumerable<TestObject> GetNotSentTestObjects() {
        List<TestObject> testObjects = new List<TestObject>();

        foreach (KeyValuePair<int, BlockingCollection<TestObject>> pipe in _pipes) {
            TestObject testObject = GetLatestTestObjectNotSent(pipe.Key, (pipe.Value.Count / 2));
            if (testObject == null) {
                return null;
            }
            testObjects.Add(testObject);
        }
        return testObjects;
    }

    private TestObject GetLatestTestObjectNotSent(int key, int locationInList) {

        BlockingCollection<TestObject> pipe = _pipes[key];
        TestObject testObject;
        lock (pipe) {
            testObject = pipe.ElementAt(locationInList - 1);

            if (testObject.Sent) {
                int nextLocationInList = locationInList + Interval;
                GetLatestTestObjectNotSent(key, nextLocationInList);
            }
            testObject.Sent = true;
        }
        testObject.PipeId = key;
        return testObject;
    }

    private void InTimer(object sender) {
        Random random = new Random();
        for (int i = 0; i < random.Next(0,20); i++) {
            foreach (KeyValuePair<int, BlockingCollection<TestObject>> pipe in _pipes) {
                pipe.Value.Add(new TestObject());
            }
            i++;
        }
    }
}

internal class TestObject {
    public DateTime Timestamp { get; set; }
    public string Value { get; set; }
    public bool Sent { get; set; }
    public int PipeId;

    public TestObject() {
        Value = Guid.NewGuid().ToString().Substring(0, 8);
        Timestamp = DateTime.Now;
    }
}

Thread.Sleepは、リストにデータを入力できるようにするためのものであることに注意してください。私のコードには、しばらくしてリストをトリミングするコードもあります。

4

3 に答える 3

1

ABlockingCollectionはキューです。ランダムアクセス用に設計されていないため、ランダムアクセスを試みても問題は発生しません。また、キューからアイテムを削除することもありません。つまり、最終的にはメモリが不足することになります。

発生している可能性のある問題の1つは、でアクセスしたときにリストをElementAtロックしていても、アイテムを追加したときにリストをロックしていないことです。しかし、同時データ構造をロックするという考え全体で、設計を再考する必要があります。同時データ構造をロックする必要はありません。

私が理解しているように、あなたはまだ処理されていない5番目のアイテムごとに選択したいと思います。未処理のアイテム(つまり、アイテム1〜4)を破棄したくない場合、これはかなり難しい問題であることがわかります。これらのアイテムを破棄する場合は、Take5回呼び出すだけです。

TestObject o;
for (int i = 0; i < 5; ++i)
{
    o = pipe.Take();
}
// You now have the 5th item from the queue
// The other items have been discarded

後で処理できるようにこれらのアイテムを保持したい場合は、何らかの方法でそれらをキューに追加し直す必要があります。ただし、キューの最後に移動します。つまり、古いアイテムを新しいアイテムの前に処理します。

また、プロデューサーが消費されるよりも速く物を追加しているため、ある時点でキューがいっぱいになります。

このアプリケーションをどのように機能させるかについてもう少し考えてみるか、それを私たちに説明するためのより良い仕事をする必要があります。あなたが説明したこととあなたが投稿したコードは...混乱していて、私たちが良い推薦をすることを不可能にします。

于 2013-03-18T23:56:19.343 に答える
0

同じスレッドが同じロックを何度も取得できます。この例を参照してください

object o = new object();
lock (o) lock (o) Console.WriteLine("Acquired lock two times");

Mutex、Semaphore、AutoResetEventなどの他の同期プリミティブを使用する必要があります...

于 2013-03-18T20:33:26.913 に答える
0

たくさんのヘッドバッシングの後、再帰的な方法を使用するときに戻る必要があることがわかりました...

于 2013-03-19T08:59:44.917 に答える