8

タスクを呼び出し、そのタスクのキューが存在しない場合は作成し、呼び出されたタスクをすぐにそのキューに挿入しようとしています。次のコードがあります。

@task
def greet(name):
    return "Hello %s!" % name


def run():
    result = greet.delay(args=['marc'], queue='greet.1',
        routing_key='greet.1')
    print result.ready()

それから私はカスタムルーターを持っています:

class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'tasks.greet':
            return {'queue': kwargs['queue'],
                    'exchange': 'greet',
                    'exchange_type': 'direct',
                    'routing_key': kwargs['routing_key']}
        return None

これにより、交換が呼び出されgreet.1、キューが呼び出されますgreet.1が、キューは空です。というキューのgreetようにルーティング キーをルーティングする方法を知っている交換を呼び出す必要があります。greet.1greet.1

何か案は?

4

1 に答える 1

13

次のことを行う場合:

task.apply_async(queue='foo', routing_key='foobar')

次に、CeleryはCELERY_QUEUESの「foo」キューからデフォルト値を取得します。存在しない場合は、(queue = foo、exchange = foo、routing_key = foo)を使用して自動的に作成します。

したがって、「foo」がCELERY_QUEUESに存在しない場合は、次のようになります。

queues['foo'] = Queue('foo', exchange=Exchange('foo'), routing_key='foo')

次に、プロデューサーはそのキューを宣言しますが、routing_keyをオーバーライドするため、実際には次を使用してメッセージを送信します。routing_key = 'foobar'

これは奇妙に思えるかもしれませんが、この動作は実際には、さまざまなトピックに公開するトピック交換に役立ちます。

自分でキューを作成して宣言することはできますが、自動メッセージ公開再試行ではうまく機能しません。apply_asyncのキュー引数kombu.Queueが、宣言されて宛先として使用されるカスタムをサポートできるとよいでしょう。たぶん、http: //github.com/celery/celery/issuesでその問題を開くことができます

于 2012-08-16T17:08:20.043 に答える