異なるスレッドからファイルにバッファごとにデータ バッファを書き込む必要があります。ロックを回避するために、「file_1」、「file_2」などの異なるファイルに書き込み、最後にこれらすべてのファイルを「file」にマージします。このアプローチは良いですか?より良い提案はありますか?
一部のファイルは非常に巨大で、何千ものバッファが含まれています。したがって、何千もの一時ファイルが作成され、後でマージおよびクリーニングされます。
異なるスレッドからファイルにバッファごとにデータ バッファを書き込む必要があります。ロックを回避するために、「file_1」、「file_2」などの異なるファイルに書き込み、最後にこれらすべてのファイルを「file」にマージします。このアプローチは良いですか?より良い提案はありますか?
一部のファイルは非常に巨大で、何千ものバッファが含まれています。したがって、何千もの一時ファイルが作成され、後でマージおよびクリーニングされます。
私の直感では、ファイルの管理には費用がかかり、何千ものファイルを管理するのは複雑でエラーが発生しやすいように思えます。
代わりに、専用のスレッドで書き込みを行うのはどうですか。他のスレッドは、書き込まれるのを待っているキューにメッセージを追加するだけです。同期のオーバーヘッドはいくらかありますが、ロックで行われる実際の作業は非常に小さく、メッセージへの「ポインタ」をキューにコピーするだけです。ファイルを開いて書き込むと、ミューテックスを使用するよりもコストがかかる可能性があるため、実際にはパフォーマンスが向上する可能性があります。
BlockingCollection
を使用してバッファのキューを管理し、ファイルに書き込む方法を示すサンプル アプローチ (エラー処理なし) を次に示します。
アイデアは、を作成ParallelFileWriter
し、ファイルに書き込みたいすべてのスレッドでそれを使用することです。完了したら、破棄してください (ただし、すべてのスレッドが書き込みを完了するまで破棄しないでください!)。
これは、開始するための簡単な例にすぎません。引数のチェックとエラー処理を追加する必要があります。
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
public sealed class ParallelFileWriter: IDisposable
{
// maxQueueSize is the maximum number of buffers you want in the queue at once.
// If this value is reached, any threads calling Write() will block until there's
// room in the queue.
public ParallelFileWriter(string filename, int maxQueueSize)
{
_stream = new FileStream(filename, FileMode.Create);
_queue = new BlockingCollection<byte[]>(maxQueueSize);
_writerTask = Task.Run(() => writerTask());
}
public void Write(byte[] data)
{
_queue.Add(data);
}
public void Dispose()
{
_queue.CompleteAdding();
_writerTask.Wait();
_stream.Close();
}
private void writerTask()
{
foreach (var data in _queue.GetConsumingEnumerable())
{
Debug.WriteLine("Queue size = {0}", _queue.Count);
_stream.Write(data, 0, data.Length);
}
}
private readonly Task _writerTask;
private readonly BlockingCollection<byte[]> _queue;
private readonly FileStream _stream;
}
class Program
{
private void run()
{
// For demo purposes, cancel after a couple of seconds.
using (var fileWriter = new ParallelFileWriter(@"C:\TEST\TEST.DATA", 100))
using (var cancellationSource = new CancellationTokenSource(2000))
{
const int NUM_THREADS = 8;
Action[] actions = new Action[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; ++i)
{
int id = i;
actions[i] = () => writer(cancellationSource.Token, fileWriter, id);
}
Parallel.Invoke(actions);
}
}
private void writer(CancellationToken cancellation, ParallelFileWriter fileWriter, int id)
{
int index = 0;
while (!cancellation.IsCancellationRequested)
{
string text = string.Format("{0}:{1}\n", id, index++);
byte[] data = Encoding.UTF8.GetBytes(text);
fileWriter.Write(data);
}
}
static void Main(string[] args)
{
new Program().run();
}
}
}