ロックをまったく使用しない場合はどうなりますか。
private void OptimisticalSwap(object[] arr, int i, int j, object sentinel, SpinWait spinWait)
{
Interlocked.Increment(ref nSwap);
if(i == j) return;
var vi = ExchangeWithSentinel(arr, i, sentinel, spinWait);
var vj = ExchangeWithSentinel(arr, j, sentinel, spinWait);
Interlocked.Exchange(ref arr[i], vj);
Interlocked.Exchange(ref arr[j], vi);
}
private object ExchangeWithSentinel(object[] arr, int i, object sentinel, SpinWait spinWait)
{
spinWait.Reset();
while(true) {
var vi = Interlocked.Exchange(ref arr[i], sentinel);
if(vi != sentinel) return vi;
spinWait.SpinOnce();
}
}
センチネルは、スワッピングを行うすべてのスレッド間で共有され、スワッピングの位置を「予約」するために使用される単なるダミー オブジェクトです。
var sentinel = new object();
私のラップトップ(i7)での実行結果:
Run 0 took 272ms (nSwap=799984, nConflict=300)
Run 1 took 212ms (nSwap=799984, nConflict=706)
Run 2 took 237ms (nSwap=799984, nConflict=211)
Run 3 took 206ms (nSwap=799984, nConflict=633)
Run 4 took 228ms (nSwap=799984, nConflict=350)
nConflict は、スワップがポジションの確保に失敗した回数です。スワップの総数に比べてかなり少ないので、コンフリクトが発生した場合にのみ SpinUntil を呼び出すだけで、コンフリクトがない場合にルーチンを最適化しました。
私がテストしたコード全体:
[TestClass]
public class ParallelShuffle
{
private int nSwap = 0;
private int nConflict = 0;
[TestMethod]
public void Test()
{
const int size = 100000;
const int thCount = 8;
var sentinel = new object();
var array = new object[size];
for(int i = 0; i < array.Length; i++)
array[i] = i;
for(var nRun = 0; nRun < 10; ++nRun) {
nConflict = 0;
nSwap = 0;
var sw = Stopwatch.StartNew();
var tasks = new Task[thCount];
for(int i = 0; i < thCount; ++i) {
tasks[i] = Task.Factory.StartNew(() => {
var rand = new Random();
var spinWait = new SpinWait();
for(var count = array.Length - 1; count > 1; count--) {
var y = rand.Next(count);
OptimisticalSwap(array, count, y, sentinel, spinWait);
}
}, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
Task.WaitAll(tasks);
//Console.WriteLine(String.Join(", ", array));
Console.WriteLine("Run {3} took {0}ms (nSwap={1}, nConflict={2})", sw.ElapsedMilliseconds, nSwap, nConflict, nRun);
// check for doubles:
var checkArray = new bool[size];
for(var i = 0; i < array.Length; ++i) {
var value = (int) array[i];
Assert.IsFalse(checkArray[value], "A double! (at {0} = {1})", i, value);
checkArray[value] = true;
}
}
}
private void OptimisticalSwap(object[] arr, int i, int j, object sentinel, SpinWait spinWait)
{
Interlocked.Increment(ref nSwap);
if(i == j) return;
var vi = ExchangeWithSentinel(arr, i, sentinel, spinWait);
var vj = ExchangeWithSentinel(arr, j, sentinel, spinWait);
Interlocked.Exchange(ref arr[i], vj);
Interlocked.Exchange(ref arr[j], vi);
}
private object ExchangeWithSentinel(object[] arr, int i, object sentinel, SpinWait spinWait)
{
spinWait.Reset();
while(true) {
var vi = Interlocked.Exchange(ref arr[i], sentinel);
if(vi != sentinel) return vi;
spinWait.SpinOnce();
}
}