9

これは私の最後の質問からの続きです-つまり、質問は「ファイルにdouble値を書き込む必要があるという点でスレッドセーフなプログラムを構築するための最良の方法は何ですか。streamwriterを介して値を保存する関数の場合複数のスレッドから呼び出されていますか?それを行うための最良の方法は何ですか?」

そして、MSDNで見つかったいくつかのコードを変更しました。次はどうですか?これはすべてをファイルに正しく書き込みます。

namespace SafeThread
{
    class Program
    {
        static void Main()
        {
            Threading threader = new Threading();

            AutoResetEvent autoEvent = new AutoResetEvent(false);

            Thread regularThread =
                new Thread(new ThreadStart(threader.ThreadMethod));
            regularThread.Start();

            ThreadPool.QueueUserWorkItem(new WaitCallback(threader.WorkMethod),
                autoEvent);

            // Wait for foreground thread to end.
            regularThread.Join();

            // Wait for background thread to end.
            autoEvent.WaitOne();
        }
    }


    class Threading
    {
        List<double> Values = new List<double>();
        static readonly Object locker = new Object();
        StreamWriter writer = new StreamWriter("file");
        static int bulkCount = 0;
        static int bulkSize = 100000;

        public void ThreadMethod()
        {
            lock (locker)
            {
                while (bulkCount < bulkSize)
                    Values.Add(bulkCount++);
            }
            bulkCount = 0;
        }

        public void WorkMethod(object stateInfo)
        {
            lock (locker)
            {
                foreach (double V in Values)
                {
                    writer.WriteLine(V);
                    writer.Flush();
                }
            }
            // Signal that this thread is finished.
            ((AutoResetEvent)stateInfo).Set();
        }
    }
}
4

4 に答える 4

15

ThreadQueueUserWorkItemは、スレッド化に使用できる最も低いAPIです。絶対に、最終的に他の選択肢がない限り、私はそれらを使用しませんでした。Taskはるかに高いレベルの抽象化のためにクラスを試してください。詳細については、この件に関する最近のブログ投稿を参照してください。

同期に使用できる最も低いAPIを使用して手動で作成する代わりに、適切なプロデューサー/コンシューマーキューBlockingCollection<double>として使用することもできます。

これらのホイールを正しく再発明することは驚くほど困難です。Taskこのタイプのニーズ(具体的には、)用に設計されたクラスを使用することを強くお勧めしBlockingCollectionます。これらは.NET4.0フレームワークに組み込まれており、.NET3.5のアドオンとして利用できます

于 2010-08-28T17:38:18.643 に答える
7
  • コードにはインスタンス変数としてライターがありますが、静的ロッカーを使用しています。複数のインスタンスが異なるファイルに書き込んでいる場合、それらが同じロックを共有する必要がある理由はありません
  • 関連する注意点として、(プライベートインスタンス変数として)ライターがすでにあるので、この場合は別のロッカーオブジェクトを使用する代わりに、それをロックに使用できます。これにより、作業が少し簡単になります。

「正しい答え」は、ロック/ブロック動作の観点から探しているものによって異なります。たとえば、最も簡単なことは、中間データ構造をスキップして、各スレッドが結果を「報告」してファイルに書き込むように、WriteValuesメソッドを使用することです。何かのようなもの:

StreamWriter writer = new StreamWriter("file");
public void WriteValues(IEnumerable<double> values)
{
    lock (writer)
    {
        foreach (var d in values)
        {
            writer.WriteLine(d);
        }
        writer.Flush();
    }
}

もちろん、これは、ワーカースレッドが「レポート結果」フェーズでシリアル化されることを意味します。パフォーマンス特性によっては、問題ない場合もあります(たとえば、生成に5分、書き込みに500ミリ秒)。

スペクトルの反対側では、ワーカースレッドにデータ構造への書き込みを依頼します。.NET 4を使用している場合は、自分でロックするのではなく、 ConcurrentQueueを使用することをお勧めします。

また、ワーカースレッドによって報告されるバッチよりも大きなバッチでファイルI / Oを実行したい場合があるため、特定の頻度でバックグラウンドスレッドに書き込むことを選択することもできます。スペクトルのその端は次のようになります(実際のコードでConsole.WriteLine呼び出しを削除します。これらはただそこにあるので、実際に動作していることがわかります)

public class ThreadSafeFileBuffer<T> : IDisposable
{
    private readonly StreamWriter m_writer;
    private readonly ConcurrentQueue<T> m_buffer = new ConcurrentQueue<T>();
    private readonly Timer m_timer;

    public ThreadSafeFileBuffer(string filePath, int flushPeriodInSeconds = 5)
    {
        m_writer = new StreamWriter(filePath);
        var flushPeriod = TimeSpan.FromSeconds(flushPeriodInSeconds);
        m_timer = new Timer(FlushBuffer, null, flushPeriod, flushPeriod);
    }

    public void AddResult(T result)
    {
        m_buffer.Enqueue(result);
        Console.WriteLine("Buffer is up to {0} elements", m_buffer.Count);
    }

    public void Dispose()
    {
        Console.WriteLine("Turning off timer");
        m_timer.Dispose();
        Console.WriteLine("Flushing final buffer output");
        FlushBuffer(); // flush anything left over in the buffer
        Console.WriteLine("Closing file");
        m_writer.Dispose();
    }

    /// <summary>
    /// Since this is only done by one thread at a time (almost always the background flush thread, but one time via Dispose), no need to lock
    /// </summary>
    /// <param name="unused"></param>
    private void FlushBuffer(object unused = null)
    {
        T current;
        while (m_buffer.TryDequeue(out current))
        {
            Console.WriteLine("Buffer is down to {0} elements", m_buffer.Count);
            m_writer.WriteLine(current);
        }
        m_writer.Flush();
    }
}

class Program
{
    static void Main(string[] args)
    {
        var tempFile = Path.GetTempFileName();
        using (var resultsBuffer = new ThreadSafeFileBuffer<double>(tempFile))
        {
            Parallel.For(0, 100, i =>
            {
                // simulate some 'real work' by waiting for awhile
                var sleepTime = new Random().Next(10000);
                Console.WriteLine("Thread {0} doing work for {1} ms", Thread.CurrentThread.ManagedThreadId, sleepTime);
                Thread.Sleep(sleepTime);
                resultsBuffer.AddResult(Math.PI*i);
            });
        }
        foreach (var resultLine in File.ReadAllLines(tempFile))
        {
            Console.WriteLine("Line from result: {0}", resultLine);
        }
    }
}
于 2010-08-28T17:39:43.043 に答える
4

つまり、StreamWriterを使用して多数のスレッドが単一のファイルにデータを書き込む必要があるということですか?簡単。StreamWriterオブジェクトをロックするだけです。

ここのコードは5つのスレッドを作成します。各スレッドは5つの「アクション」を実行し、各アクションの最後に「file」という名前のファイルに5行を書き込みます。

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

namespace ConsoleApplication1 {
    class Program {
        static void Main() {
            StreamWriter Writer = new StreamWriter("file");

            Action<int> ThreadProcedure = (i) => {
                // A thread may perform many actions and write out the result after each action
                // The outer loop here represents the multiple actions this thread will take
                for (int x = 0; x < 5; x++) {
                    // Here is where the thread would generate the data for this action
                    // Well simulate work time using a call to Sleep
                    Thread.Sleep(1000);
                    // After generating the data the thread needs to lock the Writer before using it.
                    lock (Writer) {
                        // Here we'll write a few lines to the Writer
                        for (int y = 0; y < 5; y++) {
                            Writer.WriteLine("Thread id = {0}; Action id = {1}; Line id = {2}", i, x, y);
                        }
                    }
                }
            };

            //Now that we have a delegate for the thread code lets make a few instances

            List<IAsyncResult> AsyncResultList = new List<IAsyncResult>();
            for (int w = 0; w < 5; w++) {
                AsyncResultList.Add(ThreadProcedure.BeginInvoke(w, null, null));
            }

            // Wait for all threads to complete
            foreach (IAsyncResult r in AsyncResultList) {
                r.AsyncWaitHandle.WaitOne();
            }

            // Flush/Close the writer so all data goes to disk
            Writer.Flush();
            Writer.Close();
        }
    }
}

結果は、125行のファイル「ファイル」であり、すべての「アクション」が同時に実行され、各アクションの結果がファイルに同期的に書き込まれます。

于 2010-08-28T22:07:56.273 に答える
2

そこにあるコードは微妙に壊れています-特に、キューに入れられた作業項目が最初に実行された場合、終了する前にすぐに(空の)値のリストをフラッシュします。その後、ワーカーはリストに移動していっぱいになります(無視されることになります)。自動リセットイベントも何も実行しません。これは、その状態を照会したり待機したりするものがないためです。

また、スレッドごとに異なるロックを使用するため、ロックには意味がありません。ストリームライターにアクセスするときは、必ず単一の共有ロックを保持する必要があります。フラッシュコードと生成コードの間にロックは必要ありません。生成が終了した後にフラッシュが実行されることを確認する必要があります。

リストの代わりに固定サイズの配列を使用し、配列がいっぱいになったらすべてのエントリをフラッシュしますが、おそらく正しい方向に進んでいます。これにより、スレッドの存続期間が長い場合にメモリが不足する可能性が回避されます。

于 2010-08-28T15:43:56.847 に答える