3

私は次のようなハンドラーを持っています。これは基本的にコマンドに応答し、一連のコマンド全体を別のキューに送信します。

    public void Handle(ISomeCommand message)
    {
        int i=0;
        while (i < 10000)
        {
            var command = Bus.CreateInstance<IAnotherCommand>();
            command.Id = i;
            Bus.Send("target.queue@d1555", command);
            i++;
        }
    }

このブロックの問題は、ループが完全に完了するまで、ターゲットキューまたは送信キューにメッセージが表示されないことです。誰かが私がこの行動を理解するのを手伝ってもらえますか?

また、以下のようにタスクを使用してハンドラー内でメッセージを送信すると、メッセージがすぐに表示されます。それで、これに関する2つの質問、

  1. すぐに実行するタスクベースの送信の説明は何ですか?
  2. メッセージハンドラーでタスクを使用することで何か影響はありますか?

    public void Handle(ISomeCommand message)
    {
        int i=0;
        while (i < 10000)
        {
            System.Threading.ThreadPool.QueueUserWorkItem((args) =>
            {
                var command = Bus.CreateInstance<IAnotherCommand>();
                command.Id = i;
                Bus.Send("target.queue@d1555", command);
                i++;
            });
        }
    }
    

どうぞよろしくお願いいたします。

4

3 に答える 3

4

最初 の質問: キューからメッセージを選択し、登録されているすべてのメッセージ ハンドラーを実行し、その他のトランザクション アクション (新しいメッセージの書き込みやデータベースへの書き込みなど) を 1 つのトランザクションで実行します。すべて完了するか、まったく完了しないかのどちらかです。つまり、キューからメッセージを選択し、ISomeCommand を処理し、10000 個の新しい IAnotherCommand を書き込むことは、完全に行われるか、まったく行われないかのいずれかです。この動作を回避するには、次のいずれかを実行できます。

  1. NServiceBus エンドポイントをトランザクションに対応しないように構成する

    public class EndpointConfig : IConfigureThisEndpoint, AsA_Publisher,IWantCustomInitialization
    {
        public void Init()
        {
            Configure.With()
                .DefaultBuilder()
                .XmlSerializer()
                .MsmqTransport()
                .IsTransactional(false)
                .UnicastBus();
        }
    }
    
  2. アンビエント トランザクションを抑制するトランザクション スコープで IAnotherCommand の送信をラップします。

    public void Handle(ISomeCommand message)
    { 
        using (new TransactionScope(TransactionScopeOption.Suppress)) 
        { 
            int i=0; 
            while (i < 10000) 
            { 
                var command = Bus.CreateInstance(); 
                command.Id = i; 
                Bus.Send("target.queue@d1555", command); 
                i++; 
            } 
        } 
    } 
    
  3. System.Threading.ThreadPool.QueueUserWorkItem または Task クラスを使用して、自分で新しいスレッドを開始することにより、別のスレッドで Bus.Send を発行します。これが機能するのは、アンビエント トランザクションが新しいスレッドに自動的に引き継がれないためです。

2 番目の質問:タスクまたは前述の他の方法を使用することの影響は、全体に対するトランザクションの検疫がないことです。

5000個のIAnotherMessageを発生させて突然電源が落ちた場合の対処法は?

2) または 3) を使用すると、元の ISomeMessage は完了せず、エンドポイントを再起動したときに NServiceBus によって自動的に再試行されます。最終結果: 5000 + 10000 IAnotherCommands。

1) を使用すると、IAnotherMessage が完全に失われ、5000 個の IAnotherCommands だけになります。

推奨されるトランザクション方法を使用すると、最初の 5000 個の IAnotherCommands が破棄され、元の ISomeMessage がキューに戻ってきて、エンドポイントが再起動したときに再試行されます。最終結果: 10000 IAnotherCommands。

于 2012-11-15T15:13:36.677 に答える
0

メモリがサービスを提供する場合、NServiceBusはTransactionScope、トランザクションオプションが使用されており、TransactionScopeクロススレッド対応にするために何らかの支援が必要な場合に、メッセージハンドラーへの呼び出しをラップします。

TransactionScopeとマルチスレッド

于 2012-11-15T04:18:35.830 に答える
0

オーバーヘッドを削減しようとしている場合は、メッセージをバンドルすることもできます。送信の署名は Bus.Send(IMessage[]messages) です。MSMQ のサイズ制限を超えないことを保証できる場合は、一度にすべてのメッセージを Send() できます。サイズ制限が問題になる場合は、それらをチャンクアップするか、Databus を使用できます。

于 2012-11-19T20:20:49.883 に答える