2

AIプロジェクト用の巨大なn-gramモデルを構築するシステムを開発しています。
私のパイプラインは次のとおりです。
リソース入力->データのフェッチ->パーサー->トレーナー
リソース入力(基本的には解析する必要のあるURL)は一定ではありません。つまり、一度に何千ものリソースを導入できます。 、後で数十の別のバルクなど。

私のアイデアは、パイプラインの各ステップをCeleryタスクとして実装し、それをクラウドにデプロイすることです(たとえば、Herokuのワーカーdynoを使用します)。しかし、私はCeleryを初めて使用し、これらのタスクをキューに入れて、ワーカーを100%で動作させ、同時にシステムの整合性を維持する方法について疑問を持っています。
簡単なアプローチは、前のタスクが終了するとすぐにタスクのキューイングを開始することです。たとえば、1000アイテムのリソース入力を取得した場合、1000個の「データのフェッチ」タスクをスケジュールし、各タスクは「終了時に「解析」タスクなど。しかし、これらのタスクが完了する前により多くのリソースが入ってくるため、これは巨大なキューにつながります。モデルの構築には数か月かかることを私は知っています(モデルが完了する場合)。

したがって、Celeryがメモリの問題(Herokuには限界があります)や、現在想像できないその他の問題に陥ることなく、これらすべてを処理できるかどうかはわかりません。または、X分ごとにタスクのチャンクをスケジュールしたり、データベースに部分的な結果を保存したりするなど、より複雑な手法を使用する必要があります。これにより、これらの問題の一部を回避できますが、ワーカーの時間は100%になりません。

何かご意見は?
ありがとう!


編集

私の質問への答えは、実際には受け入れられた答えのコメントにあります

4

1 に答える 1

2

タスクごとに個別のキューを用意し、キューごとに専用のワーカーを実行することで、システムが各タスクに等しく注意を払いながら100%のシステムリソースを使用するようにすることができます。さらに、ワーカーを追加して、タスクの実行時間に基づいてタスク処理のバランスをとることができます。

たとえば、タスクを定義する

@celery.task
def fetch(url):
    # fetch url
    return html

@celery.task
def parse(html):
    pass

自動ルーティングの構成:

CELERY_ROUTES = {'tasks.fetch':{'queue':'fetch_queue'}、'tasks.parse':{'queue':'parse_queue'}}

そして実行中の労働者:

$ celery worker -Q fetch_queue

$ celery worker -Q parse_queue

タスクタイプごとに個別のワーカーがあります。

コールバックを使用すると、フェッチ後に簡単に解析できます。

fetch.apply_async((url), link=parse.subtask())

PSワーカーをフェッチするために、 Eventletプールを使用して非同期IOを利用できます。

于 2012-07-28T09:12:04.950 に答える