2

Celeryを使用して一部のデータをクロールするWebスパイダーを実行し、その後、このデータをデータベースのどこかに保存する必要があります(たとえば、SQLite)が、Celeryワーカー間でSQLAlchemyセッションを共有できないことを理解しています。この問題をどのように解決しますか?どちらの方法が一般的ですか?

現在、Redisをデータの中間ストレージとして使用しようとしています。

@celery.task
def run_spider(spider, task):
    # setup worker
    logger = logging.getLogger('Spider: %s' % spider.url)
    spider.meta.update({'logger': logger, 'task_id': int(task.id)})

    # push task data inside worker
    spider.meta.update({'task_request': run_spider.request})

    spider.run()

    task.state = "inactive"
    task.resolved = datetime.datetime.now()
    db.session.add(task)
    db.session.commit()

編集:実際、私は間違っていました。セッションを共有する必要はありません。セロリのプロセス/タスクごとに新しいデータベース接続を作成する必要があります。

4

2 に答える 2

4

私も、大規模なセロリアプリケーションで永続化するためにredisを使用しました。

私のタスクは次のようになるのが一般的です。

@task
def MyTask(sink, *args, **kwargs):
    data_store = sharded_redis.ShardedRedis(sink)
    key_helper = helpers.KeyHelper()
    my_dictionary = do_work()
    data_store.hmset(key_helper.key_for_my_hash(), my_dictionary)
  • sharded_redisこれは、クライアントを介してシャーディングキーを処理するいくつかのredisシャードを抽象化したものです。
  • sink(host, port)シャードが決定された後に適切な接続を確立するために使用されるタプルのリストです。

基本的に、接続プールを作成するのではなく、各タスクでredisに接続および切断します(非常に安価です)。

接続プールを使用することは機能しますが、実際にセロリを利用する(多くの同時タスクを実行する)場合は、特に接続プールを使い果たすリスクがあるため、この方法を使用する方が良いでしょう(私の意見では) redisで少し時間がかかることをしている場合(大きなデータセットをメモリに読み込むなど)。

redisへの接続はかなり安いので、これはうまく拡張できるはずです。いくつかのインスタンスで、1分あたり数十万のタスクを処理していました。

于 2012-08-09T04:41:11.450 に答える
0

実際、私は間違っていました。セッションを共有する必要はありません。セロリのプロセス/タスクごとに新しいデータベース接続を作成する必要があります。

于 2012-11-17T04:43:44.810 に答える