1

C# で分散型の深さ優先検索を実装しようとしています。ある時点までは成功していましたが、同期エラーが発生しました。エラーを修正できません。私がやろうとしているのは、タスク並列データフローを使用して各ノードを互いに通信させ、それによって DFS で並列処理を実現することです。以下は私のコードです:

public class DFS
{
static List<string> traversedList = new List<string>();

static List<string> parentList = new List<string>();
static Thread[] thread_array;
static BufferBlock<Object> buffer1 = new BufferBlock<Object>();

public static void Main(string[] args)
{

    int N = 100;
    int M = N * 4;
    int P = N * 16;

    Stopwatch stopwatch = new Stopwatch();
    stopwatch.Start();

    List<string> global_list = new List<string>();

    StreamReader file = new StreamReader(args[args.Length - 2]);


    string text = file.ReadToEnd();

    string[] lines = text.Split('\n');



    string[][] array1 = new string[lines.Length][];

    for (int i = 0; i < lines.Length; i++)
    {
        lines[i] = lines[i].Trim();
        string[] words = lines[i].Split(' ');

        array1[i] = new string[words.Length];

        for (int j = 0; j < words.Length; j++)
        {
            array1[i][j] = words[j];
        }
    }

    StreamWriter sr = new StreamWriter("E:\\Newtext1.txt");

    for (int i = 0; i < array1.Length; i++)
    {
        for (int j = 0; j < array1[i].Length; j++)
        {
            if (j != 0)
            {
                sr.Write(array1[i][0] + ":" + array1[i][j]);
                Console.WriteLine(array1[i][0] + ":" + array1[i][j]);
                sr.Write(sr.NewLine);
            }
        }

    }
    int start_no = Convert.ToInt32(args[args.Length - 1]);
    thread_array = new Thread[lines.Length];
    string first_message = "root";
    buffer1.Post(first_message);
    buffer1.Post(array1);
    buffer1.Post(start_no);
    buffer1.Post(1);

    for (int t = 1; t < lines.Length; t++)
    {
        Console.WriteLine("thread" + t);
        thread_array[t] = new Thread(new ThreadStart(thread_run));
        thread_array[t].Name = t.ToString();
        lock (thread_array[t])
        {
            Console.WriteLine("working");
            thread_array[t].Start();
            thread_array[t].Join();
        }

    }
    stopwatch.Stop();

    Console.WriteLine(stopwatch.Elapsed);
    Console.ReadLine();
}

private static void dfs(string[][] array, int point)
{
    for (int z = 1; z < array[point].Length; z++)
    {
        if ((!traversedList.Contains(array[point][z])))
        {
            traversedList.Add(array[point][z]);
            parentList.Add(point.ToString());
            dfs(array, int.Parse(array[point][z]));
        }

    }
    return;


}
public static void thread_run()
{
    try
    {
        string parent;
        string[][] array1;
        int point;
        int id;
        parent = (string)buffer1.Receive();
        array1 = (string[][])buffer1.Receive();
        point = (int)buffer1.Receive();
        id = (int)buffer1.Receive();
        object value;
        Console.WriteLine("times");

        if (Thread.CurrentThread.Name.Equals(point.ToString()))
        {
            if (!traversedList.Contains(point.ToString()))
            {
                Console.WriteLine("Node:" + point + " Parent:" + parent + " Id:" + id);
                traversedList.Add(point.ToString());
                parent = point.ToString();
                for (int x = 1; x < array1[point].Length; x++)
                {
                    Console.WriteLine("times");
                    if (buffer1.TryReceive(out value))
                    {
                        array1 = (string[][])value;
                    }
                    if (buffer1.TryReceive(out value))
                    {
                        id = (int)buffer1.Receive();
                    }
                    id++;
                    buffer1.Post(parent);
                    buffer1.Post(array1);
                    buffer1.Post(x);
                    buffer1.Post(id);
                    Console.WriteLine("times");
                    Monitor.PulseAll(Thread.CurrentThread);
                }

                //return;
            }
            else
            {
                buffer1.Post(parent);
                buffer1.Post(array1);
                buffer1.Post(point);
                buffer1.Post(id);
                Console.WriteLine("working 1");
                Monitor.PulseAll(Thread.CurrentThread);
            }
        }
        else
        {
            Console.WriteLine("working 2");
            Monitor.Wait(Thread.CurrentThread);
        }
        //Console.WriteLine(parent);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }

}

}

ここに画像の説明を入力

4

3 に答える 3

3

コードにはさまざまな問題があります。

複数のスレッドからの traversedList のロックおよび「タッチ」の不適切な使用は、最も明白な問題です。

さらに重要なことは、コードが実際には Dataflow を使用していないことです。ConcurrentQueue やその他の同時コレクションと同様の方法で BufferBlock を使用しています。データフローの要点は、スレッドの代わりにActionBlocksを使用して処理を簡素化することです。デフォルトでは、アクション ブロックは処理に 1 つのスレッドのみを使用しますが、DataflowBlockOptionsクラスを介して必要な数のスレッドを指定できます。

ActionBlocks には独自の入力バッファと出力バッファがあるため、バッファリングのためだけに BufferBlocks を追加する必要はありません。

複数の関連する値をブロックに渡すと、エラーが発生してコードが混乱する可能性があるため、別の問題になります。すべての値を保持するデータ構造を作成しても、費用はかかりません。

このクラスを使用して処理メッセージを保持するとします。

    public class PointMessage
    {
        public string Message { get; set; }
        public string[][] Lines{get;set;}
        public int Point { get; set; }
        public int ID { get; set; }
    }

これらのメッセージを処理する ActionBlock を次のように作成できます。

static ActionBlock<PointMessage> _block;
...
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExecutionDataflowBlockOptions.Unbounded };
_block=new ActionBlock<PointMessage>(msg=>ProcessMessage(msg),options);

そして、次のように各メッセージを処理します。

    private static void ProcessMessage(PointMessage arg)
    {
        if (...)
        {
            ...
            arg.ID++;
            _block.Post(arg);
        }
        else
        {
             ...
            _block.Post(arg);
        }
    }

関数が値を返す場合、ActionBlock の代わりにTransformBlockを使用できます。

あなたのコードが何をするのか理解できないので、DataFlow を使用してコードを書き直そうとはしません。少しきれいにすると、助けやすくなります。

于 2012-06-05T08:12:36.510 に答える
1

問題は、スレッドが待機を呼び出すためにモニターを所有する必要があることです。したがって、このようなエラーがこれ以上発生しないようにするために、Monitor.PulseAll と Monitor.Wait をロックする必要があります。

ロックについて説明する必要がある場合は、別の質問を開いてください。詳しく説明します。:)

于 2012-06-05T04:41:06.477 に答える
0

編集: 私の投稿を無視する - 代わりに @PanagiotisKanavos からの投稿を読んでください...

これはコンパイルされませんが、ロックを使用するための正しい方向に設定されます:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;

public class DFS
{
    static List<string> traversedList = new List<string>();

    static List<string> parentList = new List<string>();
    static Thread[] thread_array;
    //static BufferBlock<Object> buffer1 = new BufferBlock<Object>();

    public static void Main(string[] args)
    {

        int N = 100;
        int M = N * 4;
        int P = N * 16;

        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();

        List<string> global_list = new List<string>();

        StreamReader file = new StreamReader(args[args.Length - 2]);


        string text = file.ReadToEnd();

        string[] lines = text.Split('\n');



        string[][] array1 = new string[lines.Length][];

        for (int i = 0; i < lines.Length; i++)
        {
            lines[i] = lines[i].Trim();
            string[] words = lines[i].Split(' ');

            array1[i] = new string[words.Length];

            for (int j = 0; j < words.Length; j++)
            {
                array1[i][j] = words[j];
            }
        }

        StreamWriter sr = new StreamWriter("E:\\Newtext1.txt");

        for (int i = 0; i < array1.Length; i++)
        {
            for (int j = 0; j < array1[i].Length; j++)
            {
                if (j != 0)
                {
                    sr.Write(array1[i][0] + ":" + array1[i][j]);
                    Console.WriteLine(array1[i][0] + ":" + array1[i][j]);
                    sr.Write(sr.NewLine);
                }
            }

        }
        int start_no = Convert.ToInt32(args[args.Length - 1]);
        thread_array = new Thread[lines.Length];
        string first_message = "root";
        //buffer1.Post(first_message);
        //buffer1.Post(array1);
        //buffer1.Post(start_no);
        //buffer1.Post(1);

        for (int t = 1; t < lines.Length; t++)
        {
            Console.WriteLine("thread" + t);
            thread_array[t] = new Thread(new ThreadStart(thread_run));
            thread_array[t].Name = t.ToString();
            lock (thread_array[t])
            {
                Console.WriteLine("working");
                thread_array[t].Start();
                thread_array[t].Join();
            }

        }
        stopwatch.Stop();

        Console.WriteLine(stopwatch.Elapsed);
        Console.ReadLine();
    }

    private static void dfs(string[][] array, int point)
    {
        for (int z = 1; z < array[point].Length; z++)
        {
            if ((!traversedList.Contains(array[point][z])))
            {
                traversedList.Add(array[point][z]);
                parentList.Add(point.ToString());
                dfs(array, int.Parse(array[point][z]));
            }

        }
        return;


    }

    bool busy;
    private readonly object syncLock = new object();

    public static void thread_run()
    {
        try
        {
            string parent;
            string[][] array1;
            int point;
            int id;
            //parent = (string)buffer1.Receive();
            //array1 = (string[][])buffer1.Receive();
            //point = (int)buffer1.Receive();
            //id = (int)buffer1.Receive();
            object value;
            Console.WriteLine("times");

            if (Thread.CurrentThread.Name.Equals("Point.ToString()"))
            {
                if (!traversedList.Contains("Point.ToString()"))
                {
                    for (int x = 1; x < 99999; x++)
                    {
                        Console.WriteLine("times");
                        //if (buffer1.TryReceive(out value))
                        //{
                        //    array1 = (string[][])value;
                        //}
                        //if (buffer1.TryReceive(out value))
                        //{
                        //    id = (int)buffer1.Receive();
                        //}
                        //id++;
                        //buffer1.Post(parent);
                        //buffer1.Post(array1);
                        //buffer1.Post(x);
                        //buffer1.Post(id);
                        Console.WriteLine("times");

                        lock (syncLock)
                        {
                            while (busy)
                            {
                                busy = false;
                                Monitor.PulseAll(Thread.CurrentThread);
                            }
                            busy = true; // we've got it!
                        }


                    }

                    //return;
                }
                else
                {
                    //buffer1.Post(parent);
                    //buffer1.Post(array1);
                    //buffer1.Post(point);
                    //buffer1.Post(id);
                    lock (syncLock)
                    {
                        while (busy)
                        {
                            busy = false;
                            Monitor.PulseAll(Thread.CurrentThread);
                        }
                        busy = true; // we've got it!
                    }
                }
            }
            else
            {
                Console.WriteLine("working 2");
                lock (syncLock)
                {
                    while (busy)
                    {
                        Monitor.Wait(Thread.CurrentThread);
                    }
                    busy = true; // we've got it!
                }

            }
            //Console.WriteLine(parent);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

    }

}
于 2012-06-05T06:29:44.260 に答える