4

あなたが私に耐えてくれることを願っています。できるだけ多くの情報を提供したかったのです。主な問題は、値をポップし、それを使用して1つの大きなフラットファイルを処理し、ファイル全体が処理されるまで何度も繰り返し実行する複数のスレッドで使用される構造(スタックなど)を作成する方法です。ファイルに、2.000行のチャンクを使用して5つのスレッドで処理できる100.000のレコードがある場合、各スレッドは10のチャンクを処理します。

私の目標は、フラットファイル内のデータを移動することです(ヘッダー...サブヘッダー...詳細、詳細、詳細、...詳細、サブフッター、サブヘッダー...詳細、詳細、詳細、...詳細、サブフッター、 Subheader ... Detail、Detail、Detail、... Detail、SubFooter、Footer structure)をOLTP DBに入れ、シンプル(フルの可能性あり)から3つのテーブルにリカバリモードを設定します。1つ目はサブヘッダー行に存在するサブヘッダーの一意のキーを表し、2つ目は中間テーブルSubheaderGroupは、2000レコードのチャンクで詳細行のグループ化を表します(FKとしてサブヘッダーのID PKを持ち、3番目はサブヘッダーPKを指すFKを持つ詳細行を表す必要があります。

何万もの詳細行を持つことができ、ロード中に宛先テーブルで0に設定された特別なフィールドを使用し、ファイル処理の最後にこれを変更するトランザクションの更新を行っているため、手動のトランザクション管理を行っています。値を1にすると、ロードが終了したことを他のアプリケーションに通知できます。

このフラットファイルを、複数のスレッドで処理し、宛先テーブルのメタデータから作成されたIDataReaderを使用してSqlBulkCopyを使用してインポートできる、複数の等しい部分(同じ行数)に分割したいと思います。

SqlBulkCopyOptions.TableLockオプションを指定してSqlBulkCopyを使用するには、プロデューサー/コンシューマーパターン(以下のリンクで説明されている-pdf分析とコードサンプル)を使用したいと思います。 http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx このパターンにより、複数のプロデューサーを作成でき、同じ数のコンシューマーが行を消費するためにプロデューサーにサブスクライブする必要があります。

TestSqlBulkCopyプロジェクトのDataProducer.csファイルには、数千のレコードの生成をシミュレートするメソッドがあります。

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

このメソッドは、新しいスレッドのコンテキストで実行されます。この新しいスレッドが元のフラットファイルの一意のチャンクのみを読み取るようにし、別のスレッドが次のチャンクを処理するようにします。次に、コンシューマーは、SqlBulkCopy ADO.NETクラスを使用して、データ(ポンプされたデータ)をSQLServerDBに移動します。

したがって、ここでの質問は、各スレッドでどのlineFromからlineToを処理するかを指示するメインプログラムに関するものであり、これはスレッドの作成中に発生するはずだと思います。2番目の解決策は、おそらくスレッドがいくつかの構造を共有し、それらに固有の何か(スレッド番号やシーケンス番号など)を使用して共有構造を検索し(おそらくスタックと値をポップする(実行中にスタックをロックする)、次のスレッドが次に、次の値を取得します。メインプログラムはフラットファイルを取得し、チャンクのサイズを決定してスタックを作成します。

それで、誰かがいくつかのコードスニペット、複数のスレッドが1つのファイルを処理し、そのファイルの一意の部分のみを取得する方法に関する疑似コードを提供できますか?

ありがとう、ラッド

4

1 に答える 1

3

私にとってうまくいったのは、キューを使用して未処理の作業を保持し、ディクショナリを使用して処理中の作業を追跡することです。

  1. ファイル名、開始行、および行数を取得し、データベースの挿入を行う更新メソッドを持つワーカー クラスを作成します。完了時にワーカーがシグナルを送るために使用するコールバック メソッドを渡します。
  2. 各チャンクに 1 つずつ、ワーカー クラスのインスタンスを含む Queue をロードします。
  3. ワーカー インスタンスをデキューし、その更新メソッドを起動して、そのスレッドの ManagedThreadId をキーとするディクショナリにワーカー インスタンスを追加するディスパッチャー スレッドを生成します。Dictionary.Count に示されているように、最大​​許容スレッド数に達するまでこれを行います。ディスパッチャーは、スレッドが終了するまで待機してから、別のスレッドを起動します。待つ方法はいくつかあります。
  4. 各スレッドが終了すると、そのコールバックはその ManagedThreadId をディクショナリから削除します。エラー (接続タイムアウトなど) が原因でスレッドが終了した場合、コールバックはワーカーをキューに再挿入できます。これは、UI を更新するのに適した場所です。
  5. UI は、アクティブなスレッド、進行状況の合計、およびチャンクあたりの時間を表示できます。ユーザーは、アクティブなスレッドの数を調整したり、処理を一時停止したり、エラーを表示したり、早期に停止したりできます。
  6. Queue と Dictionary が空になったら、完了です。

コンソール アプリとしてのデモ コード:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}
于 2010-01-14T19:46:47.423 に答える