3

キューがどのように機能するかについて、私は完全に困惑しています。C# でデータを収集して表示する小さなマルチスレッド アプリケーションを作成しようとしています (失敗しています)。Albahari の本
を 読み、Consumer/Producer パターンを使用した後、私のデータがキューでスクランブルされているように見えることを除けば、ほとんどの作業が完了したと彼は説明しています。キューに入る前に、オブジェクトのフィールドには次の値があります

タイムスタンプ = 6
データ[] ={4936、9845、24125、44861}

デキューされた後、データは次のようになります

タイムスタンプ = 6
データ[] = {64791、19466、47772、65405}

デキュー後にデータ フィールドの値が変更される理由がわかりません。私は困惑しているので、誰かがこれを修正する正しい方向に私を向けることができるかどうか、または別の方向に私を向けて先に進むことができるかどうかを確認するために、そこに投げ出すと思いました.


関連コード


データ ストレージ用のカスタム オブジェクト

関連するオブジェクトとフィールド。クラス sensorData は、計算を格納するために使用する別のクラスです。

public class sensorData
{
    public const int bufSize = 4;
    public UInt16[] data = new UInt16[4];
    public double TimeStamp = 0; 
    public int timeIndex = 0;
}

次のフィールドは、エンキュー スレッドとデキュー スレッド間のキューとシグナルを設定するために使用されます。

EventWaitHandle wh = new AutoResetEvent(false);
Queue<sensorData> dataQ = new Queue<sensorData>();
object locker = new object();

Enqueue メソッド/スレッド

これは私のワーカー スレッドで、4 つの正弦曲線を計算し、処理のために結果をキューに入れます。また、結果をファイルに書き込むので、計算結果がわかります。

private void calculateAndEnqueueData(BackgroundWorker worker, DoWorkEventArgs e)
{
    int j = 0;
    double time = 0;
    double dist;
    UInt16[] intDist = new UInt16[sensorData.bufSize];
    UInt16 uint16Dist;

    // Frequencies of the four Sine curves
    double[] myFrequency = { 1, 2, 5, 10 };

    // Creates the output file.
    StreamWriter sw2 = File.CreateText("c:\\tmp\\QueuedDataTest.txt"); 

    // Main loop to calculate my Sine curves
    while (!worker.CancellationPending)
    {
        // Calculate four Sine curves
        for (int i = 0; i < collectedData.numberOfChannels; i++)
        {
            dist = Math.Abs(Math.Sin(2.0 * Math.PI * myFrequency[i] * time);
            uint16Dist = (UInt16)dist;
            intDist[i] = uint16Dist;
        }

        // Bundle the results and Enqueue them
        sensorData dat = new sensorData();
        dat.data = intDist;
        dat.TimeStamp = time;
        dat.timeIndex = j;

        lock (locker) dataQ.Enqueue(dat);
        wh.Set

        // Output results to file.
        sw2.Write(j.ToString() + ", ");
        foreach (UInt16 dd in dat.intData)
        {
            sw2.Write(dd.ToString() + ", ");
        }
        sw2.WriteLine();

        // Increments time and index.
        j++;
        time += 1 / collectedData.signalFrequency;

        Thread.Sleep(2);
    }
    // Clean up
    sw2.Close();
    lock (locker) dataQ.Enqueue(null);
    wh.Set();
    sw2.Close();
}

出力ファイルQueuedDataTest.txtの行の例

6、4936、9845、24125、44861、

Dequeue Data メソッド

このメソッドは、要素をキューから取り出し、ファイルに書き込みます。キューで null 要素が見つかるまで、その時点でジョブが完了します。

    private void dequeueDataMethod()
    {
        StreamWriter sw = File.CreateText("C:\\tmp\\DequeueDataTest.txt");

        while (true)
        {
            sensorData data = null;

            // Dequeue available element if any are there.
            lock (locker)
                if (dataQ.Count > 0)
                {
                    data = dataQ.Dequeue();
                    if (data == null)
                    {
                        sw.Close();
                        return;
                    }
                }

            // Check to see if an element was dequeued. If it was write it to file.
            if (data != null)
            {
                sw.Write(data.timeIndex.ToString() + ", ");
                foreach (UInt16 dd in data.data)
                    sw.Write(dd.ToString() + ", ");
                sw.WriteLine();
            }
            else
            {
                wh.WaitOne();
            }
        }

データをデキューしてDequeueDataTest.txtに書き込んだ後の出力結果

6、64791、19466、47772、65405、


アップデート 1:

現在のコード内のロックの場所。


ファイルへの書き込みデータをロックするようにコードを編集しました。したがって、私がロックしているコードブロックは次のとおりです。

CalculateAndEnqueueData()メソッドで私が持っている

lock (locker) dataQ.Enqueue(dat);
wh.Set

lock(locker)
{
  sw2.Write(j.ToString() + ", ");
  foreach (UInt16 dd in dat.intData)
  {
     sw2.Write(dd.ToString() + ", ");
  }
  sw2.WriteLine();
}

dequeueDataMethod()には、ロック付きの 2 つの領域があり、最初はここにあります

lock(locker) 
    if (dataQ.Count > 0)
    {
       data = dataQ.Dequeue();
       if (data == null)
       {
           sw.Close();
           return;
        }
    }

これは、 ifブロック内のコードのロック ロッカーを想定しています。2番目は、ここでファイルに書き込む場所です

lock (locker)
{
    sw.Write(data.timeIndex.ToString() + ", ");
    foreach (UInt16 dd in data.intData)
        sw.Write(dd.ToString() + ", ");
    sw.WriteLine();
    if (icnt > 10)
    {
        sw.Close();
        return;
    }
    this.label1.Text = dataQ.Count.ToString();
}

それがすべてです。


4

2 に答える 2

9

この問題は、書き込み先の StreamWriter で同期が行われていないことが原因です。順番は順不同です。

于 2009-01-26T00:52:14.883 に答える
1

同じ配列に何度も書き込んでいるためですUInt16[] intDistか?sensorDataオブジェクトごとに別々の配列を使用するべきではありませんか? (ところで、あなたのサンプルコードにsensorData.intDataあると思われますか?)sensorData.data

説明:

intDistでは 1 つの配列のみが作成されるcalculateAndEnqueueData()ため、異なるsensorDataインスタンスはすべて同じ配列を共有します --- 追加 + 書き込みおよび削除 + 書き込みがロックステップで発生する場合、これは問題ありません。そうしないと、一部のデータ ポイントが失われたり、繰り返されたりする可能性があります。

提案:

で、配列を使用せずにインスタンスsensorDataを直接設定します。intDistcalculateAndEnqueueData()

    // create new sensorData instance
    sensorData dat = new sensorData();
    dat.TimeStamp = time;
    dat.timeIndex = j;

    // Calculate four Sine curves
    for (int i = 0; i < collectedData.numberOfChannels; i++)
    {
        dat.data[i] = (UInt16) Math.Abs(Math.Sin(2.0 * Math.PI * myFrequency[i] * time);
    }

    // enqueue
    lock (locker) dataQ.Enqueue(dat);
于 2009-01-26T01:15:19.737 に答える