これはほとんどアプリケーション専用のタスクのように見えるので、特定のタスクに使用されるスレッドの数を自分で管理することは理にかなっているかもしれないことに同意します。
また、あなたのプロセスにはいくつかの段階があるようです:
- チェックする次のアドレスを提供する
- チェック時に使用するタイムアウトを決定します。使用するタイムアウトは、以前のチェックでアドレスが応答していないと判断されたかどうか、一般的な応答時間、および Dariusz が述べたように、LAN、エクストラネット、インターネット上にある場合など、いくつかの要因に依存する可能性があります.. .
- ping の実行
- ping 応答と以前の応答ステータスおよび蓄積されたステータスの処理と解釈 (たとえば、アドレスの統計の更新、場合によってはこれを保存することさえあります)。
- (繰り返される)無反応に対する「アラート」の発行
- 再起動コマンドの発行。
そのため、前のステージで生成された出力を使用して独立して明確に実行できるステージがある場合は、各ステージに多数の専用スレッドを割り当てることができる SEDA (Staged Event Driven Architecture) タイプのソリューションを選択できます。また、ステージ間を流れる特定の情報アイテムに対してプロバイダー/プロデューサー/コンシューマーの役割を使用して、ステージを相互に接続できます。一時的な不一致 (ピーク負荷) を吸収する ProducerConsumerQueues と自動スロットリング (ping の保留中の要求が多すぎるなど) があります。 ping を実行するコンシューマが十分に追いつくまで、ping リクエストのプロデューサーをブロックします)。
「Ping フロー」の基本構造については、次の段階が考えられます。
- IPAddresses の Provider によって供給され、Factory を使用してリクエストを作成する「PingRequest」プロデューサー ステージ (これにより、Factory は、IPAddress の履歴と最後の既知のステータスからリクエストのタイムアウトを判断できます)。「PingRequests」の接続されたコンシューマにリクエストを渡します。
- コンシューマー キューから PingRequests を取得する "Pinger" ステージは、Ping を実行し、接続された "PingResults" のコンシューマーに結果を渡します。
- コンシューマ キューから PingResults を取得する「ResultProcessor」ステージは、IPAddress のステータスを更新し、「PingStatus」の接続されたコンシューマに結果を渡します。
ステージ 3 の後、アラートの生成、再起動の要求などのために、同じ方法でステージを追加することができます。
これらの各ステージには専用のスレッド数が割り当てられ、フローを柔軟に変更できます。
説明するいくつかのコード例:
/// <summary>
/// Coordinates and wires up the processing pipeline.
/// </summary>
public class PingModule : IConsumer<PingStatus>
{
private readonly ConcurrentDictionary<IPAddress, PingStatus> _status = new ConcurrentDictionary<IPAddress,PingStatus>();
private readonly CancellationTokenSource _cancelTokenSource;
private readonly PingRequestProducerWorkStage _requestProducer;
private readonly PingWorkStage _pinger;
private readonly PingReplyProcessingWorkStage _replyProcessor;
public PingModule(IProvider<IPAddress> addressProvider)
{
_cancelTokenSource = new CancellationTokenSource();
_requestProducer = new PingRequestProducerWorkStage(1, addressProvider, NextRequestFor, _cancelTokenSource.Token);
_pinger = new PingWorkStage(4, 10 * 2, _cancelTokenSource.Token);
_replyProcessor = new PingReplyProcessingWorkStage(2, 10 * 2, _cancelTokenSource.Token);
// connect the pipeline.
_requestProducer.ConnectTo(_pinger);
_pinger.ConnectTo(_replyProcessor);
_replyProcessor.ConnectTo(this);
}
private PingRequest NextRequestFor(IPAddress address)
{
PingStatus curStatus;
if (!_status.TryGetValue(address, out curStatus))
return new PingRequest(address, IPStatus.Success, TimeSpan.FromMilliseconds(120));
if (curStatus.LastResult.TimedOut)
{
var newTimeOut = TimeSpan.FromTicks(curStatus.LastResult.TimedOutAfter.Ticks * 2);
return new PingRequest(address, IPStatus.TimedOut, newTimeOut);
}
else
{
var newTimeOut = TimeSpan.FromTicks(curStatus.AverageRoundtripTime + 4 * curStatus.RoundTripStandardDeviation);
return new PingRequest(address, IPStatus.Success, newTimeOut);
}
}
// ...
}
このパイプラインは簡単に変更できるようになりました。たとえば、2 つまたは 3 つの並列「Pinger」ステージ フローが必要であると判断する場合があります。1 つが以前に切断されたアドレスにサービスを提供し、1 つが「スロー レスポンダー」にサービスを提供し、もう 1 つが残りにサービスを提供します。これは、このルーティングを行うコンシューマにステージ 1 を接続し、PingRequest を正しい「Pinger」に渡すことによって実現できます。
public class RequestRouter : IConsumer<PingRequest>
{
private readonly Func<PingRequest, IConsumer<PingRequest>> _selector;
public RequestRouter(Func<PingRequest, IConsumer<PingRequest>> selector)
{
this._selector = selector;
}
public void Consume(PingRequest work)
{
_selector(work).Consume(work);
}
public void Consume(PingRequest work, CancellationToken cancelToken)
{
_selector(work).Consume(work, cancelToken);
}
}
public class PingModule : IConsumer<PingStatus>
{
// ...
public PingModule(IProvider<IPAddress> addressProvider)
{
_cancelTokenSource = new CancellationTokenSource();
_requestProducer = new PingRequestProducerWorkStage(1, addressProvider, NextRequestFor, _cancelTokenSource.Token);
_disconnectedPinger = new PingWorkStage(2, 10 * 2, _cancelTokenSource.Token);
_slowAddressesPinger = new PingWorkStage(2, 10 * 2, _cancelTokenSource.Token);
_normalPinger = new PingWorkStage(3, 10 * 2, _cancelTokenSource.Token);
_requestRouter = new RequestRouter(RoutePingRequest);
_replyProcessor = new PingReplyProcessingWorkStage(2, 10 * 2, _cancelTokenSource.Token);
// connect the pipeline
_requestProducer.ConnectTo(_requestRouter);
_disconnectedPinger.ConnectTo(_replyProcessor);
_slowAddressesPinger.ConnectTo(_replyProcessor);
_normalPinger.ConnectTo(_replyProcessor);
_replyProcessor.ConnectTo(this);
}
private IConsumer<PingRequest> RoutePingRequest(PingRequest request)
{
if (request.LastKnownStatus != IPStatus.Success)
return _disconnectedPinger;
if (request.PingTimeOut > TimeSpan.FromMilliseconds(500))
return _slowAddressesPinger;
return _normalPinger;
}
// ...
}