1

ある種の重いタスクの HTTP ラッパーを実装し、フロントエンド サーバー フレームワークとして Tornado を選択しました (これは、重いタスクが Python で記述されており、Tornado に慣れているためです)。

現在、Tornado のプロセスから重いタスクを直接呼び出すだけです。jQuery を使って何らかの Web ベースのインターフェースを用意し、フォームにパラメーターを設定して AJAX リクエストを処理させます。

ご想像のとおり、Web ブラウザーからスローしたタスクはキャンセルできません。私がキャンセルできる唯一の方法は、9 または 15 のシグナルを Python プロセスに送信することです。これは、ユーザーが通常実行できることではありません。

HTTP経由である種の「キャンセル」リクエストをリクエストすることで、現在作業中のタスクをキャンセルできるようにしたいと考えています。どうすればそれができますか?負荷の高いタスク (YouTube でのビデオ エンコーディングなど) を処理するほとんどの Web サービスは何を行っていますか?

4

1 に答える 1

1

実際、TornadoFuturesはキャンセルをサポートしていません ( docs )。さらに、 を使用してもwith_timeout、タイムアウトしたジョブはまだ実行されており、その結果を待つものは何もありません。

タイムアウトでトルネードでハングしている非同期タスクをキャンセルするにはどうすればよいですか? にも記載されている唯一の方法は? 、キャンセルできるようにロジックを実装することです(フラグなどを使用して)。

例:

  • 仕事は単純な非同期スリープです
  • /リストジョブ
  • /add/TIME新しいジョブを追加します - TIME 秒 - スリープする時間を指定します
  • /cancel/IDジョブをキャンセル

コードは次のようになります。

from tornado.ioloop import IOLoop
from tornado import gen, web
from time import time

class Job():

    def __init__(self, run_sec):
        self.run_sec = int(run_sec)
        self.start_time = None
        self.end_time = None
        self._cancelled = False

    @gen.coroutine
    def run(self):
        """ Some job

        The job is simple: sleep for a given number of seconds.
        It could be implemented as:
             yield gen.sleep(self.run_sec)
        but this way makes it not cancellable, so
        it is divided: run 1s sleep, run_sec times 
        """
        self.start_time = time()
        deadline = self.start_time + self.run_sec
        while not self._cancelled:
            yield gen.sleep(1)
            if time() >= deadline:
                break
        self.end_time = time()

    def cancel(self):
    """ Cancels job

    Returns None on success,
    raises Exception on error:
      if job is already cancelled or done
    """
        if self._cancelled:
            raise Exception('Job is already cancelled')
        if self.end_time is not None:
            raise Exception('Job is already done')
        self._cancelled = True

    def get_state(self):
        if self._cancelled:
            if self.end_time is None:
                # job might be running still
                # and will be stopped on the next while check
                return 'CANCELING...'
            else:
                return 'CANCELLED'
        elif self.end_time is None:
            return 'RUNNING...'
        elif self.start_time is None:
            # actually this never will shown
            # as after creation, job is immediately started
            return 'NOT STARTED'
        else:
            return 'DONE'


class MainHandler(web.RequestHandler):

    def get(self, op=None, param=None):
        if op == 'add':
            # add new job
            new_job = Job(run_sec=param)
            self.application.jobs.append(new_job)
            new_job.run()
            self.write('Job added')
        elif op == 'cancel':
            # cancel job - stop running
            self.application.jobs[int(param)].cancel()
            self.write('Job cancelled')
        else:
            # list jobs
            self.write('<pre>') # this is so ugly... ;P
            self.write('ID\tRUNSEC\tSTART_TIME\tSTATE\tEND_TIME\n')
            for idx, job in enumerate(self.application.jobs):
                self.write('%s\t%s\t%s\t%s\t%s\n' % (
                    idx, job.run_sec, job.start_time,
                    job.get_state(), job.end_time
                ))


class MyApplication(web.Application):

    def __init__(self):

        # to store tasks
        self.jobs = []

        super(MyApplication, self).__init__([
            (r"/", MainHandler),
            (r"/(add)/(\d*)", MainHandler),
            (r"/(cancel)/(\d*)", MainHandler),
        ])

if __name__ == "__main__":
    MyApplication().listen(8888)
    IOLoop.current().start()

カップル ジョブを追加します。

for a in `seq 12 120`; do curl http://127.0.0.1:8888/add/$a; done

次に、いくつかをキャンセルします... 注 - Tornado のみが必要です。

この例は非常に単純で、負荷の高いタスクgen.sleepを想定しています。もちろん、すべてのジョブがキャンセル可能な方法で実装するほど単純なわけではありません。

于 2015-12-29T21:39:05.247 に答える