16

を使用しBlockingCollectionてプロデューサー/コンシューマー パターンを実装しています。コレクションを処理するデータで満たす非同期ループがあり、後でクライアントがアクセスできます。パケットがまばらに到着するため、ブロッキング呼び出しを使用せずにポーリングを実行したいと考えています。

要するに、コールバックで内部スレッド プールを利用できるように、ブロッキング コレクションに存在しないBeginTakeandのようなものを探しています。EndTake決して である必要はありませんBlockingCollection。私が必要とするものは何でも素晴らしいでしょう。

これは私が今持っているものです。_bufferedPacketsBlockingCollection<byte[]>:

public byte[] Read(int timeout)
{
    byte[] result;
    if (_bufferedPackets.IsCompleted)
    {
        throw new Exception("Out of packets");
    }
    _bufferedPackets.TryTake(out result, timeout);      
    return result;
}

これを疑似コードで次のようにしたいと思います。

public void Read(int timeout)
{
    _bufferedPackets.BeginTake(result =>
        {
            var bytes = _bufferedPackets.EndTake(result);
            // Process the bytes, or the resuting timeout
        }, timeout, _bufferedPackets);
}

これにはどのようなオプションがありますか? スレッドを待機状態にしたくはありません処理する IO が他にもたくさんあり、すぐにスレッドが不足してしまうからです。

更新:問題のコードを書き直して、非同期プロセスを別の方法で使用し、タイムアウト制限内に待機中のリクエストがあるかどうかに基づいてコールバックを本質的に交換しました。これは問題なく動作しますが、競合状態を引き起こす可能性があり、書く (そして理解する) のが難しい、タイマーに頼ったりラムダを交換したりすることなくこれを行う方法があれば素晴らしいでしょう。私は非同期キューの独自の実装でもこれを解決しましたが、より標準的で十分にテストされたオプションがあれば、それでも素晴らしいでしょう。

4

3 に答える 3

0

私はBlockingCollection<T>これを行うことができないと確信しています。自分でロールバックする必要があります。私はこれを思いついた:

class NotifyingCollection<T>
{
    private ConcurrentQueue<Action<T>> _subscribers = new ConcurrentQueue<Action<T>>();
    private ConcurrentQueue<T> _overflow = new ConcurrentQueue<T>();

    private object _lock = new object();

    public void Add(T item)
    {
        _overflow.Enqueue(item);
        Dispatch();
    }

    private void Dispatch()
    {
        // this lock is needed since we need to atomically dequeue from both queues...
        lock (_lock)
        {
            while (_overflow.Count > 0 && _subscribers.Count > 0)
            {
                Action<T> callback;
                T item;

                var r1 = _overflow.TryDequeue(out item);
                var r2 = _subscribers.TryDequeue(out callback);

                Debug.Assert(r1 && r2);
                callback(item);
                // or, optionally so that the caller thread's doesn't take too long ...
                Task.Factory.StartNew(() => callback(item));
                // but you'll have to consider how exceptions will be handled.
            }
        }
    }

    public void TakeAsync(Action<T> callback)
    {
        _subscribers.Enqueue(callback);
        Dispatch();
    }
}

TakeAsync()orを呼び出すスレッドをAdd()コールバック スレッドとして使用しました。Add()またはを呼び出すとTakeAsync()、キューに入れられたすべてのアイテムをキューに入れられたコールバックにディスパッチしようとします。このようにして、ただそこに座って、シグナルを待っているだけのスレッドが作成されることはありません。

そのロックはちょっと醜いですが、ロックせずに複数のスレッドでエンキューしてサブスクライブすることができます。そのロックを使用せずに、他のキューで利用可能なものがある場合に、1 つだけをデキューするのと同等の方法を見つけることができませんでした。

注: いくつかのスレッドを使用して、これを最小限にテストしただけです。

于 2012-08-29T19:28:29.813 に答える
0

あなたの状況を誤解しているかもしれませんが、ノンブロッキング コレクションを使用することはできませんか?

説明するためにこの例を作成しました。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTakeFromBlockingCollection
{
    class Program
    {
        static void Main(string[] args)
        {
            var queue = new ConcurrentQueue<string>();

            var producer1 = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 10; i += 1)
                {
                    queue.Enqueue("=======");
                    Thread.Sleep(10);
                }
            });

            var producer2 = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 10; i += 1)
                {
                    queue.Enqueue("*******");
                    Thread.Sleep(3);
                }
            });

            CreateConsumerTask("One  ", 3, queue);
            CreateConsumerTask("Two  ", 4, queue);
            CreateConsumerTask("Three", 7, queue);

            producer1.Wait();
            producer2.Wait();
            Console.WriteLine("  Producers Finished");
            Console.ReadLine();
        }

        static void CreateConsumerTask(string taskName, int sleepTime, ConcurrentQueue<string> queue)
        {
            Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    string result;
                    if (queue.TryDequeue(out result))
                    {
                        Console.WriteLine("  {0} consumed {1}", taskName, result);
                    }
                    Thread.Sleep(sleepTime);
                }
            });
        }
    }
}

これがプログラムの出力です

ここに画像の説明を入力

BlockingCollection は、並行コレクションをラップし、複数のコンシューマーがブロックできるメカニズムを提供することを目的としていると思います。プロデューサー待ち。この使用法は、要件に反しているようです。

BlockingCollection クラスに関するこの記事が役立つことがわかりました。

于 2012-08-24T06:17:00.930 に答える
0

だから、これには組み込みのオプションがないように見えます. これを古い非同期パターンの他のユーザーと大まかに機能させるには、多くの手間がかかることがわかりました。

public class AsyncQueue<T>
{
    private readonly ConcurrentQueue<T> queue;
    private readonly ConcurrentQueue<DequeueAsyncResult> dequeueQueue; 

    private class DequeueAsyncResult : IAsyncResult
    {
        public bool IsCompleted { get; set; }
        public WaitHandle AsyncWaitHandle { get; set; }
        public object AsyncState { get; set; }
        public bool CompletedSynchronously { get; set; }
        public T Result { get; set; }

        public AsyncCallback Callback { get; set; }
    }

    public AsyncQueue()
    {
        dequeueQueue = new ConcurrentQueue<DequeueAsyncResult>();
        queue = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        DequeueAsyncResult asyncResult;
        while  (dequeueQueue.TryDequeue(out asyncResult))
        {
            if (!asyncResult.IsCompleted)
            {
                asyncResult.IsCompleted = true;
                asyncResult.Result = item;

                ThreadPool.QueueUserWorkItem(state =>
                {
                    if (asyncResult.Callback != null)
                    {
                        asyncResult.Callback(asyncResult);
                    }
                    else
                    {
                        ((EventWaitHandle) asyncResult.AsyncWaitHandle).Set();
                    }
                });
                return;
            }
        }
        queue.Enqueue(item);
    }

    public IAsyncResult BeginDequeue(int timeout, AsyncCallback callback, object state)
    {
        T result;
        if (queue.TryDequeue(out result))
        {
            var dequeueAsyncResult = new DequeueAsyncResult
            {
                IsCompleted = true, 
                AsyncWaitHandle = new EventWaitHandle(true, EventResetMode.ManualReset), 
                AsyncState = state, 
                CompletedSynchronously = true, 
                Result = result
            };
            if (null != callback)
            {
                callback(dequeueAsyncResult);
            }
            return dequeueAsyncResult;
        }

        var pendingResult = new DequeueAsyncResult
        {
            AsyncState = state, 
            IsCompleted = false, 
            AsyncWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset), 
            CompletedSynchronously = false,
            Callback = callback
        };
        dequeueQueue.Enqueue(pendingResult);
        Timer t = null;
        t = new Timer(_ =>
        {
            if (!pendingResult.IsCompleted)
            {
                pendingResult.IsCompleted = true;
                if (null != callback)
                {
                    callback(pendingResult);
                }
                else
                {
                    ((EventWaitHandle)pendingResult.AsyncWaitHandle).Set();
                }
            }
            t.Dispose();
        }, new object(), timeout, Timeout.Infinite);

        return pendingResult;
    }

    public T EndDequeue(IAsyncResult result)
    {
        var dequeueResult = (DequeueAsyncResult) result;
        return dequeueResult.Result;
    }
}

IsCompleteプロパティの同期についてはよくわかりません。またdequeueQueue、後続のEnqueue呼び出しでのみがどのようにクリーンアップされるかについても、あまり熱心ではありません。待機ハンドルを通知する正しいタイミングもわかりませんが、これはこれまでのところ最善の解決策です。

この製品品質のコードは決して考慮しないでください。ロックを待たずにすべてのスレッドを回転させ続ける方法の一般的な要点を示したかっただけです。これはあらゆる種類のエッジケースとバグでいっぱいだと確信していますが、要件を満たしているので、質問に出くわした人々に何かを返したいと思いました.

于 2012-08-24T19:32:50.007 に答える