4

非同期メソッド呼び出しを使用してメソッド パイプラインを開発しようとしていました。パイプラインのロジックは次のとおりです。

  1. パイプライン内の m 個のメソッドにフィードする必要があるコレクション内の n 個のデータがあります
  2. T のコレクションを列挙する
  3. 最初の要素を最初のメソッドにフィードする
  4. 出力を取得し、それを 2 番目のメソッドに非同期的にフィードします
  5. 同時に、コレクションの 2 番目の要素を最初のメソッドにフィードします。
  6. 最初のメソッドの完了後、結果を 2 番目のメソッドに渡します (2 番目のメソッドがまだ実行中の場合は、結果をそのキューに入れ、最初のメソッドで 3 番目の要素の実行を開始します)。
  7. 2 番目のメソッドの実行が終了したら、キューから最初の要素を取得して実行します (すべてのメソッドは非同期で実行する必要があり、次のメソッドが終了するのを待つべきではありません)。
  8. m番目のメソッドで、データを実行した後、結果をリストに格納します
  9. m 番目のメソッドで n 番目の要素が完了したら、結果のリスト (n 個の結果) を最初のレベルに返します。

次のようなコードを思いつきましたが、意図したとおりに動作せず、結果が返されず、さらに本来の順序で実行されていません。

static class Program
    {
        static void Main(string[] args)
        {
            var list = new List<int> { 1, 2, 3, 4 };
            var result = list.ForEachPipeline(Add, Square, Add, Square);
            foreach (var element in result)
            {
                Console.WriteLine(element);
                Console.WriteLine("---------------------");
            }
            Console.ReadLine();
        }

        private static int Add(int j)
        {
            return j + 1;
        }

        private static int Square(int j)
        {
            return j * j;
        }

        internal static void AddNotify<T>(this List<T> list, T item)
        {
            Console.WriteLine("Adding {0} to the list", item);
            list.Add(item);
        }    
    }

    internal class Function<T>
    {
        private readonly Func<T, T> _func;

        private readonly List<T> _result = new List<T>();
        private readonly Queue<T> DataQueue = new Queue<T>();
        private bool _isBusy;
        static readonly object Sync = new object();
        readonly ManualResetEvent _waitHandle = new ManualResetEvent(false);

        internal Function(Func<T, T> func)
        {
            _func = func;
        }

        internal Function<T> Next { get; set; }
        internal Function<T> Start { get; set; }
        internal int Count;

        internal IEnumerable<T> Execute(IEnumerable<T> source)
        {
            var isSingle = true;
            foreach (var element in source) {
                var result = _func(element);
                if (Next != null)
                {
                    Next.ExecuteAsync(result, _waitHandle);
                    isSingle = false;
                }
                else
                    _result.AddNotify(result);
            }
            if (!isSingle)
                _waitHandle.WaitOne();
            return _result;
        }


        internal void ExecuteAsync(T element, ManualResetEvent resetEvent)
        {
            lock(Sync)
            {
                if(_isBusy)
                {
                    DataQueue.Enqueue(element);
                    return;
                }
                _isBusy = true;

                _func.BeginInvoke(element, CallBack, resetEvent);
            }           
        }

        internal void CallBack(IAsyncResult result)
        {
            bool set = false;
            var worker = (Func<T, T>) ((AsyncResult) result).AsyncDelegate;
            var resultElement = worker.EndInvoke(result);
            var resetEvent = result.AsyncState as ManualResetEvent;

            lock(Sync)
            {
                _isBusy = false;
                if(Next != null)
                    Next.ExecuteAsync(resultElement, resetEvent);
                else
                    Start._result.AddNotify(resultElement);

                if(DataQueue.Count > 1)
                {
                    var element = DataQueue.Dequeue();
                    ExecuteAsync(element, resetEvent);
                }
                if(Start._result.Count == Count)
                    set = true;
            }
            if(set)
              resetEvent.Set();
        }
    }

    public static class Pipe
    {
        public static IEnumerable<T> ForEachPipeline<T>(this IEnumerable<T> source, params Func<T, T>[] pipes)
        {
            Function<T> start = null, previous = null;
            foreach (var function in pipes.Select(pipe => new Function<T>(pipe){ Count = source.Count()}))
            {
                if (start == null)
                {
                    start = previous = function;
                    start.Start = function;
                    continue;
                }
                function.Start = start;
                previous.Next = function;
                previous = function;
            }
            return start != null ? start.Execute(source) : null;
        }
    }

このことを機能させるのを手伝ってくれませんか?この設計が実際のメソッド パイプラインに適していない場合は、別のものを提案してください。

編集:.Net 3.5に厳密に固執する必要があります。

4

3 に答える 3

1

パイプラインアプローチを採用する特別な理由はありますか?IMOは、すべての関数が次々にチェーンされた状態で入力ごとに個別のスレッドを起動すると、記述が簡単になり、実行が速くなります。例えば、

function T ExecPipe<T>(IEnumerable<Func<T, T>> pipe, T input)
{
  T value = input;
  foreach(var f in pipe)
  {
    value = f(value);
  }
  return value;
}

var pipe = new List<Func<int, int>>() { Add, Square, Add, Square };
var list = new List<int> { 1, 2, 3, 4 };
foreach(var value in list)
{
  ThreadPool.QueueUserWorkItem(o => ExecPipe(pipe, (int)o), value);
}

さて、あなたのコードに来て、Mステージで正確なパイプラインを実装するには、各ステージが並行して実行できるため、正確にMスレッドが必要だと思います-今、i / pが到達していないため、一部のスレッドがアイドル状態になっている可能性があります。あなたのコードがスレッドを起動しているかどうか、そして特定の時間にスレッドの数がいくつになるかはわかりません。

于 2010-11-15T10:20:31.177 に答える
1

コードに問題はすぐには見つかりませんでしたが、少し複雑になっている可能性があります。これはあなたがやりたいことをするためのより簡単な方法かもしれません。

public static class Pipe 
{
   public static IEnumerable<T> Execute<T>(
      this IEnumerable<T> input, params Func<T, T>[] functions)
   {
      // each worker will put its result in this array
      var results = new T[input.Count()];

      // launch workers and return a WaitHandle for each one
      var waitHandles = input.Select(
         (element, index) =>
         {
            var waitHandle = new ManualResetEvent(false);
            ThreadPool.QueueUserWorkItem(
               delegate
               {
                  T result = element;
                  foreach (var function in functions)
                  {
                     result = function(result);
                  }
                  results[index] = result;
                  waitHandle.Set();
               });
            return waitHandle;
         });

      // wait for each worker to finish
      foreach (var waitHandle in waitHandles)
      {
          waitHandle.WaitOne();
      }
      return results;
   }
}

これは、あなた自身の試みのように、パイプラインの各ステージにロックを作成しません。役に立たなかったので省略しました。ただし、次のような関数をラップすることで簡単に追加できます。

var wrappedFunctions = functions.Select(x => AddStageLock(x));

これはどこにAddStageLockありますか:

private static Func<T,T> AddStageLock<T>(Func<T,T> function)
{
   object stageLock = new object();
   Func<T, T> wrappedFunction =
      x =>
      {
         lock (stageLock)
         {
            return function(x);
         }
      };
   return wrappedFunction;
}

編集:個々のExecute要素ごとに実行する作業が、待機ハンドルの作成とスレッドプールでのタスクのスケジューリングのオーバーヘッドを小さくしない限り、実装はおそらくシングルスレッドの実行よりも遅くなります。マルチスレッドのメリットを実際に享受するには、次のことを行う必要があります。オーバーヘッドを制限します。.NET 4のPLINQは、データをパーティション化することによってこれを行います。

于 2010-11-15T11:06:41.817 に答える
0

反復ごとにスレッドを分割し、結果をロックリソースに集約しないでください。あなただけがする必要があります。これには PLinq を使用できます。リソースのメソッドを間違えている可能性があると思います。メソッドが共有リソースを含むクリティカル ブロックを処理している場合にのみ、メソッドをロックする必要があります。リソースを取り出してそこから新しいスレッドに割り込むことで、2 番目のメソッドを管理する必要がなくなります。

IE: メソッド X は Method1 を呼び出してから、値を Method2 に渡します。

于 2010-11-15T10:22:53.197 に答える