1

ファイル システム上の IBM MQ とフォルダーを利用するメッセージ ブローカーを作成しています。メッセージを取得した後、それらを厳密に型指定されたクラスに具体化し、RX サブジェクトにプラグインします。

メッセージを処理するためにどの外部システムを攻撃する必要があるかを特定できるように、メッセージに対する認識を構築したので、RX オブザーバブルに対してクエリを実行し、外部システムを対象としないメッセージを選択することができます。

次にやりたいことは、ヒットした外部システムによるメッセージの抑制です。たとえば、次のようになります。

特定のタイプのメッセージで CRM システムをヒットしていて、最大 4 つの同時呼び出しでそのシステムをヒットしたいと判断した場合、一度に 4 つのメッセージしか処理しません。5 番目のメッセージがあった場合は、前の 4 つのうちの 1 つが完了するのを待ってから、5 つ目に進む必要があります。外部データベース、他の外部 Web サービスなどの他のタイプのリソースについても同じです。

私はこの問題について調査を開始しましたが、これまでのところ、最良の設計アプローチは独自のスケジューラを作成することです。欠点は、メッセージが取得された後、スケジューラー内でメッセージをキューに入れる独自の内部構造を作成する必要があることです。これが、私がこのアプローチを嫌います。

誰かがこれを行うためのより良い方法を持っていますか?

4

3 に答える 3

1

あなたが説明しているのは最大同時実行数のようです。Mergeオペレーターはそのようなものをサポートしています。

GroupByストリームの行き先に基づいてストリームを分割Mergeし、各分割部分で最大の同時実行性を使用して、最後Mergeに結果を元に戻すようなものを使用する必要があります。このようなもの:

IObservable<T> requests = ...;
requests.GroupBy(request => PickExternalSystem(request))
    .Select(group => group // group.Key is the TExternalSystem
        .Select(request => Observable.Defer(() => group.Key.ExecuteAsync(request)))
        .Merge(maxConcurrency: group.Key.MaxConcurrency))
    .Merge() // merge the results of each group back together again
    .Subscribe(result => ...);
于 2013-08-17T22:06:57.843 に答える
0

サービス リクエストのレート調整メカニズムを含む ReactiveUI を調べることをお勧めします。http://blog.paulbetts.org/index.php/2011/01/15/reactivexaml-is-now-reactiveui-2-0/を参照してください。

于 2013-08-16T14:42:26.200 に答える