1

メッセージをキューに生成し、メッセージがアプリケーションで正しく消費および処理されるかどうかを確認するいくつかのテストを作成しようとしています。

そのために、kombu ライブラリ、特にメモリ内トランスポートの実装をいじっています。

それでも、生成されたメッセージが消費されるということは機能しません。

したがって、私の質問は、メッセージをメモリ内で生成および消費する簡単な単体テストを誰かが提供できるかどうかです

4

1 に答える 1

3

これをテストしようとしているコードに適合させたいと思うでしょうが、探している基本的なものは、「memory://」であるメモリ内コントローラの amqp URI です。本当に簡単な例として:

conn = kombu.Connection("memory://")
queue = conn.SimpleQueue('myqueue')
queue.put('test')
msg = queue.get(timeout=1)
msg.ack()
print(msg.payload)
于 2016-07-21T22:02:31.627 に答える