歴史家のデータベースから約50Hzのデータを受信しています。これをもう一方の端から約10Hzでストリーミングしたいと思います。これを実現するために、2つのタイマーを生成します。1つは履歴データベースからデータを取得するためのものです(これは送信タイマーの2倍の速度で実行します)。200ms(10Hz)の2番目のタイマー。最初のタイマーは、取得したすべての値をBlockingCollectionに格納します(ConcurrentQueueも試しました)。コレクションに対して読み取り/書き込みを行う2つの異なるスレッドが原因で、通常のリスト/キューを使用できません。これに伴う課題は、50Hzも得られないことです。呼び出しごとに約5つの値を取得することを期待していますが、データはバーストで取得されます(つまり、何もない場合もあれば、15の場合もあります)。したがって、送信タイマーがデータを取得するバッファーとしてBlockingCollectionを使用します。送信タイマーは再帰的な方法を使用して、リストの中央にあるデータを取得します。値が送信されているかどうかを確認し、送信されていない場合は次の5番目の値を取得します(50Hz-> 10Hz)。今問題に。オブジェクトをロックしても、再帰メソッドが同じ値を2回送信することがあります。ロックが意図したとおりに機能していないという問題を特定しましたが、その理由や解決方法がわかりません。
助言がありますか?添付されているコードは実際のコードではありませんが、問題を示しており、実際には重複した値を示していますが、それほど頻繁ではありません。
class Program {
private readonly ConcurrentDictionary<int, BlockingCollection<TestObject>> _pipes = new ConcurrentDictionary<int, BlockingCollection<TestObject>>();
private const int Interval = 5;
private Timer inTimer;
private Timer outTimer;
static void Main() {
Program program = new Program();
program.Run();
Console.ReadLine();
}
private void Run() {
_pipes[100] = new BlockingCollection<TestObject>();
_pipes[200] = new BlockingCollection<TestObject>();
_pipes[300] = new BlockingCollection<TestObject>();
inTimer = new Timer(InTimer, null, 0, 100);
Thread.Sleep(1000);
outTimer = new Timer(OutTimer, null, 0, 200);
}
private void OutTimer(object state) {
foreach (TestObject testObject in GetNotSentTestObjects()) {
Console.WriteLine("{0};{1};{2}", testObject.PipeId, testObject.Timestamp.ToString("o"), testObject.Value);
}
}
private IEnumerable<TestObject> GetNotSentTestObjects() {
List<TestObject> testObjects = new List<TestObject>();
foreach (KeyValuePair<int, BlockingCollection<TestObject>> pipe in _pipes) {
TestObject testObject = GetLatestTestObjectNotSent(pipe.Key, (pipe.Value.Count / 2));
if (testObject == null) {
return null;
}
testObjects.Add(testObject);
}
return testObjects;
}
private TestObject GetLatestTestObjectNotSent(int key, int locationInList) {
BlockingCollection<TestObject> pipe = _pipes[key];
TestObject testObject;
lock (pipe) {
testObject = pipe.ElementAt(locationInList - 1);
if (testObject.Sent) {
int nextLocationInList = locationInList + Interval;
GetLatestTestObjectNotSent(key, nextLocationInList);
}
testObject.Sent = true;
}
testObject.PipeId = key;
return testObject;
}
private void InTimer(object sender) {
Random random = new Random();
for (int i = 0; i < random.Next(0,20); i++) {
foreach (KeyValuePair<int, BlockingCollection<TestObject>> pipe in _pipes) {
pipe.Value.Add(new TestObject());
}
i++;
}
}
}
internal class TestObject {
public DateTime Timestamp { get; set; }
public string Value { get; set; }
public bool Sent { get; set; }
public int PipeId;
public TestObject() {
Value = Guid.NewGuid().ToString().Substring(0, 8);
Timestamp = DateTime.Now;
}
}
Thread.Sleepは、リストにデータを入力できるようにするためのものであることに注意してください。私のコードには、しばらくしてリストをトリミングするコードもあります。