47

コレクションからの取得がブロックされず、代わりに非同期であり、アイテムがキューに配置されるまで async await が発生するBlockingCollectionに似た、 ConcurrentQueueの実装/ラッパーが存在するかどうか疑問に思っています。

独自の実装を考え出しましたが、期待どおりに機能していないようです。すでに存在するものを再発明しているのかどうか疑問に思っています。

これが私の実装です:

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();

    object queueSyncLock = new object();

    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> Dequeue()
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs=null;
        T firstItem=default(T);
        while (true)
        {
            bool ok;
            lock (queueSyncLock)
            {
                ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
                if (ok)
                {
                    waitingQueue.TryDequeue(out tcs);
                    queue.TryDequeue(out firstItem);
                }
            }
            if (!ok) break;
            tcs.SetResult(firstItem);
        }
    }
}
4

10 に答える 10

5

私の試み(「約束」が作成されたときに発生するイベントがあり、外部プロデューサーがそれを使用して、いつさらにアイテムを生成するかを知ることができます):

public class AsyncQueue<T>
{
    private ConcurrentQueue<T> _bufferQueue;
    private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue;
    private object _syncRoot = new object();

    public AsyncQueue()
    {
        _bufferQueue = new ConcurrentQueue<T>();
        _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>();
    }

    /// <summary>
    /// Enqueues the specified item.
    /// </summary>
    /// <param name="item">The item.</param>
    public void Enqueue(T item)
    {
        TaskCompletionSource<T> promise;
        do
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;                                       
            }
        }
        while (promise != null);

        lock (_syncRoot)
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;
            }

            _bufferQueue.Enqueue(item);
        }            
    }

    /// <summary>
    /// Dequeues the asynchronous.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns></returns>
    public Task<T> DequeueAsync(CancellationToken cancellationToken)
    {
        T item;

        if (!_bufferQueue.TryDequeue(out item))
        {
            lock (_syncRoot)
            {
                if (!_bufferQueue.TryDequeue(out item))
                {
                    var promise = new TaskCompletionSource<T>();
                    cancellationToken.Register(() => promise.TrySetCanceled());

                    _promisesQueue.Enqueue(promise);
                    this.PromiseAdded.RaiseEvent(this, EventArgs.Empty);

                    return promise.Task;
                }
            }
        }

        return Task.FromResult(item);
    }

    /// <summary>
    /// Gets a value indicating whether this instance has promises.
    /// </summary>
    /// <value>
    /// <c>true</c> if this instance has promises; otherwise, <c>false</c>.
    /// </value>
    public bool HasPromises
    {
        get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; }
    }

    /// <summary>
    /// Occurs when a new promise
    /// is generated by the queue
    /// </summary>
    public event EventHandler PromiseAdded;
}
于 2014-04-08T13:19:30.160 に答える
1

ユースケース(学習曲線を考えると)にはやり過ぎかもしれませんが、Reactive Extentionsは、非同期合成に必要なすべての接着剤を提供します。

基本的に変更をサブスクライブし、変更が利用可能になると変更がプッシュされ、システムに変更を別のスレッドにプッシュさせることができます。

于 2011-10-23T00:48:09.483 に答える
0

これが私が現在使用している実装です。

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();
    object queueSyncLock = new object();
    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> DequeueAsync(CancellationToken ct)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        ct.Register(() =>
        {
            lock (queueSyncLock)
            {
                tcs.TrySetCanceled();
            }
        });
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs = null;
        T firstItem = default(T);
        lock (queueSyncLock)
        {
            while (true)
            {
                if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem))
                {
                    waitingQueue.TryDequeue(out tcs);
                    if (tcs.Task.IsCanceled)
                    {
                        continue;
                    }
                    queue.TryDequeue(out firstItem);
                }
                else
                {
                    break;
                }
                tcs.SetResult(firstItem);
            }
        }
    }
}

それは十分に機能しますが、待機中のタスクのいくつかをキャンセルするqueueSyncLockために をかなり多く使用しているため、 にはかなりの競合があります。CancellationTokenもちろん、これにより、ブロッキングがかなり少なくなりますBlockingCollectionが...

同じ目的を達成するための、よりスムーズでロックのない手段があるかどうか疑問に思っています

于 2011-10-23T13:59:50.763 に答える
-5

BlockingCollection( default を使用して) aを使用し、呼び出しをaConcurrentQueueにラップして、次のことができるようにすることができます。TakeTaskawait

var bc = new BlockingCollection<T>();

T element = await Task.Run( () => bc.Take() );
于 2011-10-23T09:56:22.877 に答える