簡単にできるはずのことをやろうとしていますが、驚くほど難しいことがわかっています。
RabbitMQ キューにサブスクライブする機能があります。具体的には、これは Channel.consume 関数です: http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
サブスクリプション ID で解決される promise を返します。これは、後でサブスクリプションを解除するために必要です。また、メッセージがキューから取り出されたときに呼び出すコールバック引数もあります。
キューから登録解除したい場合は、次の Channel.cancel 関数を使用してコンシューマーをキャンセルする必要があります: http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel。これは、以前に返されたサブスクリプション ID を取ります。
オブザーバブルがサブスクライブされたときにキューにサブスクライブし、オブザーバブルがサブスクライブ解除されたときにサブスクリプションをキャンセルするオブザーバブルにこれらすべてのものをラップしたいと思います。ただし、呼び出しの「二重非同期」の性質のために、これはやや難しいことが証明されています (つまり、コールバックとプロミスの両方があるということです)。
理想的には、私が書きたいコードは次のとおりです。
return new Rx.Observable(async (subscriber) => {
var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message));
return async () => {
await channel.cancel(consumeResult.consumerTag);
};
});
ただし、このコンストラクターは非同期サブスクライバー関数または破棄ロジックをサポートしていないため、これは不可能です。
私はこれを理解することができませんでした。ここで何か不足していますか?なぜこれが難しいのですか?
乾杯、アレックス