非同期メソッド呼び出しを使用してメソッド パイプラインを開発しようとしていました。パイプラインのロジックは次のとおりです。
- パイプライン内の m 個のメソッドにフィードする必要があるコレクション内の n 個のデータがあります
- T のコレクションを列挙する
- 最初の要素を最初のメソッドにフィードする
- 出力を取得し、それを 2 番目のメソッドに非同期的にフィードします
- 同時に、コレクションの 2 番目の要素を最初のメソッドにフィードします。
- 最初のメソッドの完了後、結果を 2 番目のメソッドに渡します (2 番目のメソッドがまだ実行中の場合は、結果をそのキューに入れ、最初のメソッドで 3 番目の要素の実行を開始します)。
- 2 番目のメソッドの実行が終了したら、キューから最初の要素を取得して実行します (すべてのメソッドは非同期で実行する必要があり、次のメソッドが終了するのを待つべきではありません)。
- m番目のメソッドで、データを実行した後、結果をリストに格納します
- m 番目のメソッドで n 番目の要素が完了したら、結果のリスト (n 個の結果) を最初のレベルに返します。
次のようなコードを思いつきましたが、意図したとおりに動作せず、結果が返されず、さらに本来の順序で実行されていません。
static class Program
{
static void Main(string[] args)
{
var list = new List<int> { 1, 2, 3, 4 };
var result = list.ForEachPipeline(Add, Square, Add, Square);
foreach (var element in result)
{
Console.WriteLine(element);
Console.WriteLine("---------------------");
}
Console.ReadLine();
}
private static int Add(int j)
{
return j + 1;
}
private static int Square(int j)
{
return j * j;
}
internal static void AddNotify<T>(this List<T> list, T item)
{
Console.WriteLine("Adding {0} to the list", item);
list.Add(item);
}
}
internal class Function<T>
{
private readonly Func<T, T> _func;
private readonly List<T> _result = new List<T>();
private readonly Queue<T> DataQueue = new Queue<T>();
private bool _isBusy;
static readonly object Sync = new object();
readonly ManualResetEvent _waitHandle = new ManualResetEvent(false);
internal Function(Func<T, T> func)
{
_func = func;
}
internal Function<T> Next { get; set; }
internal Function<T> Start { get; set; }
internal int Count;
internal IEnumerable<T> Execute(IEnumerable<T> source)
{
var isSingle = true;
foreach (var element in source) {
var result = _func(element);
if (Next != null)
{
Next.ExecuteAsync(result, _waitHandle);
isSingle = false;
}
else
_result.AddNotify(result);
}
if (!isSingle)
_waitHandle.WaitOne();
return _result;
}
internal void ExecuteAsync(T element, ManualResetEvent resetEvent)
{
lock(Sync)
{
if(_isBusy)
{
DataQueue.Enqueue(element);
return;
}
_isBusy = true;
_func.BeginInvoke(element, CallBack, resetEvent);
}
}
internal void CallBack(IAsyncResult result)
{
bool set = false;
var worker = (Func<T, T>) ((AsyncResult) result).AsyncDelegate;
var resultElement = worker.EndInvoke(result);
var resetEvent = result.AsyncState as ManualResetEvent;
lock(Sync)
{
_isBusy = false;
if(Next != null)
Next.ExecuteAsync(resultElement, resetEvent);
else
Start._result.AddNotify(resultElement);
if(DataQueue.Count > 1)
{
var element = DataQueue.Dequeue();
ExecuteAsync(element, resetEvent);
}
if(Start._result.Count == Count)
set = true;
}
if(set)
resetEvent.Set();
}
}
public static class Pipe
{
public static IEnumerable<T> ForEachPipeline<T>(this IEnumerable<T> source, params Func<T, T>[] pipes)
{
Function<T> start = null, previous = null;
foreach (var function in pipes.Select(pipe => new Function<T>(pipe){ Count = source.Count()}))
{
if (start == null)
{
start = previous = function;
start.Start = function;
continue;
}
function.Start = start;
previous.Next = function;
previous = function;
}
return start != null ? start.Execute(source) : null;
}
}
このことを機能させるのを手伝ってくれませんか?この設計が実際のメソッド パイプラインに適していない場合は、別のものを提案してください。
編集:.Net 3.5に厳密に固執する必要があります。