CPU バウンドなタスクを処理するために、コードからプロセス型 (WORKER_PROCESS) のコンシューマーを起動するにはどうすればよいですか?
また、immediate モードを使わずにシグナルのコールバックを受け取るにはどうすればよいですか? MemoryHuey を immediate=True で実行するとすべて正常に動作しますが、immediate=False に設定すると空のリストしか返ってきません。
問題:
異なる優先度で処理する必要があるエンドポイントがいくつかあります。プロセスはすべて CPU を集中的に使用するため、バックグラウンドに移動して、マルチプロセッシングを使用して処理する必要があります。Redis クラスターは、後でジョブ ストレージとして使用されます。
よろしくお願いします :) 追伸: コード中の async は無視してください。
編集: あるいは、AWS EB (Elastic Beanstalk) 上で supervisor を使って実行する、より簡単な方法はありますか?
# Minimal reproduction: an in-memory Huey instance with immediate mode switched off.
huey = MemoryHuey(
    'worker',
    results=True,
    store_none=False,
    verbose=True,
    immediate=False,
)

# Build a process-based consumer with one worker per CPU core.
# NOTE(review): this only constructs the Consumer object; nothing in this
# snippet starts it -- confirm against the Huey consumer docs how it should be run.
x = huey.create_consumer(
    workers=mp.cpu_count(),
    periodic=False,
    initial_delay=0.1,
    backoff=1.15,
    max_delay=10.0,
    scheduler_interval=1,
    worker_type=WORKER_PROCESS,
    check_worker_health=True,
    health_check_interval=10,
    flush_locks=False,
)

# Show what create_consumer() handed back.
print(x)
print(type(x))
<huey.consumer.Consumer object at 0x7f101cd3c490>
<class 'huey.consumer.Consumer'>
MODEL_PATH = "model"

# FastAPI application plus a router whose routes are passed as `callbacks=`
# to the POST endpoints below.
app = FastAPI()
router = APIRouter()
app.include_router(router)

# Alternative connection-pool setups kept for reference:
# pool = ConnectionPool.from_url(url=os.getenv('REDIS_URL'), max_connections=100)
# only for testing
# pool = ConnectionPool(host='redis', port=6379, max_connections=100)

try:
    huey = PriorityRedisHuey(
        'worker',
        results=True,  # set to True if testing
        store_none=False,
        # host=os.getenv('REDIS_HOST'),
        # port=6379,
        # only for docker-compose testing
        host='redis',
        port=6379,
    )
    # NOTE(review): the Consumer returned here is neither stored nor started,
    # so this call alone presumably does not process any queued task --
    # confirm against the Huey consumer documentation.
    huey.create_consumer(
        workers=mp.cpu_count(),
        periodic=False,
        initial_delay=0.1,
        backoff=1.15,
        max_delay=10.0,
        scheduler_interval=1,
        worker_type=WORKER_PROCESS,
        check_worker_health=True,
        health_check_interval=10,
        flush_locks=False,
    )
except Exception as e:
    # A generic Exception carries no `status_code` / `detail` attributes;
    # reading them here raised AttributeError and hid the original error.
    # Report the exception itself instead.
    print("exception during connection catched", e)

# Allow cross-origin requests from any origin, with any method and header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
async def root():
    """Return the service banner served at the API root."""
    banner = {"message": "OCR Engine and NER service"}
    return banner
# https://huey.readthedocs.io/en/latest/signals.html
# TODO: build monitoring after Redis setup
# Module-level log of task signals: the signal handler appends to it and the
# /job_monitoring endpoint returns it.
jobs = []
@app.get("/job_monitoring")
async def monitor_jobs():
    """Return the recorded signal entries.

    Once the log holds more than 200 entries it is emptied before being
    returned, so that particular response is an empty list.
    """
    overflowed = len(jobs) > 200
    if overflowed:
        # Clear in place: the signal handler holds a reference to this same list.
        jobs.clear()
    return jobs
@huey.signal()
def all_signal_handler(signal, task, exc=None, jobs=jobs):
    """Record one signal emitted for a task in the shared ``jobs`` log.

    Each entry is a tuple of
    ``(timestamp, task id, task name, first task argument, signal, exception)``.
    The ``jobs`` default deliberately binds the module-level list so entries
    accumulate across calls.

    NOTE(review): the log lives in the memory of whichever process runs this
    handler; if signals fire inside a separate consumer process, the list
    served by the web process would stay empty -- confirm where Huey invokes
    signal handlers.
    """
    # Local time, formatted day/month/year hour:minute:second.
    timestamp = datetime.now().strftime(r"%d/%m/%Y %H:%M:%S")
    entry = (timestamp, task.id, task.name, task.args[0], signal, exc)
    jobs.append(entry)
@app.post("/ner", callbacks=router.routes)
async def ner(
    invoice: Invoice,
    token=Depends(get_token_from_header),
    callback_url: Optional[AnyHttpUrl] = None,
    max_sentence_number: int = 0,
    return_merged: bool = True,
    crop_image: int = 30,
    ocr: bool = True,
):
    """Named Entity Recognition endpoint.

    Forwards the request to the ``__process_ner_request`` task and answers
    with an acknowledgement that names the task id and the invoice path.
    """
    task = __process_ner_request(
        invoice, token, callback_url, max_sentence_number, return_merged, crop_image, ocr
    )
    message = "NER Process : %s : %s sent in the background" % (task.id, invoice.path)
    return {"message": message}
@app.post("/ner_complete", callbacks=router.routes)
async def ner_complete(
    invoice: Invoice,
    token=Depends(get_token_from_header),
    callback_url: Optional[AnyHttpUrl] = None,
    max_sentence_number: int = 0,
    return_merged: bool = True,
    crop_image: int = 0,
    ocr: bool = False,
):
    """ner_complete endpoint (original note: "except txt files").

    Uses the same ``__process_ner_request`` task as ``/ner``, but defaults to
    ``crop_image=0`` and ``ocr=False``. Answers with an acknowledgement that
    names the task id and the invoice path.
    """
    task = __process_ner_request(
        invoice, token, callback_url, max_sentence_number, return_merged, crop_image, ocr
    )
    message = "NER Complete Process : %s : %s sent in the background" % (task.id, invoice.path)
    return {"message": message}
@app.post("/txt", callbacks=router.routes)
async def ocr_txt(
    invoice: Invoice,
    token=Depends(get_token_from_header),
    callback_url: Optional[AnyHttpUrl] = None,
    res_width: int = 3500,
    check_before_ocr: bool = True,
):
    """txt endpoint.

    Forwards the request to the ``__process_ocr_request`` task in ``'txt'``
    mode and answers with an acknowledgement that names the task id and the
    invoice path.
    """
    task = __process_ocr_request(
        invoice, 'txt', token, callback_url, res_width, check_before_ocr
    )
    message = "OCR Process : %s : %s sent in the background" % (task.id, invoice.path)
    return {"message": message}
@app.post("/pdfa", callbacks=router.routes)
async def ocr_pdfa(
    invoice: Invoice,
    token=Depends(get_token_from_header),
    callback_url: Optional[AnyHttpUrl] = None,
    res_width: int = 3500,
    check_before_ocr: bool = True,
):
    """pdfa endpoint.

    Forwards the request to the ``__process_ocr_request`` task in ``'pdfa'``
    mode and answers with an acknowledgement that names the task id and the
    invoice path.
    """
    task = __process_ocr_request(
        invoice, 'pdfa', token, callback_url, res_width, check_before_ocr
    )
    message = "OCR Process : %s : %s sent in the background" % (task.id, invoice.path)
    return {"message": message}
@huey.task(priority=1)
def __process_ocr_request(invoice, mode, token, callback_url, res_width, check_before_ocr):
    """Run one OCR job and report the outcome to the callback URL.

    On success the callback receives ``{'content': ..., 'execTime': ...}``
    with the execution time in milliseconds. If ``handle_ocr`` raises an
    ``HTTPException`` the callback receives an ``errors`` payload instead.
    Any other exception propagates to the caller.

    Parameters
    ----------
    invoice : Invoice
        API input
    mode : str
        return as txt or pdfa
    token : str
        validation token, sent back as a Bearer token on the callback request
    callback_url : str or None
        url for result callback; when None nothing is posted
    res_width : int
        width for image resizing
    check_before_ocr : bool
        checks if image is usable for OCR

    Returns
    -------
    None
    """
    headers = {'Authorization': 'Bearer ' + token}
    try:
        start = timer()
        result = handle_ocr(invoice, mode, res_width, check_before_ocr)
        # Elapsed wall time expressed in milliseconds.
        elapsed_ms = timedelta(seconds=timer() - start) / timedelta(milliseconds=1)
        payload = {'content': result.content, 'execTime': elapsed_ms}
    except HTTPException as e:
        print("exception catched", e.status_code, e.detail)
        payload = {"errors": [{"statusCode": 500, "message": e.detail}]}

    # The endpoints declare callback_url as optional; posting to None would
    # raise inside requests, so skip the notification when no URL was given.
    if callback_url is None:
        return
    requests.post(callback_url, headers=headers, json=payload)
@huey.task(priority=10)
def __process_ner_request(invoice, token, callback_url, max_sentence_number, return_merged, crop_image, ocr):
    """Run one NER job and report the outcome to the callback URL.

    On success the callback receives ``{'content': ..., 'execTime': ...}``
    with the execution time in milliseconds. If ``get_entities`` raises an
    ``HTTPException`` the callback receives an ``errors`` payload instead.
    Any other exception propagates to the caller.

    Parameters
    ----------
    invoice : Invoice
        API input
    token : str
        validation token, sent back as a Bearer token on the callback request
    callback_url : str or None
        url for result callback; when None nothing is posted
    max_sentence_number : int
        max number of sentences to process
    return_merged : bool
        if true merge result tokens
    crop_image : int
        percentage to crop from image
    ocr : bool
        if true do ocr

    Returns
    -------
    None
    """
    headers = {'Authorization': 'Bearer ' + token}
    try:
        start = timer()
        result = get_entities(invoice, max_sentence_number, return_merged, crop_image, ocr)
        # Elapsed wall time expressed in milliseconds.
        elapsed_ms = timedelta(seconds=timer() - start) / timedelta(milliseconds=1)
        payload = {'content': result.content, 'execTime': elapsed_ms}
    except HTTPException as e:
        print("exception catched", e.status_code, e.detail)
        payload = {"errors": [{"statusCode": 500, "message": e.detail}]}

    # The endpoints declare callback_url as optional; posting to None would
    # raise inside requests, so skip the notification when no URL was given.
    if callback_url is None:
        return
    requests.post(callback_url, headers=headers, json=payload)
編集: デプロイには Elastic Beanstalk を使用しています。supervisor で実行すべきでしょうか?