Oracle AQ で Reactive 拡張機能を使用しようとしています。メッセージがOracle Queueに到着すると、「OracleAQMessageAvailableEvent」が起動され、メッセージがあることがコンシューマに通知されます。OracleAQMessageAvailableEventHandler 内で、コンシューマは OracleAQQueue.Dequeue() をコールしてメッセージを取得します。
RXで上記の作業を行いました。以下は私が使用したコードです。
var messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
OracleAQMessage msg = _queue.Dequeue();
return (UpdateMsg) msg.Payload;
});
messages.subscribe(....)
問題は、すべてが機能したらメッセージをサブスクライブすると、メッセージを複数回サブスクライブすると (つまり、アプリケーション内の複数のコンシューマー)、すべてのコンシューマーが「_queue.Dequeue()」を呼び出そうとし、最初の呼び出し以降のすべての呼び出しが失敗することです。新しいメッセージがない場合。
誰か私が何をすべきか教えてください。私のシナリオは Hot Observable 向けだと思いますが、頭を悩ませています。