FileHelpers ライブラリでプロデューサー/コンシューマー パターンを使用して、複数のスレッドを使用して 1 つのファイル (巨大になる可能性があります) からデータをインポートします。各スレッドはそのファイルのチャンクをインポートすることになっており、インポートされた行の主キーとしてファイルを読み取っている FileHelperAsyncEngine インスタンスの LineNumber プロパティを使用したいと思います。FileHelperAsyncEngine には内部的に IEnumerator IEnumerable.GetEnumerator(); があります。これは engine.ReadNext() メソッドを使用して繰り返されます。内部で LineNumber プロパティを設定します (これはスレッドセーフではないようです)。
コンシューマーには、Consumer インスタンスの内部にある DataTable のコレクションを反復処理する IDataReader 実装を使用する SqlBulkLoad クラスを介してそれらを消費するコンシューマーに DataTable を提供するプロデューサーが関連付けられます。の各インスタンスには、1 つの SqlBulkCopy インスタンスが関連付けられます。
スレッドロックの問題があります。以下は、複数の Producer スレッドを作成する方法です。各スレッドのあとがきを開始します。プロデューサ インスタンスの Produce メソッドが呼び出され、入力ファイルのどのチャンクが処理されるかが決定されます。engine.LineNumber はスレッドセーフではないようで、データベースに適切な LineNumber をインポートしていません。engine.LineNumber が engine.ReadNext() と呼ばれる別のスレッドに読み込まれ、engine.LineNumber プロパティが変更されたようです。並列性が失われるため、入力ファイルのチャンクを処理することになっているループをロックしたくありません。このスレッドの問題を解決するためにコードを再編成するにはどうすればよいですか?
ありがとうラド
for (int i = 0; i < numberOfProducerThreads; i++)
DataConsumer consumer = dataConsumers[i];
//create a new producer
DataProducer producer = new DataProducer();
//consumer has already being created
consumer.Subscribe(producer);
FileHelperAsyncEngine orderDetailEngine = new FileHelperAsyncEngine(recordType);
orderDetailEngine.Options.RecordCondition.Condition = RecordCondition.ExcludeIfBegins;
orderDetailEngine.Options.RecordCondition.Selector = STR_ORDR;
int skipLines = i * numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount;
Thread newThread = new Thread(() =>
{
producer.Produce(consumer, inputFilePath, lineNumberFieldName, dict, orderDetailEngine, skipLines, numberOfBufferTablesToProcess);
consumer.SetEndOfData(producer);
});
producerThreads.Add(newThread); thread.Start();}
public void Produce(DataConsumer consumer, string inputFilePath, string lineNumberFieldName, Dictionary<string, object> dict, FileHelperAsyncEngine engine, int skipLines, int numberOfBufferTablesToProcess)
{
lock (this)
{
engine.Options.IgnoreFirstLines = skipLines;
engine.BeginReadFile(inputFilePath);
}
int rowCount = 1;
DataTable buffer = consumer.BufferDataTable;
while (engine.ReadNext() != null)
{
lock (this)
{
dict[lineNumberFieldName] = engine.LineNumber;
buffer.Rows.Add(ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer));
if (rowCount % DataBuffer.MaxBufferRowCount == 0)
{
consumer.AddBufferDataTable(buffer);
buffer = consumer.BufferDataTable;
}
if (rowCount % (numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount) == 0)
{
break;
}
rowCount++;
}
}
if (buffer.Rows.Count > 0)
{
consumer.AddBufferDataTable(buffer);
}
engine.Close();
}