3

編集と更新 -私は自分のパソコンで同じコードを試してみましたが、非常にうまく動作します。この同じコードを使用して、問題なく任意のタイプのファイルをコピーできました。仕事用のコンピューターでコードを実行すると、この問題が発生します。これがどのように、そしてなぜこれがコンピューターに依存するのか理解できません。ここで何か不足しているかどうか教えてください。

readTask では、ファイルを順次読み取り、そのバイトを BlockingCollection に追加しています。そして消費タスクでは、BlockingCollection に表示されるデータを読み取り、ファイルに書き込みます。デフォルトでは、BlockingCollection は ConcurrentQueue のラッパーであるため、ブロッキング キューからの読み取りは、書き込まれた順序と同じになると予想されます。しかし、宛先ファイルをソースと比較すると、まったく異なり、重複が見られることがあります。
私のソースファイルは、以下のように、各番号が新しい行にある一連の番号です。

1
2
3
4
5
6
7
8
9
10

私のファイルには、ファイルに十分なサイズを持たせるために約5000個の数字があります。このコードに何か問題がありますか? または、コレクションのブロックが機能するはずの方法ではありません。この例ではファイルに書き込んでいますが、実際にはこのファイルを Rest API にプッシュする必要があり、データが順番に送信されることが重要です。バイトを順番に送信できない場合、ファイルはサーバーに保存されたときに破損します。

static void Main(string[] args)
    {
        BlockingCollection<byte[]> bc = new BlockingCollection<byte[]>(10);

        Task consumeTask = Task.Factory.StartNew(() =>
        {
            var fs = File.OpenWrite(@"C:\Temp\pass_new.txt");
            foreach (byte[] data in bc.GetConsumingEnumerable())
            {
                fs.Write(data, 0, data.Length);
            }
            fs.Close();
        });

        Task readTask = Task.Factory.StartNew(() =>
        {
            var fs = File.OpenRead(@"C:\Temp\pass.txt");
            var bufferSize = 4096;
            var buffer = new byte[bufferSize];
            int bytesRead = 0;
            while ((bytesRead = fs.Read(buffer, 0, buffer.Length)) != 0)
            {
                byte[] dataToWrite = buffer;
                if (bytesRead < bufferSize)
                {
                    dataToWrite = new byte[bytesRead];
                    Array.Copy(buffer, dataToWrite, bytesRead);
                }
                bc.Add(dataToWrite);
            }
            fs.Close();
        }).ContinueWith(ant => bc.CompleteAdding());

        consumeTask.Wait();

    }
4

2 に答える 2

2

のときにバッファを再利用しているためだと思いますbytesRead == bufferSize。これらの 2 つのシーケンスを想像してみてください (ここでは参照変数を参照するために「ポインター」という用語を大まかに使用していますが、その方がより適切に理解できると思います)。

まず、バッファサイズを下回っている場合:

  1. bufferサイズが 4096 のメモリ内の新しいバイト配列を指定します。
  2. fs.Readbufferオブジェクトポイントに20 バイトを書き込みます。
  3. dataToWritebuffer と同じオブジェクトを指定します。
  4. dataToWriteサイズが 20 バイトの新しいバイト配列を指定します。
  5. bufferオブジェクト ポイントからオブジェクトポイントに20 バイトをコピーdataToWriteします。
  6. dataToWriteブロッキング コレクション内のオブジェクト ポイントへのポインターを配置します。
  7. fs.Readbufferオブジェクトポイントに30 バイトを書き込みます。

これを、バッファサイズを満たす場合に何が起こるかを比較してください。

  1. bufferサイズが 4096 のメモリ内の新しいバイト配列を指定します。
  2. fs.Readbufferオブジェクトポイントに4096バイトを書き込みます。
  3. dataToWritebuffer と同じオブジェクトを指定します。
  4. dataToWriteブロッキング コレクション内のオブジェクト ポイントへのポインターを配置します。
  5. fs.Readbufferオブジェクトポイントに30 バイトを書き込みます。

dataToWritebuffer、およびブロッキング コレクションに追加したアイテムはすべて同じオブジェクトを指しているため、最後にfs.Readコレクションに格納されたばかりのバイト配列が変更されます。

if ステートメントを削除し、常に新しいものdataToWriteを割り当てれば、プログラムは正常に動作するはずです。

于 2014-03-13T13:57:27.087 に答える
-1

(.GetConsumingEnumerable() ではなく) Take() を試してみます
。GetConsumingEnumerable は引き続き機能するはず
です

ドキュメントを見る

BlockingCollection.Take メソッド

アイテムが削除される順序は、BlockingCollection インスタンスの作成に使用されたコレクションのタイプによって異なります。BlockingCollection オブジェクトを作成するときに、使用するコレクションのタイプを指定できます。たとえば、先入れ先出し (FIFO) の動作には ConcurrentConcurrentQueue オブジェクトを指定したり、後入れ先出し (LIFO) の動作には ConcurrentStack オブジェクトを指定したりできます。

重複について。私はそれを何度も使用してきましたが、BlockingCollection が重複を生成することを知りませんでした。重複が制約に違反するデータベースにフィードするために使用します。

于 2014-03-12T21:52:30.267 に答える