1

FileHelpers ライブラリでプロデューサー/コンシューマー パターンを使用して、複数のスレッドを使用して 1 つのファイル (巨大になる可能性があります) からデータをインポートします。各スレッドはそのファイルのチャンクをインポートすることになっており、インポートされた行の主キーとしてファイルを読み取っている FileHelperAsyncEngine インスタンスの LineNumber プロパティを使用したいと思います。FileHelperAsyncEngine には内部的に IEnumerator IEnumerable.GetEnumerator(); があります。これは engine.ReadNext() メソッドを使用して繰り返されます。内部で LineNumber プロパティを設定します (これはスレッドセーフではないようです)。

コンシューマーには、Consumer インスタンスの内部にある DataTable のコレクションを反復処理する IDataReader 実装を使用する SqlBulkLoad クラスを介してそれらを消費するコンシューマーに DataTable を提供するプロデューサーが関連付けられます。の各インスタンスには、1 つの SqlBulkCopy インスタンスが関連付けられます。

スレッドロックの問題があります。以下は、複数の Producer スレッドを作成する方法です。各スレッドのあとがきを開始します。プロデューサ インスタンスの Produce メソッドが呼び出され、入力ファイルのどのチャンクが処理されるかが決定されます。engine.LineNumber はスレッドセーフではないようで、データベースに適切な LineNumber をインポートしていません。engine.LineNumber が engine.ReadNext() と呼ばれる別のスレッドに読み込まれ、engine.LineNumber プロパティが変更されたようです。並列性が失われるため、入力ファイルのチャンクを処理することになっているループをロックしたくありません。このスレッドの問題を解決するためにコードを再編成するにはどうすればよいですか?

ありがとうラド

            for (int i = 0; i < numberOfProducerThreads; i++)
            DataConsumer consumer = dataConsumers[i];

            //create a new producer
            DataProducer producer = new DataProducer();

            //consumer has already being created
            consumer.Subscribe(producer);

            FileHelperAsyncEngine orderDetailEngine = new FileHelperAsyncEngine(recordType);
            orderDetailEngine.Options.RecordCondition.Condition = RecordCondition.ExcludeIfBegins;
            orderDetailEngine.Options.RecordCondition.Selector = STR_ORDR;

            int skipLines = i * numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount;

            Thread newThread = new Thread(() =>
            {
                producer.Produce(consumer, inputFilePath, lineNumberFieldName, dict, orderDetailEngine, skipLines, numberOfBufferTablesToProcess);
                consumer.SetEndOfData(producer);
            }); 
            producerThreads.Add(newThread); thread.Start();}

    public void Produce(DataConsumer consumer, string inputFilePath, string lineNumberFieldName, Dictionary<string, object> dict, FileHelperAsyncEngine engine, int skipLines, int numberOfBufferTablesToProcess)
    {
        lock (this)
        {
            engine.Options.IgnoreFirstLines = skipLines;
            engine.BeginReadFile(inputFilePath);
        }

        int rowCount = 1;

        DataTable buffer = consumer.BufferDataTable;
        while (engine.ReadNext() != null)
        {
            lock (this)
            {
                dict[lineNumberFieldName] = engine.LineNumber;
                buffer.Rows.Add(ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer));
                if (rowCount % DataBuffer.MaxBufferRowCount == 0)
                {
                    consumer.AddBufferDataTable(buffer);
                    buffer = consumer.BufferDataTable;
                }
                if (rowCount % (numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount) == 0)
                {
                    break;
                }
                rowCount++;
            }
        }
        if (buffer.Rows.Count > 0)
        {
            consumer.AddBufferDataTable(buffer);
        }
        engine.Close();
    }
4

3 に答える 3

2

Dictionary<>はスレッドセーフではありません。上記のコードの辞書は適切にロックされていますか、それともあなたのロック(これ)でのみ使用されていますか?

余談ですが、lock(this)パラダイムを避け、ジェネリックオブジェクトを使用してコードをロックします。特定のリソースに関係のない他のロックの問題が発生している可能性があります。この問題については、ブログで詳しく説明しています(スレッドセーフコード用のC#.Netでのスマートリソースロック)。HTH

于 2010-04-12T19:52:22.633 に答える
1

あなたは正しいです LineNumber はスレッドセーフではありません:(

コードを調査したところ、内部リーダーから行を読み取り、後で LineNumber を更新したため、スレッド セーフではないことがわかりました。

問題は、sincronization コードを内部に追加すると、処理が非常に遅くなる可能性があることです。おそらく、そのオーバーヘッドを回避するために、内部コードのスレッド セーフ バージョンを作成する必要があります。

とにかく、パフォーマンスの観点から見ると、コードの遅い部分はファイル操作であるため、複数のスレッドを使用して読み取りを高速化することはできません。ファイルを作業キューに読み取るために必要なスレッドは 1 つだけで、ファイルを読み取って各レコードを操作する複数のスレッドがある場合、必要なスレッド セーフが得られます。

乾杯

于 2010-05-03T12:16:22.570 に答える
0

問題を修正したと思います。ロックが必要だったのは Dictionary<> でした

lock (dict) { dict[lineNumberFieldName] = engine.LineNumber; buffer.Rows.Add(ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer)); 良い手がかりをくれた OmegaMan に感謝します。

于 2010-04-13T18:34:15.587 に答える