3

LocalStack サーバーに SQS キューがあり、そこからのメッセージを Celery コンシューマーで消費しようとしています。

コンシューマが queue などのキューに適切に接続されていることは継ぎ目ですが、コマンドsqs-test-queueでメッセージを送信しようとしてもメッセージを受信しません。aws

celeryconfig.pyはこのように見えます:

from kombu import (
    Exchange,
    Queue
)


broker_transport_options = {'region': REGION}
broker_transport = 'sqs'

accept_content = ['application/json']
result_serializer = 'json'
content_encoding = 'utf-8'
task_serializer = 'json'

worker_enable_remote_control = False
worker_send_task_events = False
result_backend = None

task_queues = (
    Queue('sqs-test-queue', exchange=Exchange(''), routing_key='sqs-test-queue'),
)

私のtasks.pyモジュールは次のようになります。

from celery import Celery
from kombu.utils.url import quote


AWS_ACCESS_KEY = quote("AWS_ACCESS_KEY")
AWS_SECRET_KEY = quote("AWS_SECRET_KEY")
LOCALSTACK = "<IP>:<PORT>"

broker_url = "sqs://{access}:{secret}@{host}".format(access=AWS_ACCESS_KEY,
                                                     secret=AWS_SECRET_KEY,
                                                     host=LOCALSTACK)

app = Celery('tasks', broker=broker_url, backend=None)
app.config_from_object('celeryconfig')


@app.task(bind=True, name='tasks.consume', acks_late=True, ignore_result=True)
def consume(self, msg):
    # DO SOMETHING WITH THE RECEIVED MESSAGE
    return True

それを実行しようとしましたがcelery -A tasks worker -l INFO -Q sqs-test-queue、すべてがうまくいきました:

...

[tasks]
  . tasks.consume

[... INFO/MainProcess] Connected to sqs://AWS_ACCESS_KEY:**@<IP>:<PORT>// 
[... INFO/MainProcess] celery@local ready

でメッセージを送信しようとしてaws sqs send-message --endpoint-url=http://<IP>:<PORT> --queue-url=http://localhost:<PORT>/queue/sqs-test-queue --message-body="Test message"も、何も起こりません。

私は何を間違っていますか?多分設定で何かを見逃しましたか?

PS:コマンドを実行しようとするとaws sqs receive-message --endpoint-url=http://<IP>:<PORT> --queue-url=http://localhost:<PORT>/queue/sqs-test-queue、メッセージを取得できます。

ノート:

私は使用Python 3.7.0していますが、私のpip freeze見た目は次のようになります。

boto3==1.10.16
botocore==1.13.16
celery==4.3.0
kombu==4.6.6
pycurl==7.43.0.3
...
4

2 に答える 2