Find centralized, trusted content and collaborate around the technologies you use most.
Teams
Q&A for work
Connect and share knowledge within a single location that is structured and easy to search.
メッセージをキューに生成し、メッセージがアプリケーションで正しく消費および処理されるかどうかを確認するいくつかのテストを作成しようとしています。
そのために、kombu ライブラリ、特にメモリ内トランスポートの実装をいじっています。
それでも、生成されたメッセージが消費されるということは機能しません。
したがって、私の質問は、メッセージをメモリ内で生成および消費する簡単な単体テストを誰かが提供できるかどうかです
これをテストしようとしているコードに適合させたいと思うでしょうが、探している基本的なものは、「memory://」であるメモリ内コントローラの amqp URI です。本当に簡単な例として:
conn = kombu.Connection("memory://") queue = conn.SimpleQueue('myqueue') queue.put('test') msg = queue.get(timeout=1) msg.ack() print(msg.payload)