スレッドプールでキューに入れられたときに失敗したワーカーを追跡(カウント)し、すべてのスレッドを終了するためにWaitHandle.WaitAll()を使用する優れた方法を探しています。
カウンターのインターロックは良いテクニックですか、それとももっと強力な戦略がありますか?
スレッドプールでキューに入れられたときに失敗したワーカーを追跡(カウント)し、すべてのスレッドを終了するためにWaitHandle.WaitAll()を使用する優れた方法を探しています。
カウンターのインターロックは良いテクニックですか、それとももっと強力な戦略がありますか?
さて、これがあなたが取ることができるアプローチです。追跡するデータをクラスにカプセル化しましたTrackedWorkers
。このクラスには、作業するワーカーの数を設定できるコンストラクターがあります。次に、を使用してワーカーが起動されます。LaunchWorkers
これには、を食べてobject
を返すデリゲートが必要bool
です。object
はワーカーへの入力を表し、はそれぞれ戻り値に応じて、または戻り値である場合のbool
成功または失敗を表します。true
false
つまり、基本的に、ワーカーの状態を追跡するための配列があります。ワーカーを起動し、ワーカーからの戻り値に応じて、そのワーカーに対応するステータスを設定します。ワーカーが戻ってきたら、とを設定しAutoResetEvent
、WaitHandle.WaitAll
すべてAutoResetEvents
を設定します。
ワーカーが実行することになっている作業(デリゲート)、その作業への入力、およびそのスレッドに対応するID
ステータスを設定するために使用されるネストされたクラスがあることに注意してください。AutoResetEvent
func
作業が完了すると、作業デリゲートへの参照も。への参照も保持されないことに注意してくださいinput
。これは、誤ってガベージコレクションが行われるのを防ぐために重要です。
特定のワーカーのステータスを取得する方法と、成功したワーカーのすべてのインデックス、および失敗したワーカーのすべてのインデックスを取得する方法があります。
最後の注意:私はこのコードの作成準備ができているとは考えていません。それは私がとるアプローチの単なるスケッチです。テスト、例外処理、およびその他のそのような詳細を追加するように注意する必要があります。
class TrackedWorkers {
class WorkerState {
public object Input { get; private set; }
public int ID { get; private set; }
public Func<object, bool> Func { get; private set; }
public WorkerState(Func<object, bool> func, object input, int id) {
Func = func;
Input = input;
ID = id;
}
}
AutoResetEvent[] events;
bool[] statuses;
bool _workComplete;
int _number;
public TrackedWorkers(int number) {
if (number <= 0 || number > 64) {
throw new ArgumentOutOfRangeException(
"number",
"number must be positive and at most 64"
);
}
this._number = number;
events = new AutoResetEvent[number];
statuses = new bool[number];
_workComplete = false;
}
void Initialize() {
_workComplete = false;
for (int i = 0; i < _number; i++) {
events[i] = new AutoResetEvent(false);
statuses[i] = true;
}
}
void DoWork(object state) {
WorkerState ws = (WorkerState)state;
statuses[ws.ID] = ws.Func(ws.Input);
events[ws.ID].Set();
}
public void LaunchWorkers(Func<object, bool> func, object[] inputs) {
Initialize();
for (int i = 0; i < _number; i++) {
WorkerState ws = new WorkerState(func, inputs[i], i);
ThreadPool.QueueUserWorkItem(this.DoWork, ws);
}
WaitHandle.WaitAll(events);
_workComplete = true;
}
void ThrowIfWorkIsNotDone() {
if (!_workComplete) {
throw new InvalidOperationException("work not complete");
}
}
public bool GetWorkerStatus(int i) {
ThrowIfWorkIsNotDone();
return statuses[i];
}
public IEnumerable<int> SuccessfulWorkers {
get {
return WorkersWhere(b => b);
}
}
public IEnumerable<int> FailedWorkers {
get {
return WorkersWhere(b => !b);
}
}
IEnumerable<int> WorkersWhere(Predicate<bool> predicate) {
ThrowIfWorkIsNotDone();
for (int i = 0; i < _number; i++) {
if (predicate(statuses[i])) {
yield return i;
}
}
}
}
使用例:
class Program {
static Random rg = new Random();
static object lockObject = new object();
static void Main(string[] args) {
int count = 64;
Pair[] pairs = new Pair[count];
for(int i = 0; i < count; i++) {
pairs[i] = new Pair(i, 2 * i);
}
TrackedWorkers workers = new TrackedWorkers(count);
workers.LaunchWorkers(SleepAndAdd, pairs.Cast<object>().ToArray());
Console.WriteLine(
"Number successful: {0}",
workers.SuccessfulWorkers.Count()
);
Console.WriteLine(
"Number failed: {0}",
workers.FailedWorkers.Count()
);
}
static bool SleepAndAdd(object o) {
Pair pair = (Pair)o;
int timeout;
double d;
lock (lockObject) {
timeout = rg.Next(1000);
d = rg.NextDouble();
}
Thread.Sleep(timeout);
bool success = d < 0.5;
if (success) {
Console.WriteLine(pair.First + pair.Second);
}
return (success);
}
}
上記のプログラムは64個のスレッドを起動します。3番目のスレッドには、番号をi
追加し、その結果をコンソールに出力するタスクがあります。ただし、忙しさをシミュレートするためにランダムな量のスリープ(1秒未満)を追加し、コインを投げてスレッドの成功または失敗を判断します。成功した人は、任務を負った合計を印刷して返します。失敗したものは何も出力せずに戻ります。i
2 * i
true
false
ここで私は使用しました
struct Pair {
public int First { get; private set; }
public int Second { get; private set; }
public Pair(int first, int second) : this() {
this.First = first;
this.Second = second;
}
}