0

CPU バウンドなタスク向けに、コードからプロセス型のコンシューマー（huey consumer）を起動するにはどうすればよいですか?

また、immediate モードを使わずにシグナルのコールバックを受け取るにはどうすればよいですか? MemoryHuey を immediate=True で実行するとすべて正常に動作しますが、immediate=False に設定すると空のリストしか返ってきません。

問題:

処理の優先度が異なるエンドポイントがいくつかあります。どの処理も CPU 負荷が高いため、バックグラウンドに移してマルチプロセスで処理する必要があります。ジョブのストレージには、後で Redis クラスターを使用する予定です。

よろしくお願いします :) PS: 非同期（async）の部分は無視してください。

編集: あるいは、AWS Elastic Beanstalk（EB）上で supervisor を使って実行する簡単な方法はありますか?

# Exploratory check: build an in-memory huey instance plus a consumer for it,
# then show what create_consumer() hands back.
# NOTE(review): nothing here calls x.run(), so this consumer never processes the
# queue; with immediate=False the tasks would then never execute and no signals
# would fire -- a likely reason the signal list stays empty. TODO confirm.
# NOTE(review): per the huey docs MemoryHuey keeps its queue in the memory of the
# current process, so it presumably cannot feed WORKER_PROCESS workers -- confirm
# before relying on this combination.
huey = MemoryHuey('worker', results=True, store_none=False, verbose=True, immediate=False)
consumer_options = {
    "workers": mp.cpu_count(),
    "periodic": False,
    "initial_delay": 0.1,
    "backoff": 1.15,
    "max_delay": 10.0,
    "scheduler_interval": 1,
    "worker_type": WORKER_PROCESS,
    "check_worker_health": True,
    "health_check_interval": 10,
    "flush_locks": False,
}
x = huey.create_consumer(**consumer_options)
# One call, two lines of output: the consumer object, then its class.
print(x, type(x), sep="\n")
<huey.consumer.Consumer object at 0x7f101cd3c490>
<class 'huey.consumer.Consumer'>
MODEL_PATH = "model"

app = FastAPI()

router = APIRouter()
app.include_router(router)

# pool = ConnectionPool.from_url(url=os.getenv('REDIS_URL'), max_connections=100)

# only for testing
#pool = ConnectionPool(host='redis', port=6379, max_connections=100)

# Task queue instance used by the @huey.signal / @huey.task decorators below.
try:
    huey = PriorityRedisHuey(
        'worker',
        results=True,  # set to True if testing
        store_none=False,
        # host=os.getenv('REDIS_HOST'),
        # port=6379,
        # only for docker-compose testing
        host='redis',
        port=6379,
    )
except Exception as e:
    # A generic Exception carries no ``status_code``/``detail`` attributes, so
    # print the exception itself. Re-raise because the rest of this module
    # cannot work without ``huey`` (its decorators would fail with NameError).
    print("exception during connection catched", repr(e))
    raise

# NOTE(review): create_consumer() only constructs a Consumer; nothing in this
# module calls its run() method, so no worker is started from here and queued
# tasks wait until a consumer runs elsewhere (e.g. the huey_consumer command
# under supervisor). TODO confirm the intended deployment.
consumer = huey.create_consumer(workers=mp.cpu_count(), periodic=False, initial_delay=0.1, backoff=1.15,
                                max_delay=10.0, scheduler_interval=1, worker_type=WORKER_PROCESS,
                                check_worker_health=True, health_check_interval=10, flush_locks=False)

# Cross-origin (CORS) settings for browser clients: every origin, method and
# header is allowed, and credentials are permitted.
# NOTE(review): a wildcard origin combined with allow_credentials=True may let
# any site make credentialed requests if the middleware echoes the caller's
# origin; consider an explicit origin list -- confirm against the Starlette
# CORSMiddleware docs.
cors_settings = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **cors_settings)


@app.get("/")
async def root():
    """Service entry point: return a static banner naming the service."""
    banner = {"message": "OCR Engine and NER service"}
    return banner


# https://huey.readthedocs.io/en/latest/signals.html
# TODO: build monitoring after Redis setup

# In-memory log of huey signal events, exposed through /job_monitoring.
# NOTE(review): huey emits signals in whichever process runs the consumer. If
# the consumer runs in another process (process workers, or huey_consumer
# under supervisor), this list in the web process is presumably never filled
# -- TODO confirm; a shared store such as Redis would be needed then.
jobs = []

# Maximum number of entries kept in ``jobs`` (the newest ones win).
JOBS_LIMIT = 200


@app.get("/job_monitoring")
async def monitor_jobs():
    """Return a snapshot of the recorded job signal events.

    When more than ``JOBS_LIMIT`` events have accumulated, the oldest are
    dropped so only the most recent ``JOBS_LIMIT`` are kept and returned.
    Previously the whole log was wiped and an empty list returned, so entries
    were discarded without ever being shown.
    """
    if len(jobs) > JOBS_LIMIT:
        # Trim in place rather than rebinding: other code may hold a reference
        # to this same list object (e.g. as a default argument).
        del jobs[:-JOBS_LIMIT]
    # Return a copy so later appends do not alter the returned value.
    return list(jobs)

@huey.signal()
def all_signal_handler(signal, task, exc=None, jobs=jobs):
    """Append one row per huey signal to the in-memory ``jobs`` log.

    Each row is ``(timestamp, task id, task name, first positional task
    argument or None, signal, repr of the exception or None)``.

    Parameters
    ----------
    signal : str
        the signal being emitted (presumably huey's signal name -- confirm)
    task : huey task
        the task the signal refers to
    exc : Exception, optional
        exception passed along with the signal, if any
    jobs : list
        log to append to; bound to the module-level ``jobs`` list when the
        function is defined
    """
    dt_string = datetime.now().strftime(r"%d/%m/%Y %H:%M:%S")
    # Tasks enqueued with keyword arguments only have no positional args;
    # indexing blindly would make the handler itself raise.
    first_arg = task.args[0] if task.args else None
    # Store the exception as text: a raw exception object does not encode
    # meaningfully when /job_monitoring returns this list as JSON
    # (TODO confirm against the FastAPI encoder in use).
    exc_text = repr(exc) if exc is not None else None
    jobs.append((dt_string, task.id, task.name, first_arg, signal, exc_text))


@app.post("/ner", callbacks=router.routes)
async def ner(invoice: Invoice, token=Depends(get_token_from_header), callback_url: Optional[AnyHttpUrl] = None,
              max_sentence_number: int = 0, return_merged: bool = True, crop_image: int = 30, ocr: bool = True):
    """Queue a Named Entity Recognition job and acknowledge it immediately.

    The heavy work is done by the huey task ``__process_ner_request``; this
    endpoint only enqueues it and reports the task id.
    """
    # Arguments stay positional: the signal handler in this module reads task.args[0].
    queued = __process_ner_request(invoice, token, callback_url, max_sentence_number, return_merged, crop_image, ocr)
    return {"message": f"NER Process : {queued.id!s} : {invoice.path!s} sent in the background"}


@app.post("/ner_complete", callbacks=router.routes)
async def ner_complete(invoice: Invoice, token=Depends(get_token_from_header),
                       callback_url: Optional[AnyHttpUrl] = None, max_sentence_number: int = 0,
                       return_merged: bool = True, crop_image: int = 0, ocr: bool = False):
    """Queue a complete NER job and acknowledge it immediately.

    Uses the same huey task as ``/ner`` but defaults to ``crop_image=0`` and
    ``ocr=False``. Author's note: intended for inputs other than txt files.
    """
    # Arguments stay positional: the signal handler in this module reads task.args[0].
    queued = __process_ner_request(invoice, token, callback_url, max_sentence_number, return_merged, crop_image, ocr)
    return {"message": f"NER Complete Process : {queued.id!s} : {invoice.path!s} sent in the background"}


@app.post("/txt", callbacks=router.routes)
async def ocr_txt(invoice: Invoice, token=Depends(get_token_from_header), callback_url: Optional[AnyHttpUrl] = None,
                  res_width: int = 3500, check_before_ocr: bool = True):
    """Queue an OCR job whose result is returned as plain text ('txt' mode)."""
    # Arguments stay positional: the signal handler in this module reads task.args[0].
    queued = __process_ocr_request(invoice, 'txt', token, callback_url, res_width, check_before_ocr)
    return {"message": f"OCR Process : {queued.id!s} : {invoice.path!s} sent in the background"}


@app.post("/pdfa", callbacks=router.routes)
async def ocr_pdfa(invoice: Invoice, token=Depends(get_token_from_header), callback_url: Optional[AnyHttpUrl] = None,
                   res_width: int = 3500, check_before_ocr: bool = True):
    """Queue an OCR job whose result is returned in 'pdfa' mode."""
    # Arguments stay positional: the signal handler in this module reads task.args[0].
    queued = __process_ocr_request(invoice, 'pdfa', token, callback_url, res_width, check_before_ocr)
    return {"message": f"OCR Process : {queued.id!s} : {invoice.path!s} sent in the background"}


@huey.task(priority=1)
def __process_ocr_request(invoice, mode, token, callback_url, res_width, check_before_ocr, callback_timeout=30):
    """Run one OCR job in a huey worker and post the outcome to ``callback_url``.

    Parameters
    ----------
    invoice : class
        API input
    mode : str
        return as txt or pdfa
    token : str
        validation token, forwarded as a Bearer token on the callback request
    callback_url : str
        url for result callback
    res_width : int
        width for image resizing
    check_before_ocr : bool
        checks if image is usable for OCR
    callback_timeout : float, optional
        seconds ``requests`` waits on the callback endpoint; without a timeout
        a slow or unresponsive endpoint can block the worker indefinitely

    Notes
    -----
    On success the callback body is ``{'content': ..., 'execTime': <ms>}``.
    If processing raises ``HTTPException`` an error payload with status 500
    is posted instead. Any other exception propagates to huey.
    """
    headers = {'Authorization': 'Bearer ' + token}
    try:
        start = timer()
        result = handle_ocr(invoice, mode, res_width, check_before_ocr)
        # Elapsed time in milliseconds, as measured by ``timer``.
        exec_time_ms = timedelta(seconds=timer() - start) / timedelta(milliseconds=1)
    except HTTPException as e:
        print("exception catched", e.status_code, e.detail)
        requests.post(callback_url, headers=headers, json={"errors": [{"statusCode": 500, "message": e.detail}]},
                      timeout=callback_timeout)
        return
    requests.post(callback_url, headers=headers, json={'content': result.content, 'execTime': exec_time_ms},
                  timeout=callback_timeout)


@huey.task(priority=10)
def __process_ner_request(invoice, token, callback_url, max_sentence_number, return_merged, crop_image, ocr,
                          callback_timeout=30):
    """Run one NER job in a huey worker and post the outcome to ``callback_url``.

    Parameters
    ----------
    invoice : class
        API input
    token : str
        validation token, forwarded as a Bearer token on the callback request
    callback_url : str
        url for result callback
    max_sentence_number : int
        max number of sentences to process
    return_merged : bool
        if true merge result tokens
    crop_image : int
        percentage to crop from image
    ocr : bool
        if true do ocr
    callback_timeout : float, optional
        seconds ``requests`` waits on the callback endpoint; without a timeout
        a slow or unresponsive endpoint can block the worker indefinitely

    Notes
    -----
    On success the callback body is ``{'content': ..., 'execTime': <ms>}``.
    If processing raises ``HTTPException`` an error payload with status 500
    is posted instead. Any other exception propagates to huey.
    """
    headers = {'Authorization': 'Bearer ' + token}
    try:
        start = timer()
        result = get_entities(invoice, max_sentence_number, return_merged, crop_image, ocr)
        # Elapsed time in milliseconds, as measured by ``timer``.
        exec_time_ms = timedelta(seconds=timer() - start) / timedelta(milliseconds=1)
    except HTTPException as e:
        print("exception catched", e.status_code, e.detail)
        requests.post(callback_url, headers=headers, json={"errors": [{"statusCode": 500, "message": e.detail}]},
                      timeout=callback_timeout)
        return
    requests.post(callback_url, headers=headers, json={'content': result.content, 'execTime': exec_time_ms},
                  timeout=callback_timeout)

編集: デプロイには AWS Elastic Beanstalk を使用しています。コンシューマーは supervisor で実行すべきでしょうか?

4

1 に答える 1