7

サービスが処理する大量の通知があるため、MSMQ バインディングを使用する WCF サービスを作成したいと考えています。クライアントがサービスによって停止されないこと、および通知が発生した順序で処理されることが重要です。したがって、キューの実装です。

もう 1 つの考慮事項は、回復力です。MSMQ 自体をクラスター化してキューをより堅牢にすることができることはわかっていますが、別のサーバーでサービスのインスタンスを実行できるようにしたいので、サーバーがクラッシュした場合に通知がキューに蓄積されず、別のサーバーが処理を続行します。 .

MSMQ バインディングを試してみたところ、サービスの複数のインスタンスが同じキューでリッスンしている可能性があり、そのままにしておくと、利用可能なサービス全体に負荷が分散された一種のラウンド ロビンを実行することになります。これは素晴らしいことですが、インスタンスが異なればリクエストの処理にかかる時間が異なるため、キューの順序付けが失われてしまいます。

私は単純なコンソール アプリを使って実験を行ってきました。これは以下の壮大なコード ダンプです。実行すると、次のような出力が得られます。

host1 open
host2 open
S1: 01
S1: 03
S1: 05
S2: 02
S1: 06
S1: 08
S1: 09
S2: 04
S1: 10
host1 closed
S2: 07
host2 closed

私がしたいことは次のとおりです。

host1 open
host2 open
S1: 01
<pause while S2 completes>
S2: 02
S1: 03
<pause while S2 completes>
S2: 04
S1: 05
S1: 06
etc.

S2 が完了していないため、まだ失敗して、処理中のメッセージをキューに返す可能性があると考えていました。したがって、S1 が別のメッセージをキューから取り出すことを許可してはなりません。私のキューはトランザクションでありTransactionScopeRequired = true、サービスを設定しようとしましたが、役に立ちませんでした。

これは可能ですか?私はそれについて間違った方法で進んでいますか?何らかの中央同期メカニズムなしでフェイルオーバー サービスを構築する他の方法はありますか?

class WcfMsmqProgram
{
    private const string QueueName = "testq1";

    static void Main()
    {
        // Create a transactional queue
        string qPath = ".\\private$\\" + QueueName;
        if (!MessageQueue.Exists(qPath))
            MessageQueue.Create(qPath, true);
        else
            new MessageQueue(qPath).Purge();

        // S1 processes as fast as it can
        IService s1 = new ServiceImpl("S1");
        // S2 is slow
        IService s2 = new ServiceImpl("S2", 2000);

        // MSMQ binding
        NetMsmqBinding binding = new NetMsmqBinding(NetMsmqSecurityMode.None);

        // Host S1
        ServiceHost host1 = new ServiceHost(s1, new Uri("net.msmq://localhost/private"));
        ConfigureService(host1, binding);
        host1.Open();
        Console.WriteLine("host1 open");

        // Host S2
        ServiceHost host2 = new ServiceHost(s2, new Uri("net.msmq://localhost/private"));
        ConfigureService(host2, binding);
        host2.Open();
        Console.WriteLine("host2 open");

        // Create a client 
        ChannelFactory<IService> factory = new ChannelFactory<IService>(binding, new EndpointAddress("net.msmq://localhost/private/" + QueueName));
        IService client = factory.CreateChannel();

        // Periodically call the service with a new number
        int counter = 1;
        using (Timer t = new Timer(o => client.EchoNumber(counter++), null, 0, 500))
        {
            // Enter to stop
            Console.ReadLine();
        }

        host1.Close();
        Console.WriteLine("host1 closed");
        host2.Close();
        Console.WriteLine("host2 closed");

        // Wait for exit
        Console.ReadLine();
    }

    static void ConfigureService(ServiceHost host, NetMsmqBinding binding)
    {
        var endpoint = host.AddServiceEndpoint(typeof(IService), binding, QueueName);
    }

    [ServiceContract]
    interface IService
    {
        [OperationContract(IsOneWay = true)]
        void EchoNumber(int number);
    }

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
    class ServiceImpl : IService
    {
        public ServiceImpl(string name, int sleep = 0)
        {
            this.name = name;
            this.sleep = sleep;
        }

        private string name;
        private int sleep;

        public void EchoNumber(int number)
        {
            Thread.Sleep(this.sleep);
            Console.WriteLine("{0}: {1:00}", this.name, number);
        }
    }
}
4

2 に答える 2

10

バトワード、

サービス バスを手動で作成しようとしています。既存のものを使ってみませんか?

NServiceBus、MassTransit、ServiceStack

それらのうち少なくとも 2 つは MSMQ で動作します。

さらに、絶対に順序付けが必要な場合は、実際には別の理由である可能性があります-メッセージを送信できるようにしたいが、最初のメッセージの前に依存メッセージを処理したくない場合。あなたはサガパターンを探しています。NServiceBus と MassTransit の両方を使用すると、Sagas を簡単に管理できます。どちらも、単純に最初のメッセージをトリガーしてから、条件に基づいて残りのメッセージをトリガーすることができます。これにより、分散アプリケーションのプランピングを簡単に実装できます。

その後、何千ものクライアント、キュー サーバー、およびメッセージ プロセッサにスケールアップすることもできます。コードを 1 行も書く必要はなく、問題も発生しません。

ここで msmq を介して独自のサービス バスを実装しようとしましたが、別の問題が忍び寄り続けたため断念しました。私たちは NServiceBus を選びましたが、MassTransit も優れた製品です (100% オープン ソースですが、NServiceBus はそうではありません)。ServiceStack は、API の作成とメッセージ キューの使用に優れています。これを使用して、キューのフロントエンドとして機能するサービスを数分で作成できると確信しています。

ああ、NSB と MT の場合、キュー、センダー、ハンドラーを完全に実装するのに 10 行未満のコードしか必要ないことは言及しましたか?

- - - 追加した - - -

Udi Dahan (NServiceBus の主な貢献者の 1 人) は、これについて次のように語ってい ます。ウディ・ダハンと

Chris Patterson (Mass Transit の主な貢献者の 1 人 )

StackOverflow の質問/回答: 「WCF アプリケーションで MSMQ メッセージを使用するときにメッセージの順序を保持する」

- - - 質問 - - -

メッセージの順序を保証する必要がある理由について、私は困惑していると言わざるを得ません。HTTP/SOAP プロトコルを使用している場合、同じ立場になりますか? 私の推測ではノーだと思いますが、なぜ MSMQ で問題になるのでしょうか?

頑張ってください、これが役に立てば幸いです、

于 2012-12-13T18:29:59.007 に答える
1

メッセージの順序どおりの配信を保証することは、大量のメッセージングに関する事実上の厄介な問題の 1 つです。

理想的な世界では、メッセージの宛先は順不同のメッセージングを処理できる必要があります。これは、メッセージ ソースに何らかの順序付け情報が含まれていることを確認することで実現できます。ここでも理想的には、これはある種の x-of-n バッチ スタンプの形式をとります (メッセージ 1/10、2/10 など)。メッセージの送信先は、配信されたデータを順番に組み立てる必要があります。

ただし、現実の世界では、通常、ダウンストリーム システムを変更して、順不同で到着するメッセージを処理する余地はありません。この場合、次の 2 つの選択肢があります。

  1. 完全にシングル スレッド化します。実際には、通常、何らかの「グループ化 ID」を見つけることができます。これは、グループごとの意味でシングル スレッド化できることを意味します。つまり、異なるメッセージ グループ間で同時実行性が維持されます。
  2. 順序どおりにメッセージを受信するコンシューマ システムごとにリシーケンサラッパーを実装します。

どちらのソリューションも優れたものではありませんが、それが同時実行性と順序どおりのメッセージ配信を実現できる唯一の方法だと思います。

于 2012-12-10T14:25:57.880 に答える