5

を使用して、タスクパイプライン/順序付きスケジューラを作成しようとしていますTaskFactory.FromAsync

(I / O完了ポートを使用して)Webサービス要求を実行できるようにしたいのですがFromAsync、それらの順序を維持し、一度に1つの要求のみを実行します。

現時点では使用していないFromAsyncので、1つのリクエストのみが未処理であることを確認するためにTaskFactory.StartNew(()=>api.DoSyncWebServiceCall())OrderedTaskSchedulerによって使用されているものを使用できます。TaskFactory

このメソッドを使用すると、この動作は維持されると思いましたFromAsyncが、そうではありません。

TaskFactory<Stuff> taskFactory = new TaskFactory<Stuff>(new OrderedTaskScheduler());
var t1 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));
var t2 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));
var t3 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a));

これらのbeginGetStuffメソッドはすべて、FromAsync呼び出し内で呼び出されます(したがって、順番にディスパッチされますがn、同時に発生するapi呼び出しがあります)。

FromAsyncTaskSchedulerを必要とする過負荷があります。

public Task FromAsync(
    IAsyncResult asyncResult,
    Action<IAsyncResult> endMethod,
    TaskCreationOptions creationOptions,
    TaskScheduler scheduler
)

しかし、ドキュメントは言う:

endメソッドを実行するタスクをスケジュールするために使用されるTaskScheduler。

そしてあなたが見ることができるように、それはすでに構築されたものを取ります、ではありIAsyncResultませんFunc<IAsyncResult>

これはカスタムFromAsyncメソッドを必要としますか、それとも何かが足りませんか?誰かがこの実装をどこから始めるべきか提案できますか?

乾杯、

編集:

この動作を呼び出し元から抽象化したいので、TaskFactory(特殊なTaskScheduler)の動作に従って、タスクをすぐに返す必要があります-このタスクは、FromAsyncタスクをカプセル化するだけでなく、待機中にそのタスクのキューイングも行います実行する順番です。

考えられる解決策の1つ:

class TaskExecutionQueue
{
    private readonly OrderedTaskScheduler _orderedTaskScheduler;
    private readonly TaskFactory _taskFactory;
    public TaskExecutionQueue(OrderedTaskScheduler orderedTaskScheduler)
    {
        _orderedTaskScheduler = orderedTaskScheduler;
        _taskFactory = new TaskFactory(orderedTaskScheduler);

    }

    public Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator)
    {
        return _taskFactory.StartNew(taskGenerator).Unwrap();
    }
}

ただし、これはFromAsync呼び出しが発生している間、スレッドを利用します。理想的には、そうする必要はありません。

4

3 に答える 3

2

IOタスクにはスレッドが関連付けられていないため、IOタスクをスケジュールすることはできません。WindowsカーネルはスレッドレスIO操作を提供します。これらのIOの開始にはマネージコードは含まれず、TaskSchedulerクラスは機能しません。

したがって、実際にネットワークにアクセスする必要があることが確実になるまで、IOの開始を遅らせる必要があります。SemaphoreSlim.WaitAsync現在実行中のタスクの量を調整するために使用できます。個々のIOを開始する前に、そのメソッドの結果を待ち、それも待ちます。

于 2012-12-06T17:22:57.383 に答える
2

これを行う最も簡単な方法は、TPLデータフローを使用することです。

非同期デリゲートのストリームを受信し、一度に1つずつ実行する「ブロック」を定義できます(各デリゲートが完了するまで待ってから、次のデリゲートを開始します)。

var block = new ActionBlock<Func<Task>>(func => func());

次に、Webサービスリクエストを実行するには:

block.Post(() => Task.Factory.FromAsync(...));

または(私が好む):

block.Post(() => client.GetStuffAsync(a, b, c));

ActionBlockタスクを実行したいだけの場合は、このアプローチで問題ありません。出力のストリームを生成したい場合は、以下を見てくださいTransformBlock

var block = new TransformBlock<Func<Task<Stuff>>, Stuff>(func => func());

Receive同じ方法でリクエストを実行し、またはを呼び出すことで結果を取得できますReceiveAsync

于 2012-12-06T18:11:02.867 に答える
0

私はここでカスタムソリューションを決定しました...ロックは厄介で望ましくありませんが、現時点では、これは私が望む仕事をします。

public interface ITaskExecutionQueue
{
    Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator);
    Task<TResult> QueueTask<TResult>(Task<Task<TResult>> taskGenerator);
    int OutstandingTaskCount { get; }
    event EventHandler OutstandingTaskCountChanged;
}

/// This class ensures that only a single Task is executed at any one time.  They are executed sequentially in order being queued.
/// The advantages of this class over OrderedTaskScheduler is that you can use any type of Task such as FromAsync (I/O Completion ports) 
/// which are not able to be scheduled using a traditional TaskScheduler.
/// Ensure that the `outer` tasks you queue are unstarted.  E.g. <![CDATA[
/// _taskExeQueue.QueueTask(new Task<Task<TResult>>(() => StartMyRealTask()));
/// ]]>
class OrderedTaskExecutionQueue : ITaskExecutionQueue
{
    private readonly Queue<Task> _queuedTasks = new Queue<Task>();
    private Task _currentTask;
    private readonly object _lockSync = new object();

    /// <summary>
    /// Queues a task for execution
    /// </summary>
    /// <typeparam name="TResult"></typeparam>
    /// <param name="taskGenerator">An unstarted Task that creates your started real-work task</param>
    /// <returns></returns>
    public Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator)
    {
        return QueueTask(new Task<Task<TResult>>(taskGenerator));
    }

    public Task<TResult> QueueTask<TResult>(Task<Task<TResult>> taskGenerator)
    {
        Task<TResult> unwrapped = taskGenerator.Unwrap();
        unwrapped.ContinueWith(_ =>
                               {
                                   EndTask();
                                   StartNextTaskIfQueued();
                               }, TaskContinuationOptions.ExecuteSynchronously);

        lock (_lockSync)
        {
            _queuedTasks.Enqueue(taskGenerator);

            if (_currentTask == null)
            {
                StartNextTaskIfQueued();
            }
        }

        TaskCompletionSource<TResult> tcs = new TaskCompletionSource<TResult>();
        tcs.TrySetFromTaskIncomplete(unwrapped);

        OutstandingTaskCountChanged.Raise(this);

        return tcs.Task;
    }

    private void EndTask()
    {
        lock (_lockSync)
        {
            _currentTask = null;
            _queuedTasks.Dequeue();
        }

        OutstandingTaskCountChanged.Raise(this);
    }

    private void StartNextTaskIfQueued()
    {
        lock (_lockSync)
        {
            if (_queuedTasks.Count > 0)
            {
                _currentTask = _queuedTasks.Peek();

                _currentTask.RunSynchronously();
            }
        }
    }

    /// <summary>
    /// Includes the currently executing task.
    /// </summary>
    public int OutstandingTaskCount
    {
        get
        {
            lock (_lockSync)
            {
                return _queuedTasks.Count;
            }
        }
    }

    public event EventHandler OutstandingTaskCountChanged;
}

未開始を取り込みTask<Task<TResult>>ます-これにより、キューはいつ実行してFromAsync呼び出しを開始するかを決定できます(これは内部タスクです)。使用法:

Task<Task<TResult>> queueTask = new Task<Task<TResult>>(() => Task.Factory.FromAsync(beginAction, endAction));
Task<TResult> asyncCallTask = _taskExecutionQueue.QueueTask(queueTask);
于 2012-12-07T17:05:47.707 に答える