1

車輪の再発明に長い時間を費やす前に、.Net に目的の機能を実行するクラスが既に存在するかどうかを確認したいと思いました。

私が欲しいのは、セマフォに少し似たもの (またはおそらくカウントダウンイベントに似たもの) ですが、少し異なります。

さまざまな数の「リソース」を利用できるという要件があり、利用可能なリソースがゼロのときにスレッドを効率的に待機させたいと考えています。その間、別のスレッドがリソースを解放できます。これにより、待機中の他のスレッドがすぐに解放されます。

これはセマフォによく似ていますが、セマフォが(私が見る限り)各スレッドをカウントするという点で「リソース」として扱うためではありません。

とにかく、これが私が望むものの最初の簡単な実装です。破棄、コード コントラクト、エラー処理、タイムアウトのサポート、またはキャンセルのサポートはまだありませんが、私が望むものを示す必要があります。

public sealed class ResourceCounter
{
    /// <summary>Create with the specified number of resources initially available.</summary>

    public ResourceCounter(int resourceCount)
    {
        _resourceCount = resourceCount;

        if (_resourceCount > 0)
        {
            _resourceAvailable.Set();
        }
    }

    /// <summary>Acquires a resource. Waits forever if necessary.</summary>

    public void Acquire()
    {
        while (true)
        {
            _resourceAvailable.Wait();

            lock (_lock)
            {
                if (_resourceCount > 0)
                {
                    if (--_resourceCount == 0)
                    {
                        _resourceAvailable.Reset();
                    }

                    return;
                }
            }
        }
    }

    /// <summary>Releases a resource.</summary>

    public void Release()
    {
        lock (_lock)
        {
            ++_resourceCount;
            _resourceAvailable.Set();
        }
    }

    private int _resourceCount;
    private readonly object _lock = new object(); 
    private readonly ManualResetEventSlim _resourceAvailable = new ManualResetEventSlim();
}

使用パターンは非常に単純です。

  1. 必要な初期リソース カウント (ゼロ以上) で ResourceCounter を構築します。

  2. リソースを取得したいスレッドは、ResourceCounter.Acquire() を呼び出します。これは、リソースが利用可能になり、取得されるまで返されません。

  3. リソースを解放したいスレッドは ResourceCounter.Release() を呼び出します。これはリソースを解放してすぐに戻ります。

どのスレッドでもリソースを解放できることに注意してください。リソースを取得したものである必要はありません。

これをマルチスレッド パイプライン コードの一部として使用しています。1 つのスレッドが作業項目のエンキューを担当し、複数のスレッドが作業項目を処理し、別のスレッドが処理された作業項目を出力しています。処理された作業項目を出力するスレッドはそれらを多重化する必要があり (処理スレッドは完了した項目を任意の順序で出力する可能性があるため)、マルチプレクサが遅れた項目を待機している間、作業項目が際限なくキューに入れられるのを停止するメカニズムが必要でした。

(これに関する背景については、パイプライン、多重化、および無制限のバッファリングを参照してください。)

とにかく、これを行うために既に利用できるものはありますか、それとも独自のクラスを開発し続ける必要がありますか?


[編集]

以下に示すように、SemaphoreSlim は正確に正しいことを行います。Wait() を呼び出したスレッドは Release() を呼び出したスレッドでなければならないと思ったので拒否しましたが、そうではありませんでした。これは私が日曜日にコーディングするために得たものです... ;)

4

2 に答える 2

3

多段パイプラインアーキテクチャは、通信にキューを使用してより簡単に構築できます。プロデューサースレッドはアイテムをワークキューに配置し、1つ以上のワーカースレッドがアイテムをデキューして処理し、それらを出力キューに追加します。最後のスレッドは出力キューを読み取り、データを出力します。

.NETでは、これはBlockingCollectionを使用して簡単に実行できます。

2ステージパイプラインの例については、https://stackoverflow.com/a/5108487/56778を参照してください。別のステージを追加するのは簡単です。

出力スレッドが故障する問題を処理するために、最小ヒープを使用して、出力キューを優先キューにしました。私のアイテムは連続したレコード番号で識別されたので、出力スレッドは次にどのレコード番号が出力されるかを知っていました。アイテムがキューに入れられるのを待ちAutoResetEventます(アイテムがキューに入れられたときにワーカープロセスがイベントを設定します)。次に、出力スレッドは一番上のアイテムを覗いて、期待されるアイテムと一致するかどうかを確認します。そうでない場合は、イベントを再度待機します。

2番目のキューが削除されたため、これは非常にうまく機能しました。ブロックは、それが属していた出力キューにありました。パフォーマンスは私の目的にとって非常に良かった。アイテムのエンキューはO(log n)操作ですが、実際nには非常に小さいです。キューに100,000個のアイテムが含まれている場合でも、レコードの処理にかかる時間と比較して、アイテムのキューに入れるのに必要な時間はわずかでした。

あなたはまだBlockingCollectionこれに使うことができます。バイナリヒープクラスにIProducerConsumerCollectionインターフェイスを実装させる必要があります。これは、 AGenericBinaryHeapクラスで公開した単純なバイナリヒープクラスにロックを追加することで実現しました。BlockingCollection次に、次のように、それらの1つをコンストラクターに提供できます。

BlockingCollection<MyRecord> = 
    new BlockingCollection<ConcurrentBinaryHeap<MyRecord>>(
    new ConcurrentBinaryHeap<MyRecord>, MaxQueueSize);

ただし、ここには潜在的なデッドロックがあります。キューがいっぱいになると(つまり、初期化時に設定した最大値を超えるとBlockingCollection)、遅延スレッドはアイテムをエンキューできず、すべての作業が完全に停止します。レコードごとの処理時間は変化しましたが、それほど変化しなかったため、これは実際には起こりませんでした。

懸念がある場合は、キューのサイズを増やすか(キューがいっぱいになることはないと確実に言える場合にのみ機能します)、キューがいっぱいになった場合に次に投稿される予定のアイテムの代替チャネルを提供できます。 。私はそれを機能させましたが、私の目的では、キューのサイズを増やすだけの方が簡単でした。

興味があれば、アーカイブを調べてそのConcurrentBinaryHeapクラスを見つけることができます。

于 2013-02-24T15:54:40.243 に答える
1

スレッドが相互に通信する方法は、リソースやそのロックメカニズムとは関係ありません。アドホックメッセージングシステム(メッセージキュー、イベント、またはニーズに合ったもの)を使用して、同じプロセス内でセマフォとリソースを渡すことを妨げるものは何もありません。

于 2013-02-24T15:58:01.230 に答える