7

処理するレコードを含むデータベーステーブルがあります。このテーブルには、次のステータス値を表すフラグ列があります。1-処理の準備ができている、2-正常に処理されている、3-処理に失敗している。

.netコード(繰り返しプロセス-コンソール/サービス)は、処理の準備ができているレコードのリストを取得し、それらをループして処理を試み(それほど長くはありません)、成功または失敗に基づいてステータスを更新します。

パフォーマンスを向上させるために、このプロセスでマルチスレッドを有効にします。私は、6つのスレッドを生成することを考えています。各スレッドは、サブセットを取得します。

明らかに、異なるスレッドが同じレコードを処理することは避けたいと思います。スレッドがクラッシュしてレコードがハングした場合に対処するために、データベースに「処理中」フラグを設定したくありません。

これを行う唯一の方法は、使用可能なレコードの完全なリストを取得し、各スレッドにグループ(おそらくID)を割り当てることです。個々のスレッドに障害が発生した場合、その未処理のレコードは、次にプロセスが実行されたときに取得されます。

グループをスレッドに割り当てる前にグループを分割する他の方法はありますか?

4

3 に答える 3

6

この要件を実装する最も簡単な方法は、タスク並列ライブラリを使用することです。

Parallel.ForEach(またはParallel.For)。

個々のワーカースレッドを管理できるようにします。

経験から、私は以下をお勧めします:

  • 追加のステータス「処理中」があります
  • データベースに、レコードが処理のためにピックアップされた時期を示す列と、「処理中」の時間が長すぎるレコードを定期的に検索するクリーンアップタスク/プロセスを用意します(ステータスを「処理準備完了」にリセットします)。
  • 望まない場合でも、クラッシュリカバリシナリオには「処理中」が不可欠です(同じレコードが2回処理されることを許容できない場合)。

あるいは

トランザクションキューの使用を検討してください(MSMQまたはRabbit MQが思い浮かびます)。これらは、まさにこの問題に最適化されています。

両方を大規模に行ったので、それは私の明確な選択です。

最適化

データベースからデータを取得するのに取るに足らない時間がかかる場合は、 BlockingCollectionを使用して実装するのが非常に簡単なプロデューサー/コンシューマーパターンを検討できます。このパターンにより、1つのスレッド(プロデューサー)が処理対象のDBレコードをキューに追加し、他の複数のスレッド(コンシューマー)がそのキューからのアイテムを処理できるようになります。

新しい選択肢

いくつかの処理ステップが完了したと見なされる前にレコードにアクセスすることを考えると、可能な代替手段としてWindowsWorkflowFoundationを検討してください。

于 2012-06-06T19:16:47.423 に答える
2

あなたが説明したようなことをしたことを覚えています...スレッドは、データベースに処理する必要のある新しいものがあるかどうかを時々チェックします。新しいIDのみがロードされるため、時間xで最後に読み取られたIDが1000の場合、x+1でID1001から読み取られます。

読み取ったものはすべてスレッドセーフなキューに入ります。アイテムがこのキューに追加されると、作業スレッドに通知します(おそらく、自動リセットイベントを使用するか、ここでスレッドを生成します)。各スレッドは、キューが空になるまで、このスレッドセーフキューから一度に1つのアイテムを読み取ります。

作業の前にforeachスレッドを割り当てないでください(foreachファイルのプロセスに同じ時間がかかることがわかっている場合を除く)。スレッドが作業を終了すると、残っている他のスレッドから負荷がかかるはずです。このスレッドセーフキューを使用して、これを確認します。

于 2012-06-06T19:20:08.777 に答える
0

これは、追加のデータベース列に依存/使用しない(ただし、#4を参照)か、処理中のキューを義務付ける1つのアプローチです。このアプローチの前提は、分散キャッシュのように、一貫した値に基づいてワーカー間でレコードを「シャーディング」することです。

これが私の仮定です:

  1. 再処理によって望ましくない副作用が発生することはありません。せいぜいいくつかの仕事は「無駄にされる」。
  2. スレッド数は起動時に固定されています。これは必須ではありませんが、実装が簡素化され、以下の簡単な説明で一時的な詳細をスキップできます。
  3. 「ワーカースレッド」を制御する「ワーカープロセス」は1つだけです(ただし、#1を参照)。これにより、レコードがワーカー間でどのように分割されるかを簡単に処理できます。
  4. 「よく分散された」[不変]「ID」列がいくつかあります。これは、検索ワーカーがほぼ同じ量の作業を取得するために必要です。
  5. 「最終的に行われる」限り、作業は「順不同」で行うことができます。また、各ワーカーが異なるキューで効果的に作業しているため、ワーカーが常に「100%」で実行されるとは限りません。

各スレッドにbucketから一意の値を割り当てます[0, thread_count)。スレッドが停止/再開された場合、それは空にしたものと同じバケットを取ります。

次に、スレッドが新しいレコードを必要とするたびに、データベースからフェッチします。

SELECT *
FROM record
WHERE state = 'unprocessed'
AND (id % $thread_count) = $bucket
ORDER BY date

もちろん、「このスレッドのタスク」をバッチで読み取り、それらをローカルに保存することについて、他の仮定が行われる可能性があります。ただし、ローカルキューはスレッドごとに作成されるため(したがって、新しいスレッドの起動時に再ロードされます)、特定のに関連付けられたレコードのみを処理しますbucket

スレッドの処理が終了したら、レコードは、適切な分離レベルや楽観的同時実行性を使用して、レコードを処理済みとしてマークし、次のレコードに進む必要があります。

于 2012-06-06T19:46:11.650 に答える