4

次のコードを取得しました(マルチスレッド環境ではうまく機能しません)

public class SomeClass
{
    private readonly ConcurrentQueue<ISocketWriterJob> _writeQueue = new ConcurrentQueue<ISocketWriterJob>();
    private ISocketWriterJob _currentJob;

    public void Send(ISocketWriterJob job)
    {
        if (_currentJob != null)
        {
            _writeQueue.Enqueue(job);
            return;
        }

        _currentJob = job;
        _currentJob.Write(_writeArgs);

        // The job is invoked asynchronously here
    }

    private void HandleWriteCompleted(SocketError error, int bytesTransferred)
    {
        // error checks etc removed for this sample.

        if (_currentJob.WriteCompleted(bytesTransferred))
        {
            _currentJob.Dispose();
            if (!_writeQueue.TryDequeue(out _currentJob))
            {
                _currentJob = null;
                return;
            }
        }

        _currentJob.Write(_writeArgs);

        // the job is invoked asycnhronously here.
    }
}

現在実行中のジョブがない場合、Sendメソッドはジョブを非同期的に呼び出す必要があります。ある場合は、ジョブをキューに入れる必要があります。

割り当て/チェックの周りにロックをかける_currentJobと、すべてが正常に機能します。しかし、それを解決するためのロックフリーの方法はありますか?

アップデート

私はソケットを使用していて、それSendAsyncが情報を送信する方法です。つまり、Send()メソッドが呼び出されたときに、書き込み/ジョブが保留されているかどうかがわかりません。

4

4 に答える 4

4

CompareExchange意図された状態遷移についての仮説での使用を検討してください。同期を制御できるようになったため、ConcurrentQueueを使用する必要はありません。

ステートマシンを使用するように更新されました(状態の割り当てのために)不要なものを削除するために
再度更新されました。Interlocked.Exchange

public class SomeClass
{
    private readonly Queue<ISocketWriterJob> _writeQueue = new Queue<ISocketWriterJob>();
    private ISocketWriterJob _currentJob;
    private enum State { Idle, Active, Enqueue, Dequeue };
    private State _state;

    public void Send(ISocketWriterJob job)
    {
        bool spin = true;
        while(spin)
        {
            switch(_state)
            {
            case State.Idle:
                if (Interlocked.CompareExchange(ref _state, State.Active, State.Idle) == State.Idle)
                {
                    spin = false;
                }
                // else consider new state
                break;
            case State.Active:
                if (Interlocked.CompareExchange(ref _state, State.Enqueue, State.Active) == State.Active)
                {
                    _writeQueue.Enqueue(job);
                    _state = State.Active;
                    return;
                }
                // else consider new state
                break;
            case State.Enqueue:
            case State.Dequeue:
                // spin to wait for new state
                Thread.Yield();
                break;
            }
        }

        _currentJob = job;
        _currentJob.Write(_writeArgs);

        // The job is invoked asynchronously here
    }

    private void HandleWriteCompleted(SocketError error, int bytesTransferred)
    {
        // error checks etc removed for this sample.

        if (_currentJob.WriteCompleted(bytesTransferred))
        {
            _currentJob.Dispose();

            bool spin = true;
            while(spin)
            {
                switch(_state)
                {
                case State.Active:
                    if (Interlocked.CompareExchange(ref _state, State.Dequeue, State.Active) == State.Active)
                    {
                        if (!_writeQueue.TryDequeue(out _currentJob))
                        {
                            // handle in state _currentJob = null;
                            _state = State.Idle;
                            return;
                        }
                        else
                        {
                            _state = State.Active;
                        }
                    }
                    // else consider new state
                    break;

                case State.Enqueue:
                    // spin to wait for new state
                    Thread.Yield();
                    break;

                // impossible states
                case State.Idle:
                case State.Dequeue:
                    break;
                }
            }
        }

        _logger.Debug(_writeArgs.GetHashCode() + ": writing more ");
        _currentJob.Write(_writeArgs);

        // the job is invoked asycnhronously here.
    }
}
于 2012-12-03T12:49:03.140 に答える
1

現時点では、プロデューサーとコンシューマーの分割は少しあいまいです。「ジョブをキューに入れるか、すぐに消費する」と「キューからジョブを消費するか、キューがない場合は終了する」があります。「ジョブをキューに生成する」、「キューからジョブを消費する(最初に)」、「キューからジョブを消費する(ジョブが終了したら)」の方が明確です。

ここでの秘訣は、ジョブが表示されるのを待つBlockingCollectionことができるようにを使用することです。

BlockingCollection<ISocketWriterJob> _writeQueue =
         new BlockingCollection<ISocketWriterJob>();

文字通り呼び出しているスレッドSendにジョブをキューに入れさせます。

public void Send(ISocketWriterJob job)
{
    _writeQueue.Add(job);
}

次に、ジョブを消費するだけの別のスレッドを作成します。

public void StartConsumingJobs()
{
    // Get the first job or wait for one to be queued.
    _currentJob = _writeQueue.Take();

    // Start job
}

private void HandleWriteCompleted(SocketError error, int bytesTransferred)
{
    if (_currentJob.WriteCompleted(bytesTransferred))
    {
        _currentJob.Dispose();

        // Get next job, or wait for one to be queued.
        _currentJob = _writeQueue.Take();
    }

    _currentJob.Write(_writeArgs);

   // Start/continue job as before
}
于 2012-12-03T13:15:57.477 に答える
0

ロックフリーのテクニックを使っても何かが得られるとは思いません。Monitor.Enter単純なロックを使用しても、 /Monitor.Exitが最初に回転を使用し、待機状態でより長く待機する場合にのみ、カーネルモードに移行するため、ユーザーモードを維持できます。

つまり、ロックベースの手法は、ロックフリーの手法と同じように機能します。これは、ジョブをキューに保存してキューから戻すためにのみロックできるためですが、すべての開発者が実行できる非常に明確で堅牢なコードがあります。理解する:

public class SomeClass
{
    // We don't have to use Concurrent collections
    private readonly Queue<ISocketWriterJob> _writeQueue = new Queue<ISocketWriterJob>();
    private readonly object _syncRoot = new object();
    private ISocketWriterJob _currentJob;

    public void Send(ISocketWriterJob job)
    {
        lock(_syncRoot)
        {
            if (_currentJob != null)
            {
                _writeQueue.Enqueue(job);
                return;
            }
            _currentJob = job;
        }

        // Use job instead of shared state
        StartJob(job);
    }

    private void StartJob(ISocketWriterJob job)
    {
       job.Write(_writeArgs);
       // The job is invoked asynchronously here
    }

    private void HandleWriteCompleted(SocketError error, int bytesTransferred)
    {
        ISocketWriterJob currentJob = null;

        // error checks etc removed for this sample.
        lock(_syncRoot)
        {
           // I suppose this operation pretty fast as well as Dispose
           if (_currentJob.WriteCompleted(bytesTransferred))
            {
               _currentJob.Dispose();
              // There is no TryDequeue method in Queue<T>
              // But we can easily add it using extension method
              if (!_writeQueue.TryDequeue(out _currentJob))
              {
                  // We don't have set _currentJob to null
                  // because we'll achieve it via out parameter
                  // _currentJob = null;
                  return;
              }
           }

           // Storing current job for further work
           currentJob = _currentJob;
        }

        StartJob(currentJob);
    }
}

ロックフリーは最適化であり、他の最適化と同様に、最初にパフォーマンスを測定して、単純なロックベースの実装に問題があることを確認し、それが真実である場合にのみ、ロックフリーなどの低レベルの手法を使用する必要があります。パフォーマンスと保守性は古典的なトレードオフであり、慎重に選択する必要があります。

于 2012-12-03T18:26:18.680 に答える
-2

volatile現在のスレッドに最新の状態を確実に取得するために、現在のジョブにマークを付けることができます。ただし、一般的にはロックが適しています。

private volatile ISocketWriterJob _currentJob;
于 2012-12-03T12:18:44.727 に答える