6

キューへの同時アクセスの制御を抽象化することを目的としたクラスを作成しました。

このクラスは、単一のスレッドでインスタンス化され、複数のスレッドによって書き込まれ、後続の単一のスレッドから読み取られるように設計されています。

クラス内で生成された単一の長時間実行タスクがあり、アイテムが正常にデキューされた場合にブロッキング ループを実行し、イベントを発生させます。

私の質問はこれです:私の実装は、長時間実行されているタスクのキャンセルと、その後のCancellationTokenSourceオブジェクトの正しい使用方法のクリーンアップ/リセットですか?

理想的には、キューに追加する可用性を維持しながら、アクティブなオブジェクトを停止および再起動できるようにしたいと考えています。

Peter Bromberg の記事をベースとして使用しました: Producer/Consumer Queue and BlockingCollection in C# 4.0

以下のコード:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Test
{
    public delegate void DeliverNextQueuedItemHandler<T>(T item);

public sealed class SOQueueManagerT<T> 
{

    ConcurrentQueue<T> _multiQueue;
    BlockingCollection<T> _queue;
    CancellationTokenSource _canceller;
    Task _listener = null;

    public event DeliverNextQueuedItemHandler<T> OnNextItem;

    public bool IsRunning { get; private set; }
    public int QueueSize
    {
        get
        {
            if (_queue != null)
                return _queue.Count;
            return -1;
        }
    }

    public CancellationTokenSource CancellationTokenSource
    {
        get
        {
            if (_canceller == null)
                _canceller = new CancellationTokenSource();

            return _canceller;
        }
    }

    public SOQueueManagerT()
    {
        _multiQueue = new ConcurrentQueue<T>();
        _queue = new BlockingCollection<T>(_multiQueue);

        IsRunning = false;
    }

    public void Start()
    {
        if (_listener == null)
        {


            IsRunning = true;

            _listener = Task.Factory.StartNew(() =>
            {

                while (!CancellationTokenSource.Token.IsCancellationRequested)
                {
                    T item;
                    if (_queue.TryTake(out item, 100))
                    {
                        if (OnNextItem != null)
                        {

                            OnNextItem(item);
                        }
                    }

                }
            },
            CancellationTokenSource.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        }
    }

    public void Stop()
    {
        if (_listener != null)
        {
            CancellationTokenSource.Cancel();
            CleanUp();
        }
    }

    public void Add(T item)
    {
        _queue.Add(item);
    }

    private void CleanUp()
    {
        _listener.Wait(2000);
        if (_listener.IsCompleted)
        {
            IsRunning = false;
            _listener = null;
            _canceller = null;
        }
    }


 }
}

更新 これが私が最後に行ったことです。完璧ではありませんが、これまでのところ仕事をしています。

public sealed class TaskQueueManager<T> 
{
    ConcurrentQueue<T> _multiQueue;
    BlockingCollection<T> _queue;
    CancellationTokenSource _canceller;
    Task _listener = null;

    public event DeliverNextQueuedItemHandler<T> OnNextItem;

    public bool IsRunning
    {
        get
        {
            if (_listener == null)
                return false;
            else if (_listener.Status == TaskStatus.Running ||
                _listener.Status == TaskStatus.Created ||
                _listener.Status == TaskStatus.WaitingForActivation ||
                _listener.Status == TaskStatus.WaitingToRun ||
                _listener.IsCanceled)
                return true;
            else
                return false;
        }
    }
    public int QueueSize
    {
        get
        {
            if (_queue != null)
                return _queue.Count;
            return -1;
        }
    }

    public TaskQueueManager()
    {
        _multiQueue = new ConcurrentQueue<T>();
        _queue = new BlockingCollection<T>(_multiQueue);
    }

    public void Start()
    {
        if (_listener == null)
        {
            _canceller = new CancellationTokenSource();

            _listener = Task.Factory.StartNew(() =>
            {
                while (!_canceller.Token.IsCancellationRequested)
                {
                    T item;
                    if (_queue.TryTake(out item, 100))
                    {
                        if (OnNextItem != null)
                        {
                            try
                            {
                                OnNextItem(item);
                            }
                            catch (Exception e)
                            {
                                //log or call an event
                            }
                        }
                    }
                }
            },
            _canceller.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        }
    }

    public void Stop()
    {
        if (_listener != null)
        {
            _canceller.Cancel();

            if (_listener.IsCanceled && !_listener.IsCompleted)
                _listener.Wait();

            _listener = null;
            _canceller = null;
        }
    }

    public void Add(T item)
    {
        if (item != null)
        {
            _queue.Add(item);
        }
        else
        {
            throw new ArgumentNullException("TaskQueueManager<" + typeof(T).Name + ">.Add item is null");
        }
    }
}
4

1 に答える 1

1

慎重にプログラミングすることだけが、それをカットする唯一の方法です。操作をキャンセルしても、適切な時間内に完了しない保留中の操作がある可能性があります。デッドロックされたブロッキング操作である可能性が非常に高いです。この場合、プログラムは実際には終了しません。

たとえば、CleanUp メソッドを数回呼び出したり、最初に Start を呼び出さずに呼び出したりすると、クラッシュしそうな気がします。

クリーンアップ中の 2 秒のタイムアウトは、計画よりも恣意的に感じられます。実際には、物事が適切にシャットダウンするか、クラッシュ/ハングすることを保証するところまで行きます (同時実行のものを不明な状態のままにしたくない)。

また、IsRunningオブジェクトの状態から推測されるのではなく、明示的に設定されます。

インスピレーションを得るために、私が最近書いた同様のクラスを見ていただきたいと思います。これは、バックグラウンド スレッドで機能するプロデューサー/コンシューマー パターンです。そのソース コードはCodePlexにあります。ただし、これは非常に特定の問題を解決するために設計されました。

ここでは、ワーカー スレッドのみが認識し、シャットダウンを開始する特定の型をエンキューすることで、キャンセルが解決されます。これにより、保留中の作業を決してキャンセルせず、作業単位全体のみが考慮されます。

この状況を少し改善するために、現在の作業用に別のタイマーを用意し、キャンセルされた場合は不完全な作業を中止またはロールバックすることができます。ここで、トランザクションのような動作を実装するには試行錯誤が必要です。考えられるすべてのコーナー ケースを調べて、ここでプログラムがクラッシュしたらどうなるかを自問する必要があるからです。理想的には、これらすべてのコード パスが、作業を再開できる回復可能な状態または既知の状態につながるようにします。しかし、すでにお察しのとおり、それには注意深いプログラミングと多くのテストが必要です。

于 2011-03-10T08:18:09.797 に答える