1

いくつかの操作を定期的に実行する必要があります。主な要件は次のとおりです。
1) 前の更新サイクルが終了していない間に次の更新サイクルを開始しないこと
2) 前の反復で取得したデータがまだ有効な場合、更新を開始しないこと、つまり、最後の更新からの時間が TTL 値よりも小さいこと
3) そのような更新を行うために、個別のスレッドが多数 (10 個超など) 必要であること。
SO には同じ種類の質問がたくさんあり、そこで @Jim による PeriodicTaskFactory のこの実装を見つけました。
期待どおりに動作していますが、複数のそのようなファクトリを同時に生成する場合、更新中にオーバーヘッドが発生し始め、プロセス全体が変形します (発生しようとしていたいくつかの反復がキャンセルされます)。コードは次のとおりです。

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace CA_TasksTests
{
    /// <summary>
    /// Console harness that periodically refreshes a set of test operations through
    /// <see cref="PeriodicTaskFactory"/> and prints how many refreshes actually ran.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Outcome of one refresh of an operation.
        /// </summary>
        public class Result
        {
            /// <summary>"#" followed by the iteration number and the info text.</summary>
            public string Message { get; set; }

            /// <summary>
            /// Builds the result message from the iteration number and an info text.
            /// </summary>
            /// <param name="iter">Iteration number of the refresh.</param>
            /// <param name="info">Text appended directly after the iteration number.</param>
            public Result(int iter, string info)
            {
                Message = "#" + iter + info;
            }

            /// <summary>Returns <see cref="Message"/>.</summary>
            public override string ToString()
            {
                return Message;
            }
        }

        /// <summary>
        /// Parameters and refresh state of one periodically executed operation.
        /// </summary>
        public class Operation
        {
            /// <summary>Display name, "Operation" plus the id zero-padded to two digits for ids below 10.</summary>
            public string OperationName { get; set; }

            /// <summary>Time a refreshed value stays valid; always derived from <see cref="Interval"/>.</summary>
            public TimeSpan TTL { get { return TimeSpan.FromMilliseconds(Interval); } }

            /// <summary>Local time at which the last refresh started; default(DateTime) until the first refresh.</summary>
            public DateTime LastUpdate { get; set; }

            /// <summary>
            /// Creates the operation and derives its name from <paramref name="id"/>.
            /// </summary>
            /// <param name="id">Number used to build <see cref="OperationName"/>.</param>
            public Operation(int id)
            {
                OperationName = "Operation" + ((id < 10) ? "0" : "") + id;
            }
        }

        /// <summary>Refresh interval in milliseconds; also used as the TTL of every operation.</summary>
        public static int Interval = 2000;

        /// <summary>Total run time, in milliseconds, handed to each periodic task.</summary>
        public static int Duration = 10000;

        /// <summary>Number of test operations created by <see cref="Main"/>.</summary>
        public static int OperationsCount = 10;

        /// <summary>
        /// Creates the test operations, runs them periodically and prints the number of
        /// results next to the expected number (one refresh per interval per operation).
        /// </summary>
        static void Main()
        {
            // Creating the test operations, numbered from 1.
            var operations = Enumerable.Range(1, OperationsCount).Select(i => new Operation(i)).ToList();

            // Executing them; blocks until every periodic task has finished.
            var r = ExecuteActions(operations).OrderBy(i => i.Message).ToList();
            Console.WriteLine("Results (expected=" + (Duration / Interval * OperationsCount) + ") : " + r.Count);
            Console.ReadLine();
        }

        /// <summary>
        /// Performs one refresh of <paramref name="operation"/> and returns its result.
        /// </summary>
        /// <param name="iter">Iteration number recorded in the result message.</param>
        /// <param name="operation">Operation to refresh; its <see cref="Operation.LastUpdate"/> is set to the current time.</param>
        /// <returns>A <see cref="Result"/> built from <paramref name="iter"/> and the operation name.</returns>
        public static Result ExecuteOperation(int iter, Operation operation)
        {
            // Assigning last update timestamp before the work starts.
            operation.LastUpdate = DateTime.Now;

            // Some operation (simulated by a one-second sleep). It runs on the calling
            // thread: starting a task only to block on its Result immediately would hold
            // two threads for the same work and add needless thread-pool pressure.
            Thread.Sleep(1000);
            return new Result(iter, operation.OperationName);
        }

        /// <summary>
        /// Starts one periodic task per operation, waits for all of them to finish and
        /// returns every result that was produced.
        /// </summary>
        /// <param name="operations">Operations to refresh periodically.</param>
        /// <returns>The results of all executed refreshes, in completion order.</returns>
        public static List<Result> ExecuteActions(List<Operation> operations)
        {
            var list = new List<Result>();
            // The result list is shared by the callbacks of every operation, and
            // List<T> is not safe for concurrent writes, so all adds go through this lock.
            var listLock = new object();
            var tasks = new List<Task>();

            foreach (var currentOperation in operations)
            {
                var iter = 0;
                // Per-operation lock: prevents overlapping refreshes of the same operation.
                var locker = new object();
                // Local copy so the lambda captures this iteration's operation.
                Operation operation = currentOperation;

                var periodicTask = PeriodicTaskFactory.Start(() =>
                {
                    // (*) Looking if we need updates semantically -
                    // through comparing time since last refresh with operation TTL
                    Console.WriteLine(DateTime.Now + " : " + (DateTime.Now - operation.LastUpdate) + " ?> " + operation.TTL);

                    // Looking if we need updates logically -
                    // if previous operation is still running, skip this tick.
                    if (!Monitor.TryEnter(locker))
                    {
                        Console.WriteLine(">>>" + DateTime.Now + " Cancelled");
                        return;
                    }

                    try
                    {
                        // Semantic update: refresh only when the previous value has expired.
                        // NOTE(review): the timer period equals the TTL and LastUpdate is
                        // stamped after the tick starts, so a tick that is not late enough
                        // fails this check and the refresh is skipped. Consider ticking
                        // more often than the TTL or allowing a small tolerance here.
                        if (DateTime.Now - operation.LastUpdate > operation.TTL)
                        {
                            iter++;
                            Console.WriteLine(DateTime.Now + " Refresh #" + iter + " " + operation.OperationName);
                            var result = ExecuteOperation(iter, operation);
                            lock (listLock)
                            {
                                list.Add(result);
                            }
                        }
                    }
                    finally
                    {
                        Monitor.Exit(locker);
                    }
                }, intervalInMilliseconds: (int)operation.TTL.TotalMilliseconds, duration: Duration /*maxIterations:2*/);

                // Runs once the periodic task has finished: releases it and logs completion.
                var end = periodicTask.ContinueWith(finished =>
                {
                    finished.Dispose();
                    Console.WriteLine(">>>" + DateTime.Now + " " + operation.OperationName + " finished");
                });
                tasks.Add(end);
            }

            Task.WaitAll(tasks.ToArray());
            return list;
        }
    }

    /// <summary>
    /// Factory class to create a periodic Task to simulate a <see cref="System.Threading.Timer"/> using <see cref="Task">Tasks.</see>
    /// </summary>
    public static class PeriodicTaskFactory
    {
        /// <summary>
        /// Starts the periodic task.
        /// </summary>
        /// <param name="action">The action executed once per period.</param>
        /// <param name="intervalInMilliseconds">The interval in milliseconds. <see cref="Timeout.Infinite"/> executes a single iteration.</param>
        /// <param name="delayInMilliseconds">The delay in milliseconds, i.e. how long it waits to kick off the timer.</param>
        /// <param name="duration">The duration in milliseconds.
        /// <example>If the duration is set to 10 seconds, the maximum time this task is allowed to run is 10 seconds.</example>
        /// When both the duration and the interval are positive, the number of iterations is also limited to
        /// <c>duration / intervalInMilliseconds</c>.</param>
        /// <param name="maxIterations">The max iterations. A negative value means no explicit limit; zero runs no iteration.</param>
        /// <param name="synchronous">if set to <c>true</c> executes each period in a blocking fashion and each periodic execution of the task
        /// is included in the total duration of the Task.</param>
        /// <param name="cancelToken">The cancel token.</param>
        /// <param name="periodicTaskCreationOptions"><see cref="TaskCreationOptions"/> used to create the task for executing the <see cref="Action"/>.</param>
        /// <returns>A <see cref="Task"/> that runs the periodic loop.</returns>
        /// <remarks>
        /// Exceptions that occur in the <paramref name="action"/> need to be handled in the action itself. These exceptions will not be 
        /// bubbled up to the periodic task.
        /// </remarks>
        public static Task Start(Action action,
                                 int intervalInMilliseconds = Timeout.Infinite,
                                 int delayInMilliseconds = 0,
                                 int duration = Timeout.Infinite,
                                 int maxIterations = -1,
                                 bool synchronous = false,
                                 CancellationToken cancelToken = new CancellationToken(),
                                 TaskCreationOptions periodicTaskCreationOptions = TaskCreationOptions.None)
        {
            Stopwatch stopWatch = new Stopwatch();

            // Every period re-checks cancellation right before invoking the caller's action.
            Action wrapperAction = () =>
            {
                CheckIfCancelled(cancelToken);
                action();
            };

            Action mainAction = () =>
            {
                MainPeriodicTaskAction(intervalInMilliseconds, delayInMilliseconds, duration, maxIterations, cancelToken, stopWatch, synchronous, wrapperAction, periodicTaskCreationOptions);
            };

            // The loop itself is started as a long-running task.
            return Task.Factory.StartNew(mainAction, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
        }

        /// <summary>
        /// Main loop of the periodic task: starts the action, waits one interval and repeats
        /// until a stop condition (iteration limit, duration, infinite interval, cancellation) is met.
        /// </summary>
        /// <param name="intervalInMilliseconds">The interval in milliseconds.</param>
        /// <param name="delayInMilliseconds">The delay in milliseconds.</param>
        /// <param name="duration">The duration.</param>
        /// <param name="maxIterations">The max iterations.</param>
        /// <param name="cancelToken">The cancel token.</param>
        /// <param name="stopWatch">The stop watch used to accumulate the time counted against the duration.</param>
        /// <param name="synchronous">if set to <c>true</c> executes each period in a blocking fashion and each periodic execution of the task
        /// is included in the total duration of the Task.</param>
        /// <param name="wrapperAction">The wrapper action.</param>
        /// <param name="periodicTaskCreationOptions"><see cref="TaskCreationOptions"/> used to create a sub task for executing the <see cref="Action"/>.</param>
        private static void MainPeriodicTaskAction(int intervalInMilliseconds,
                                                   int delayInMilliseconds,
                                                   int duration,
                                                   int maxIterations,
                                                   CancellationToken cancelToken,
                                                   Stopwatch stopWatch,
                                                   bool synchronous,
                                                   Action wrapperAction,
                                                   TaskCreationOptions periodicTaskCreationOptions)
        {
            // Derive an iteration limit from duration / interval. Only done when both are
            // positive: this avoids dividing by a zero interval and keeps the "infinite"
            // (-1) sentinels out of the arithmetic.
            if (intervalInMilliseconds > 0 && duration > 0)
            {
                int iterationsInDuration = duration / intervalInMilliseconds;
                if (iterationsInDuration > 0)
                {
                    if (maxIterations > 0)
                    {
                        // A limit supplied by the caller is only ever tightened, never replaced.
                        maxIterations = Math.Min(maxIterations, iterationsInDuration);
                    }
                    else if (maxIterations < 0)
                    {
                        // No explicit limit: use the one implied by the duration.
                        maxIterations = iterationsInDuration;
                    }
                    // maxIterations == 0 is left alone so that it still means "run nothing".
                }
            }

            // Sub tasks are attached to the periodic task, in addition to the caller's options.
            TaskCreationOptions subTaskCreationOptions = TaskCreationOptions.AttachedToParent | periodicTaskCreationOptions;

            CheckIfCancelled(cancelToken);

            if (delayInMilliseconds > 0)
            {
                Thread.Sleep(delayInMilliseconds);
            }

            if (maxIterations == 0) { return; }

            int iteration = 0;

            ////////////////////////////////////////////////////////////////////////////
            // using a ManualResetEventSlim as it is more efficient in small intervals.
            // In the case where longer intervals are used, it will automatically use 
            // a standard WaitHandle....
            // see http://msdn.microsoft.com/en-us/library/vstudio/5hbefs30(v=vs.100).aspx
            using (ManualResetEventSlim periodResetEvent = new ManualResetEventSlim(false))
            {
                ////////////////////////////////////////////////////////////
                // Main periodic logic. Basically loop through this block
                // executing the action
                while (true)
                {
                    CheckIfCancelled(cancelToken);

                    Task subTask = Task.Factory.StartNew(wrapperAction, cancelToken, subTaskCreationOptions, TaskScheduler.Current);

                    if (synchronous)
                    {
                        // In synchronous mode the execution time counts against the duration.
                        stopWatch.Start();
                        try
                        {
                            subTask.Wait(cancelToken);
                        }
                        catch { /* do not let an errant subtask to kill the periodic task...*/ }
                        stopWatch.Stop();
                    }

                    // use the same Timeout setting as the System.Threading.Timer, infinite timeout will execute only one iteration.
                    if (intervalInMilliseconds == Timeout.Infinite) { break; }

                    iteration++;

                    if (maxIterations > 0 && iteration >= maxIterations) { break; }

                    try
                    {
                        // The event is never set, so this is a cancellable sleep for one interval.
                        stopWatch.Start();
                        periodResetEvent.Wait(intervalInMilliseconds, cancelToken);
                        stopWatch.Stop();
                    }
                    finally
                    {
                        periodResetEvent.Reset();
                    }

                    CheckIfCancelled(cancelToken);

                    if (duration > 0 && stopWatch.ElapsedMilliseconds >= duration) { break; }
                }
            }
        }

        /// <summary>
        /// Throws <see cref="OperationCanceledException"/> if cancellation has been requested.
        /// </summary>
        /// <param name="cancellationToken">The cancel token.</param>
        private static void CheckIfCancelled(CancellationToken cancellationToken)
        {
            // CancellationToken is a struct and can never be null, so no null check is needed.
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
}

TTL 比較チェックの出力 (*) は次を示します。

9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.0020000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:00.9940000 >? 00:00:02

したがって、このオーバーヘッドのために、いくつかの更新がキャンセルされてしまいます。何が原因で、どのように修正できますか? 私の最初の推測はスレッド切り替えのコストで、その場合は比較に多少のイプシロン (許容誤差) を設定して、それで我慢することになります。手伝ってくれてありがとう。

4

1 に答える 1