0

ランダムなインデックス要素を並行して交換して、配列をシャッフルする必要があります。私の質問は、現在別のスレッドによってスワップされている要素を他のスレッドが読み書きできないようにする方法です。1 つのスレッドがスワップしている間、配列全体をロックしたくありません。

カップル スレッドが異なるペア o 要素を同時に交換できるようにしたいと考えています。

私はこのようなことを試しました:

        object[] lockArray = new object[array.Length];
        for (int i = 0; i < array.Length; i++)
            lockArray[i] = new object();

        for (int i = 0; i < thredCount; i++)
        {
            Thread t = new Thread(th => Shuffle.Shuflle(array,lockArray));
            t.Start();
        }

        public class Shuffle
        {
            public static void Shuflle(char[] array,object [] lockArray)
            {
                    for (int count = array.Length - 1; count > 1; count--)
                    {
                        Random rand = new Random();
                        int y = rand.Next(count) + 1;

                        lock (lockArray[count])
                        {
                            lock (lockArray[y])
                            {
                                char temp = array[count];
                                array[count] = array[y];
                                array[y] = temp;
                            }
                        }


                    }
            }
        }

配列には、0 から 9 までの文字としての数字があり、結果は並べ替えられた数字です。しかし、時々、1つのダブルexで結果が得られます。138952469. シャッフル配列で 9 が 2 倍になり、7 が欠落しています。

問題の診断を手伝ってください。

4

3 に答える 3

4

ロックをまったく使用しない場合はどうなりますか。

private void OptimisticalSwap(object[] arr, int i, int j, object sentinel, SpinWait spinWait)
{
  Interlocked.Increment(ref nSwap);
  if(i == j) return;
  var vi = ExchangeWithSentinel(arr, i, sentinel, spinWait);
  var vj = ExchangeWithSentinel(arr, j, sentinel, spinWait);
  Interlocked.Exchange(ref arr[i], vj);
  Interlocked.Exchange(ref arr[j], vi);
}

private object ExchangeWithSentinel(object[] arr, int i, object sentinel, SpinWait spinWait)
{
  spinWait.Reset();
  while(true) {
    var vi = Interlocked.Exchange(ref arr[i], sentinel);
    if(vi != sentinel) return vi;
    spinWait.SpinOnce();
  }
}

センチネルは、スワッピングを行うすべてのスレッド間で共有され、スワッピングの位置を「予約」するために使用される単なるダミー オブジェクトです。

var sentinel = new object();

私のラップトップ(i7)での実行結果:

Run 0 took 272ms (nSwap=799984, nConflict=300)
Run 1 took 212ms (nSwap=799984, nConflict=706)
Run 2 took 237ms (nSwap=799984, nConflict=211)
Run 3 took 206ms (nSwap=799984, nConflict=633)
Run 4 took 228ms (nSwap=799984, nConflict=350)

nConflict は、スワップがポジションの確保に失敗した回数です。スワップの総数に比べてかなり少ないので、コンフリクトが発生した場合にのみ SpinUntil を呼び出すだけで、コンフリクトがない場合にルーチンを最適化しました。

私がテストしたコード全体:

[TestClass]
  public class ParallelShuffle
  {
    private int nSwap = 0;
    private int nConflict = 0;
    [TestMethod]
    public void Test()
    {
      const int size = 100000;
      const int thCount = 8;
      var sentinel = new object();
      var array = new object[size];

      for(int i = 0; i < array.Length; i++)
        array[i] = i;

      for(var nRun = 0; nRun < 10; ++nRun) {
        nConflict = 0;
        nSwap = 0;
        var sw = Stopwatch.StartNew();
        var tasks = new Task[thCount];
        for(int i = 0; i < thCount; ++i) {
          tasks[i] = Task.Factory.StartNew(() => {
            var rand = new Random();
            var spinWait = new SpinWait();
            for(var count = array.Length - 1; count > 1; count--) {
              var y = rand.Next(count);
              OptimisticalSwap(array, count, y, sentinel, spinWait);
            }
          }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }

        Task.WaitAll(tasks);

        //Console.WriteLine(String.Join(", ", array));
        Console.WriteLine("Run {3} took {0}ms (nSwap={1}, nConflict={2})", sw.ElapsedMilliseconds, nSwap, nConflict, nRun);
        // check for doubles:
        var checkArray = new bool[size];
        for(var i = 0; i < array.Length; ++i) {
          var value = (int) array[i];
          Assert.IsFalse(checkArray[value], "A double! (at {0} = {1})", i, value);
          checkArray[value] = true;
        }
      }
    }


   private void OptimisticalSwap(object[] arr, int i, int j, object sentinel, SpinWait spinWait)
    {
      Interlocked.Increment(ref nSwap);
      if(i == j) return;
      var vi = ExchangeWithSentinel(arr, i, sentinel, spinWait);
      var vj = ExchangeWithSentinel(arr, j, sentinel, spinWait);
      Interlocked.Exchange(ref arr[i], vj);
      Interlocked.Exchange(ref arr[j], vi);
    }

    private object ExchangeWithSentinel(object[] arr, int i, object sentinel, SpinWait spinWait)
    {
      spinWait.Reset();
      while(true) {
        var vi = Interlocked.Exchange(ref arr[i], sentinel);
        if(vi != sentinel) return vi;
        spinWait.SpinOnce();
      }
    }
于 2012-08-30T08:39:48.387 に答える
3

好奇心から、なぜ複数のスレッドが並行してスワップできるようにしたいのでしょうか? スワップにはそれほど時間がかからないはずなので、スワップ中に配列全体をロックすると、個々の要素をロックしようとするよりもはるかに高速になる可能性があります。

そうは言っても、本当に並列シャッフルを行いたい場合は、次のようにしたほうがよいかもしれません。

  1. 配列の各半分を別々のスレッドでシャッフルします (競合することはありません)。
  2. 配列に対して「完全なシャッフル」を実行します (つまり、両側の要素をインターリーブします。詳細については、Faro Shuffleを参照してください)。
  3. 配列の各半分をもう一度シャッフルします。

4 つのスレッドに相当するシャッフルを実行する場合は、これを 4 つの部分に一般化できます。とはいえ、これを行うことでパフォーマンス上の利点が得られるようにするには、「スワップ」は非常に遅い操作である必要があります。

于 2012-08-29T22:19:16.577 に答える
0

簡単なアプローチは、Fisher-Yates アルゴリズムを使用してマッピングのリストを作成し、マッピングを使用して一括転置を実行している間、ソース配列のすべての操作をロックすることです。ただし、これは追加のリソースを消費し、全体的に時間がかかり、アレイの揮発時間をわずかに減らすだけです。このような新しい結果を作成することもできます。

public static IEnumerable<T> Shuffle<T>(
    this IEnumerable<T> source,
    Random random = null)
{
    random = random ?? new Random();
    var list = source.ToList();

    for (int i = list.Length; i > 1; i--)
    {
        // Pick random element to swap.
        int j = random.Next(i); // 0 <= j <= i-1;

        // Swap.
        T tmp = list[j];
        list[j] = list[i - 1];
        list[i - 1] = tmp;
    }

    return list;
}

そして、やります

var shuffler = new Task<char[]>(() => return array.Shuffle().ToArray());
array = shuffler.Result;

複数のスレッドでシャッフルしたい場合は、別のアルゴリズムと大きなソースが必要になります。

于 2012-08-30T10:02:20.550 に答える