6

フラスコセロリの基本的な例を実行した後(私が知る限り正常に実行されます)、これを自分のプロジェクトに統合しようとしています。基本的に、私はこれを以下で使用しています:

from flask import Blueprint, jsonify, request, session
from flask.views import MethodView
from celery.decorators import task

blueprint = Blueprint('myapi', __name__)

class MyAPI(MethodView):

    def get(self, tag):
        return get_resource.apply_async(tag)

@task(name="get_task")
def get_resource(tag):
    pass

例と同じ設定で、次のエラーが発生します。

Traceback (most recent call last):
  File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1518, in __call__
    return self.wsgi_app(environ, start_response)
  File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1506, in wsgi_app
    response = self.make_response(self.handle_exception(e))
  File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1504, in wsgi_app
    response = self.full_dispatch_request()
  File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1264, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1262, in full_dispatch_request
    rv = self.dispatch_request()
  File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1248, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/x/venv/lib/python2.7/site-packages/flask/views.py", line 84, in view
    return self.dispatch_request(*args, **kwargs)
  File "/x/venv/lib/python2.7/site-packages/flask/views.py", line 151, in dispatch_request
    return meth(*args, **kwargs)
  File "/x/api/modules/document/document.py", line 14, in get
    return get_resource.apply_async(tag)
  File "/x/venv/lib/python2.7/site-packages/celery/app/task/__init__.py", line 449, in apply_async
    publish = publisher or self.app.amqp.publisher_pool.acquire(block=True)
  File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 657, in acquire
    R = self.prepare(R)
  File "/x/venv/lib/python2.7/site-packages/kombu/pools.py", line 54, in prepare
    p = p()
  File "/x/venv/lib/python2.7/site-packages/kombu/pools.py", line 45, in <lambda>
    return lambda: self.create_producer()
  File "/x/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 265, in create_producer
    pub = self.app.amqp.TaskPublisher(conn, auto_declare=False)
  File "/x/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 328, in TaskPublisher
    return TaskPublisher(*args, **self.app.merge(defaults, kwargs))
  File "/x/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 158, in __init__
    super(TaskPublisher, self).__init__(*args, **kwargs)
  File "/x/venv/lib/python2.7/site-packages/kombu/compat.py", line 61, in __init__
    super(Publisher, self).__init__(connection, self.exchange, **kwargs)
  File "/x/venv/lib/python2.7/site-packages/kombu/messaging.py", line 79, in __init__
    self.revive(self.channel)
  File "/x/venv/lib/python2.7/site-packages/kombu/messaging.py", line 168, in revive
    channel = channel.default_channel
  File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 581, in default_channel
    self.connection
  File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 574, in connection
    self._connection = self._establish_connection()
  File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 533, in _establish_connection
    conn = self.transport.establish_connection()
  File "/x/venv/lib/python2.7/site-packages/kombu/transport/amqplib.py", line 279, in establish_connection
    connect_timeout=conninfo.connect_timeout)
  File "/x/venv/lib/python2.7/site-packages/kombu/transport/amqplib.py", line 89, in __init__
    super(Connection, self).__init__(*args, **kwargs)
  File "/x/venv/lib/python2.7/site-packages/amqplib/client_0_8/connection.py", line 129, in __init__
    self.transport = create_transport(host, connect_timeout, ssl)
  File "/x/venv/lib/python2.7/site-packages/amqplib/client_0_8/transport.py", line 281, in create_transport
    return TCPTransport(host, connect_timeout)
  File "/x/venv/lib/python2.7/site-packages/amqplib/client_0_8/transport.py", line 85, in __init__
    raise socket.error, msg
error: [Errno 111] Connection refused

-->

私はredisを使用していますが、rabbitmqをインストールすると別のエラーが発生しますが、今はこれを理解していません。ブローカーはredisである必要がありますが、それが見つからないのでしょうか。誰かが私にここで何が起こっているのかについてもっと手がかりを与えることができますか?他のものをインポートする必要がありますか?要点は、必要最低限​​の例を超えるものはほとんどなく、これは私には意味がありません。

Apiモジュールでは「セロリ」にアクセスできず、アプリレベルでデータを配置しようとすると、そこにあるセロリはいくつかのデフォルトに分類されます。 redisを指しているのでインストールされていません。ただの推測。モジュールに情報をインポートできませんでした。アプリから「celery」(たとえば、output celery.conf)を呼び出すとエラーが発生すると判断しただけですが、celery.taskをインポートすることはできました。

これは、アプリケーションが使用しているブローカー構成であり、例から直接です。

BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = "redis"
CELERY_REDIS_HOST = "localhost"
CELERY_REDIS_PORT = 6379
CELERY_REDIS_DB = 0

編集:

デモを見たい場合:https ://github.com/thrisp/flask-celery-example

設定にBROKER_TRANSPORT='redis'を含めることは、私が渡すものすべてにとって重要であることが判明したため(ここで示したセットアップとgitの例では)、なぜそうでないのか完全にはわかりません。例のビットにはありませんが、私が追加したものにはありますが、これがないと、すべてをデフォルトのampqキューにダンプする必要があります。

EDIT2:

また、これはかなり大きな問題です。Celeryの次期バージョンを使用すると、Flaskで使用するときに10,000の問題が単純化され、これらすべてが不要になります。

4

1 に答える 1

2

ローカルホストをバインドするようにredisを設定する必要があります。で/etc/redis/redis.conf、次の行のコメントを解除します

bind 127.0.0.1
于 2012-09-12T16:37:54.773 に答える