0

.NET でスレッド化されたキューを実装しようとしていますが、テストを実行すると問題が発生します。

実装では、1 つのスレッドのみがアイテムをキューに入れ、1 つのスレッドのみがアイテムを取り出すようにするため、スレッド化の複雑さの一部を回避することが許可されています (これは設計によるものです)。

問題は、Take() がアイテムが存在しなかったかのようにアイテムをスキップすることがあり、私のテストでは "Expected: 736 But was: 737" が表示されることです。このコードのどこにも、そのような効果が発生することはわかりません。Put は最後のアイテムの後にのみ配置され (したがって、this.m_Head に直接影響することはありません)、Take は Interlocked.Exchange を使用して head からアイテムを取得します。

この実装はどのように問題の発生を許可しますか?

実装:

using System;
using System.Threading;

#pragma warning disable 420

namespace Tychaia.Threading
{
    public class TaskPipeline<T>
    {
        private int? m_InputThread;
        private int? m_OutputThread;
        private volatile TaskPipelineEntry<T> m_Head;

        /// <summary>
        /// Creates a new TaskPipeline with the current thread being
        /// considered to be the input side of the pipeline.  The
        /// output thread should call Connect().
        /// </summary>
        public TaskPipeline()
        {
            this.m_InputThread = Thread.CurrentThread.ManagedThreadId;
            this.m_OutputThread = null;
        }

        /// <summary>
        /// Connects the current thread as the output of the pipeline.
        /// </summary>
        public void Connect()
        {
            if (this.m_OutputThread != null)
                throw new InvalidOperationException("TaskPipeline can only have one output thread connected.");
            this.m_OutputThread = Thread.CurrentThread.ManagedThreadId;
        }

        /// <summary>
        /// Puts an item into the queue to be processed.
        /// </summary>
        /// <param name="value">Value.</param>
        public void Put(T value)
        {
            if (this.m_InputThread != Thread.CurrentThread.ManagedThreadId)
                throw new InvalidOperationException("Only the input thread may place items into TaskPipeline.");

            // Walk the queued items until we find one that
            // has Next set to null.
            var head = this.m_Head;
            while (head != null)
            {
                if (head.Next != null)
                    head = head.Next;
                if (head.Next == null)
                    break;
            }
            if (head == null)
                this.m_Head = new TaskPipelineEntry<T> { Value = value };
            else
                head.Next = new TaskPipelineEntry<T> { Value = value };
        }

        /// <summary>
        /// Takes the next item from the pipeline, or blocks until an item
        /// is recieved.
        /// </summary>
        /// <returns>The next item.</returns>
        public T Take()
        {
            if (this.m_OutputThread != Thread.CurrentThread.ManagedThreadId)
                throw new InvalidOperationException("Only the output thread may retrieve items from TaskPipeline.");

            // Wait until there is an item to take.
            var spin = new SpinWait();
            while (this.m_Head == null)
                spin.SpinOnce();

            // Return the item and exchange the current head with
            // the next item, all in an atomic operation.
            return Interlocked.Exchange(ref this.m_Head, this.m_Head.Next).Value;
        }
    }
}

#pragma warning restore 420

失敗したテスト:

[Test]
public void TestPipelineParallelTo100()
{
    var random = new Random();
    var pipeline = new TaskPipeline<int>();
    var success = true;
    int expected = 0, actual = 0;
    ThreadStart processor = () =>
    {
        pipeline.Connect();
        for (int i = 0; i < 100; i++)
        {
            var v = pipeline.Take();
            if (v != i)
            {
                success = false;
                expected = i;
                actual = v;
                break;
            }
            Thread.Sleep(random.Next(1, 10));
        }
    };
    var thread = new Thread(processor);
    thread.Start();
    for (int i = 0; i < 100; i++)
    {
        pipeline.Put(i);
        Thread.Sleep(random.Next(1, 10));
    }
    thread.Join();
    if (!success)
        Assert.AreEqual(expected, actual);
}
4

1 に答える 1

0

に渡すために をm_Head.Next読み取った後にの値を代入すると、ポインタにアクセスする唯一の方法が を経由するため、ポインタは失われます。TakeInterlocked.Exchange(ref this.m_Head, this.m_Head.Next)m_Head

  • Takem_Head.Next( ==null)を読み取ります
  • Put書き込みますm_Head.Next( !=null)
  • Take書き込みますm_Head( ==null)

編集:これはうまくいくはずです。null 以外のセンチネル値を使用し、既に削除されているエントリを再利用しようとしないInterlocked.CompareExchangeようにしました。PutTake

編集 2:に微調整しTakeます。

編集 3:goto retry;識別されたテールPutEntry.Sentinel.

using System;
using System.Threading;

#pragma warning disable 420

namespace Tychaia.Threading
{
    public class TaskPipeline<T>
    {
        private int? m_InputThread;
        private int? m_OutputThread;
        private volatile Entry m_Head;

        private sealed class Entry
        {
            public static readonly Entry Sentinel = new Entry(default(T));

            public readonly T Value;
            public Entry Next;

            public Entry(T value)
            {
                Value = value;
                Next = null;
            }
        }

        /// <summary>
        /// Creates a new TaskPipeline with the current thread being
        /// considered to be the input side of the pipeline.  The
        /// output thread should call Connect().
        /// </summary>
        public TaskPipeline()
        {
            this.m_InputThread = Thread.CurrentThread.ManagedThreadId;
            this.m_OutputThread = null;
        }

        /// <summary>
        /// Connects the current thread as the output of the pipeline.
        /// </summary>
        public void Connect()
        {
            if (this.m_OutputThread != null)
                throw new InvalidOperationException("TaskPipeline can only have one output thread connected.");
            this.m_OutputThread = Thread.CurrentThread.ManagedThreadId;
        }

        /// <summary>
        /// Puts an item into the queue to be processed.
        /// </summary>
        /// <param name="value">Value.</param>
        public void Put(T value)
        {
            if (this.m_InputThread != Thread.CurrentThread.ManagedThreadId)
                throw new InvalidOperationException("Only the input thread may place items into TaskPipeline.");

        retry:
            // Walk the queued items until we find one that
            // has Next set to null.
            var head = this.m_Head;
            while (head != null)
            {
                if (head.Next != null)
                    head = head.Next;
                if (head.Next == null)
                    break;
            }

            if (head == null)
            {
                if (Interlocked.CompareExchange(ref m_Head, new Entry(value), null) != null)
                    goto retry;
            }
            else
            {
                if (Interlocked.CompareExchange(ref head.Next, new Entry(value), null) != null)
                    goto retry;
            }
        }

        /// <summary>
        /// Takes the next item from the pipeline, or blocks until an item
        /// is recieved.
        /// </summary>
        /// <returns>The next item.</returns>
        public T Take()
        {
            if (this.m_OutputThread != Thread.CurrentThread.ManagedThreadId)
                throw new InvalidOperationException("Only the output thread may retrieve items from TaskPipeline.");

            // Wait until there is an item to take.
            var spin = new SpinWait();
            while (this.m_Head == null)
                spin.SpinOnce();

            // Return the item and exchange the current head with
            // the next item, all in an atomic operation.
            Entry head = m_Head;
        retry:
            Entry next = head.Next;
            // replace m_Head.Next with a non-null sentinel to ensure Put won't try to reuse it
            if (Interlocked.CompareExchange(ref head.Next, Entry.Sentinel, next) != next)
                goto retry;

            m_Head = next;
            return head.Value;
        }
    }
}
于 2013-03-22T13:37:43.683 に答える