0

私はPythonでfalconフレームワークを使用して、Web APIのjson応答を形成しています。

たとえばlogic()、30〜90分間機能する関数があります。私はこのようなものが欲しい:

  1. http-client が /api/somepath.json を要求すると、呼び出します somepath_handle()
  2. somepath_handle()logic()別のスレッド/プロセスで実行される
  3. 終了したらlogic()、スレッドは閉じられます
  4. somepath_handle()logic()return からの応答を読み取ります
  5. somepath_handle()が終了する前に強制終了された場合logic()、スレッド/etclogic()は終了するまで停止されません

コード:

def somepath_handle():
    run_async_logic()
    response=wait_for_async_logic_response() # read response of logic()
    return_response(response)
4

2 に答える 2

1

プロセスに時間がかかる場合は、電子メールまたはライブ通知システムを使用して結果をユーザーに送信することをお勧めします。

于 2015-07-15T09:02:20.653 に答える
0

単純なワーカーを使用して、いくつかのコマンドを処理するキューを作成しています。単純な応答ストレージを追加すると、接続が失われたときに要求を処理し、それらを失わない可能性があります。

例: falconframework.org を使用してリクエストに応答したのは main 関数です。

main.py:

from flow import Flow
import falcon
import threading
import storage

__version__ = 0.1
__author__ = 'weldpua2008@gmail.com'


app = falcon.API(
    media_type='application/json')

app.add_route('/flow', Flow())

THREADS_COUNT = 1
# adding the workers to process queue of command
worker = storage.worker
for _ in xrange(THREADS_COUNT):
    thread = threading.Thread(target=worker)
    thread.daemon = True
    thread.start()

ワーカー コード storage.py を使用した単純なストレージです。

from Queue import Queue
import subprocess
import logging
main_queque = Queue()

def worker():
    global main_roles_queque
    while True:
        try:
            cmd = main_queque.get()
            #do_work(item)
            #time.sleep(5)
            handler = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
            stdout, stderr = handler.communicate()
            logging.critical("[queue_worker]: stdout:%s, stderr:%s, cmd:%s" %(stdout, stderr, cmd))
            main_queque.task_done()
        except Exception as error:
            logging.critical("[queue_worker:error] %s" %(error))

あらゆるリクエストを処理するクラスです [POST, GET] flow.py:

import storage
import json
import falcon
import random

class Flow(object):

    def on_get(self, req, resp):
        storage_value = storage.main_queque.qsize()
        msg = {"qsize": storage_value}
        resp.body = json.dumps(msg, sort_keys=True, indent=4)
        resp.status = falcon.HTTP_200

    #curl -H "Content-Type: application/json" -d '{}'  http://10.206.102.81:8888/flow
    def on_post(self, req, resp):
        r = random.randint(1, 10000000000000)
        cmd = 'sleep 1;echo "ss %s"' % str(r)
        storage.main_queque.put(cmd)
        storage_value = cmd
        msg = {"value": storage_value}
        resp.body = json.dumps(msg, sort_keys=True, indent=4)
        resp.status = falcon.HTTP_200
于 2015-09-09T04:34:08.250 に答える