1

Oracle AQ で Reactive 拡張機能を使用しようとしています。メッセージがOracle Queueに到着すると、「OracleAQMessageAvailableEvent」が起動され、メッセージがあることがコンシューマに通知されます。OracleAQMessageAvailableEventHandler 内で、コンシューマは OracleAQQueue.Dequeue() をコールしてメッセージを取得します。

RXで上記の作業を行いました。以下は私が使用したコードです。

var messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
                    h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h)
                .Where(x => x.EventArgs.AvailableMessages > 0)
                .Select(x =>
                {
                    OracleAQMessage msg = _queue.Dequeue();
                    return (UpdateMsg) msg.Payload;
                });
messages.subscribe(....)

問題は、すべてが機能したらメッセージをサブスクライブすると、メッセージを複数回サブスクライブすると (つまり、アプリケーション内の複数のコンシューマー)、すべてのコンシューマーが「_queue.Dequeue()」を呼び出そうとし、最初の呼び出し以降のすべての呼び出しが失敗することです。新しいメッセージがない場合。

誰か私が何をすべきか教えてください。私のシナリオは Hot Observable 向けだと思いますが、頭を悩ませています。

4

2 に答える 2

-1

リー・キャンベル、ごめんなさい。あなたが言及した解決策は機能します。実は使い方が間違っていました。Messages というプロパティを持つクラス呼び出し QueueWrapper があります。このメッセージの実装がありました

    public IObservable<UpdateMsg> Messages { 
        get { return Observable.FromEventPattern<OracleAQMessageAvailableEventHandler,         OracleAQMessageAvailableEventArgs> (
        h => _queue.MessageAvailable += h, 
        h => _queue.MessageAvailable -= h)
       .Where(x => x.EventArgs.AvailableMessages > 0)
       .Select(x =>
        {
            OracleAQMessage msg = _queue.Dequeue();
            return (UpdateMsg) msg.Payload;
        })
       .Publish()
       .Refcount();
}}

私のクライアントコードは、このように Messages プロパティを使用してサブスクライブしていました

// First Subscription
_queueWrapper.Messages.Subscribe(....)

// Second Subscription
_queueWrapper.Messages.Subscribe(....)

そのため、サブスクリプションごとに、Messages プロパティが新しい IObservable を返していました。これを修正するために、observable の初期化を QueueWrapper のコンストラクター、つまり次のコードに移動しました。

    public QueueWrapper() {
     _messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
        h => _queue.MessageAvailable += h, 
        h => _queue.MessageAvailable -= h)
    .Where(x => x.EventArgs.AvailableMessages > 0)
    .Select(x =>
        {
            OracleAQMessage msg = _queue.Dequeue();
            return (UpdateMsg) msg.Payload;
        })
    .Publish()
    .Refcount();
}

私の Messages プロパティは _messages を返すだけです。

public IObservable<UpdateMsg> Messages { get { return _messages; } }

その後、すべてが期待どおりに機能し始めました。

于 2016-05-31T12:59:52.527 に答える