を使用してメッセージをキューに直接送信できることを理解しています。channel.sendToQueue
これにより、1 つのコンシューマーのみが各タスクを処理する、タスクとワーカーの状況が作成されます。
また、トピックベースの交換で使用できることも理解していますchannel.publish
。メッセージは、ルーティング キーに基づいてキューにルーティングされます。ただし、私の理解では、これは一致するキューのすべてのサブスクライバーに常にブロードキャストされます。
基本的にトピックベースの交換を使用したいのですが、各タスクを処理するコンシューマは 1 つだけです。ドキュメントを確認しましたが、これを行う方法がわかりません。
私のユースケース:
複数の場所にマイクロサービスのインスタンスをセットアップしています。カリフォルニアに 2 つ、ロンドンに 3 つ、シンガポールに 1 つなどです。タスクが作成されるとき、重要なのは、特定の場所にあるインスタンスの 1 つによって処理されることだけです。
もちろん、「usa-90210」、「uk-ec1a」などの名前の何百ものキューを作成できます。トピックを使用すると、はるかにクリーンになるように思えます。また、ワイルドカードを使用できることを考えると、より柔軟になります。
これが RabbitMQ の機能でない場合は、他の考えやアイデアも受け入れます。
アップデート
istepaniuk の提案に従って、2 つのワーカーを作成してみました。それぞれが独自のキューを交換にバインドします。
const connectionA = await amqplib.connect('amqp://guest:guest@127.0.0.1:5672');
const channelA = await connectionA.createChannel();
channelA.assertExchange('test_exchange', 'topic', { durable: false });
await channelA.assertQueue('test_queue_a', { exclusive: true });
channelA.bindQueue('test_queue_a', 'test_exchange', 'usa.*');
await channelA.consume('test_queue_a', () => { console.log('worker a'); }, { noAck: true });
const connectionB = await amqplib.connect('amqp://guest:guest@127.0.0.1:5672');
const channelB = await connectionB.createChannel();
channelB.assertExchange('test_exchange', 'topic', { durable: false });
await channelB.assertQueue('test_queue_b', { exclusive: true });
channelB.bindQueue('test_queue_b', 'test_exchange', 'usa.*');
await channelB.consume('test_queue_b', () => { console.log('worker b'); }, { noAck: true });
const pubConnection = await amqplib.connect('amqp://guest:guest@127.0.0.1:5672');
const pubChannel = await pubConnection.createChannel();
pubChannel.assertExchange('test_exchange', 'topic', { durable: false });
pubChannel.publish('test_exchange', 'usa.90210', Buffer.from(''));
残念ながら、両方のコンシューマーはまだメッセージを受信しています。
worker a
worker b