5

最近、 pub / subを実装する方法として、C#でRabbitMQをチェックしています。私はNServiceBusでの作業に慣れています。NServiceBusは、MSMQをに参加させることによってトランザクションを処理しTransactionScopeます。他のトランザクション対応操作も同じものに参加できるTransactionScopeため(MSSQLなど)、すべてが真にアトミックです。その下で、NSBは調整のためにMSDTCを導入します。

RabbitMQのC#クライアントAPIにはとがありIModel.TxSelect()ますIModel.TxCommit()。これは、コミット前に取引所にメッセージを送信しないようにうまく機能します。これは、アトミックである必要がある交換に送信される複数のメッセージがあるユースケースをカバーしています。ただし、データベース呼び出し(MSSQLなど)をRabbitMQトランザクションと同期する良い方法はありますか?

4

3 に答える 3

16

IEnlistmentNotificationインターフェイスを実装することにより、MSDTCで使用されるRabbitMQリソースマネージャーを作成できます。実装は、参加を求めるときに、トランザクションマネージャーに2フェーズコミット通知コールバックを提供します。MSDTCには高額な費用がかかり、全体的なパフォーマンスが大幅に低下することに注意してください。

RabbitMQリソースマネージャーの例:

sealed class RabbitMqResourceManager : IEnlistmentNotification
{
    private readonly IModel _channel;

    public RabbitMqResourceManager(IModel channel, Transaction transaction)
    {
        _channel = channel;
        _channel.TxSelect();
        transaction.EnlistVolatile(this, EnlistmentOptions.None);
    }

    public RabbitMqResourceManager(IModel channel)
    {
        _channel = channel;
        _channel.TxSelect();
        if (Transaction.Current != null)
            Transaction.Current.EnlistVolatile(this, EnlistmentOptions.None);
    }

    public void Commit(Enlistment enlistment)
    {
        _channel.TxCommit();
        enlistment.Done();
    }

    public void InDoubt(Enlistment enlistment)
    {           
        Rollback(enlistment);
    }

    public void Prepare(PreparingEnlistment preparingEnlistment)
    {
        preparingEnlistment.Prepared();
    }

    public void Rollback(Enlistment enlistment)
    {
        _channel.TxRollback();
        enlistment.Done();
    }
}

リソースマネージャーの使用例

using(TransactionScope trx= new TransactionScope())
{
    var basicProperties = _channel.CreateBasicProperties();
    basicProperties.DeliveryMode = 2;

    new RabbitMqResourceManager(_channel, trx);
    _channel.BasicPublish(someExchange, someQueueName, basicProperties, someData);
    trx.Complete();
}
于 2013-10-30T15:33:40.687 に答える
5

私の知る限り、TxSelect/TxCommitをTransactionScopeと調整する方法はありません。

現在私が採用しているアプローチは、永続的なメッセージを含む永続的なキューを使用して、RabbitMQの再起動後も存続できるようにすることです。次に、キューから消費するときに、メッセージを読み取って処理を行い、データベースにレコードを挿入します。これがすべて完了すると、メッセージをACK(nowledge)して、キューから削除します。このアプローチの潜在的な問題は、メッセージが2回処理される可能性があることです(たとえば、メッセージがDBにコミットされているが、メッセージが確認される前にRabbitMQへの接続が切断されていると言った場合)。スループットを懸念して構築しています。(これは「少なくとも1回」のアプローチと呼ばれると思います)。

RabbitMQサイトでは、TxSelectとTxCommitを使用するとパフォーマンスが大幅に低下すると言われているため、両方のアプローチのベンチマークを行うことをお勧めします。

どのように行うにしても、消費者が2回処理される可能性のあるメッセージに対処できるようにする必要があります。


まだ見つけていない場合は、ここでRabbitMQの.Netユーザーガイド、特にセクション3.5を参照してください。

于 2012-08-04T18:47:14.417 に答える
0

抽象化IServiceBus用のサービスバス実装があるとしましょう。ボンネットの下にあるrabbitmqのふりをすることはできますが、そうである必要はありません。

servicebus.Publishを呼び出すと、System.Transaction.Currentをチェックして、トランザクションを実行しているかどうかを確認できます。あなたがそうしていて、それがmssqlサーバー接続のトランザクションである場合、rabbitに公開する代わりに、実行しているデータベース操作でコミット/ロールバックを尊重するsqlサーバー内のブローカーキューに公開できます(何らかの接続を行いたいブローカーがtxnをmsdtcにアップグレードすることを公開しないようにするための魔法)

次に、ブローカーキューを読み取り、実際にウサギに公開する必要があるサービスを作成する必要があります。このようにして、非常に重要なことについて、データベース操作が以前に完了し、メッセージがいつかウサギに公開されることを保証できます。将来(サービスがそれを中継するとき)。ブローカーが例外を受信することをコミットするときに例外が発生した場合でも、ここで失敗する可能性はありますが、問題のウィンドウが大幅に減少し、最悪の場合、複数回公開することになり、メッセージが失われることはありません。これは非常にありそうにありません。SQLサーバーが受信後、コミット前にオフラインになるのは、最終的に最小の二重公開になる場合の例です(サーバーがオンラインになると、再度公開します)。一部を軽減し、

于 2015-05-28T18:03:22.467 に答える