3

RabbitMQ から読み取るメソッドを呼び出すタスクを実行しています。キューに何もない場合、メソッドは単純にブロックされます。したがって、タスクのステータスは「実行中」ですが、実際には何も実行されていません。これらのタスクを正常に終了する方法はありますか?

キューにアクセスするコードは次のとおりです。

 private void FindWork(CancellationToken ct)
    {
        if (ct.IsCancellationRequested)
            return;

        bool result = false;
        bool process = false;
        bool queueResult = false;
        Work_Work work = null;

        try
        {
            using (Queue workQueue = new Queue(_workQueue))
            {
                // Look for work on the work queue
                workQueue.Open(Queue.Mode.Consume);
                work = workQueue.ConsumeWithBlocking<Work_Work>();

                // Do some work with the message ...

                return;

タスクは次のように作成されます。

private void Run()
    {
        while (!_stop)
        {
            // Remove and stopped tasks from the pool
            List<int> removeThreads = new List<int>();

            lock (_tasks)
            {
                foreach (KeyValuePair<int, Task> task in _tasks)
                {
                    if (task.Value.Status != TaskStatus.Running)
                    {
                        task.Value.Wait();
                        removeThreads.Add(task.Value.Id);
                    }
                }

                foreach (int taskID in removeThreads)
                    _tasks.Remove(taskID);
            }

            CancellationToken ct = _cts.Token;
            TaskFactory factory = new TaskFactory(ct, TaskCreationOptions.LongRunning, TaskContinuationOptions.LongRunning, null);

            // Create new tasks if we have room in the pool
            while (_tasks.Count < _runningMax)
            {
                Task task = factory.StartNew(() => FindWork(ct));

                lock (_tasks)
                    _tasks.Add(task.Id, task);
            }

            // Take a rest so we don't run the CPU to death
            Thread.Sleep(1000);
        }
    }

現在、タスク作成コードを次のように変更して、タスクを中止できるようにしました。これが良い解決策ではないことはわかっていますが、他に何をすべきかわかりません。

while (_tasks.Count < _runningMax)
            {
                Task task = factory.StartNew(() =>
                    {
                        try
                        {
                            using (_cts.Token.Register(Thread.CurrentThread.Abort))
                            {
                                FindWork(ct);
                            }
                        }
                        catch (ThreadAbortException)
                        {
                            return;
                        }
                    }, _cts.Token);

                _tasks.Add(task.Id, task);
            }
4

2 に答える 2

1

これを機能させるには、ConsumeWithBlockingキャンセルをサポートするように変更する必要があります。私はRabbitMQに詳しくありませんが、コンシューマー チャネルでのキャンセルをサポートしているようです。

したがって、コールバックから行うのではなくThread.CurrentThread.AbortToken.Register正しいことを行い、適切な RabbitMQ API を介して適切に操作をキャンセルしてください。

ちなみに、現在中止しようとしているスレッドは、によってブロックされているスレッドではない可能性が最も高いConsumeWithBlockingです。

于 2014-05-28T19:57:22.813 に答える