.NET でスレッド化されたキューを実装しようとしていますが、テストを実行すると問題が発生します。
実装では、1 つのスレッドのみがアイテムをキューに入れ、1 つのスレッドのみがアイテムを取り出すようにするため、スレッド化の複雑さの一部を回避することが許可されています (これは設計によるものです)。
問題は、Take() がアイテムが存在しなかったかのようにアイテムをスキップすることがあり、私のテストでは "Expected: 736 But was: 737" が表示されることです。このコードのどこにも、そのような効果が発生することはわかりません。Put は最後のアイテムの後にのみ配置され (したがって、this.m_Head に直接影響することはありません)、Take は Interlocked.Exchange を使用して head からアイテムを取得します。
この実装はどのように問題の発生を許可しますか?
実装:
using System;
using System.Threading;
#pragma warning disable 420
namespace Tychaia.Threading
{
public class TaskPipeline<T>
{
private int? m_InputThread;
private int? m_OutputThread;
private volatile TaskPipelineEntry<T> m_Head;
/// <summary>
/// Creates a new TaskPipeline with the current thread being
/// considered to be the input side of the pipeline. The
/// output thread should call Connect().
/// </summary>
public TaskPipeline()
{
this.m_InputThread = Thread.CurrentThread.ManagedThreadId;
this.m_OutputThread = null;
}
/// <summary>
/// Connects the current thread as the output of the pipeline.
/// </summary>
public void Connect()
{
if (this.m_OutputThread != null)
throw new InvalidOperationException("TaskPipeline can only have one output thread connected.");
this.m_OutputThread = Thread.CurrentThread.ManagedThreadId;
}
/// <summary>
/// Puts an item into the queue to be processed.
/// </summary>
/// <param name="value">Value.</param>
public void Put(T value)
{
if (this.m_InputThread != Thread.CurrentThread.ManagedThreadId)
throw new InvalidOperationException("Only the input thread may place items into TaskPipeline.");
// Walk the queued items until we find one that
// has Next set to null.
var head = this.m_Head;
while (head != null)
{
if (head.Next != null)
head = head.Next;
if (head.Next == null)
break;
}
if (head == null)
this.m_Head = new TaskPipelineEntry<T> { Value = value };
else
head.Next = new TaskPipelineEntry<T> { Value = value };
}
/// <summary>
/// Takes the next item from the pipeline, or blocks until an item
/// is recieved.
/// </summary>
/// <returns>The next item.</returns>
public T Take()
{
if (this.m_OutputThread != Thread.CurrentThread.ManagedThreadId)
throw new InvalidOperationException("Only the output thread may retrieve items from TaskPipeline.");
// Wait until there is an item to take.
var spin = new SpinWait();
while (this.m_Head == null)
spin.SpinOnce();
// Return the item and exchange the current head with
// the next item, all in an atomic operation.
return Interlocked.Exchange(ref this.m_Head, this.m_Head.Next).Value;
}
}
}
#pragma warning restore 420
失敗したテスト:
[Test]
public void TestPipelineParallelTo100()
{
var random = new Random();
var pipeline = new TaskPipeline<int>();
var success = true;
int expected = 0, actual = 0;
ThreadStart processor = () =>
{
pipeline.Connect();
for (int i = 0; i < 100; i++)
{
var v = pipeline.Take();
if (v != i)
{
success = false;
expected = i;
actual = v;
break;
}
Thread.Sleep(random.Next(1, 10));
}
};
var thread = new Thread(processor);
thread.Start();
for (int i = 0; i < 100; i++)
{
pipeline.Put(i);
Thread.Sleep(random.Next(1, 10));
}
thread.Join();
if (!success)
Assert.AreEqual(expected, actual);
}