4

分散トランザクションがタイムアウトする前にリクエストの処理が終了しない場合、元のメッセージの処理が完了する前、または元のメッセージが処理される前に、同じメッセージが新しいスレッドで再処理される場合、実行時間の長いリクエストを持つメッセージ ハンドラーの問題に気付きました。エラーをスローします。メッセージは最終的にエラー キューに到達します。

問題を再現した方法は次のとおりです。

ローカル マシンの構成を更新して、トランザクションのデフォルト タイムアウトを 1 分に設定しました。

<system.transactions>        
   <defaultSettings distributedTransactionManagerName="" timeout="00:01:00" />       
    <machineSettings maxTimeout="00:01:00" />
</system.transactions>

NSB メッセージ サービスをマルチスレッドに設定しました。

<MsmqTransportConfig NumberOfWorkerThreads="2" MaxRetries="2" />

次に、ハンドラーでスレッドを 2 分間スリープさせました (以下のコード)。メッセージがワーカー トレッド A に届きます。1 分間のタイムアウトが発生すると、同じメッセージがワーカー スレッド B で処理され、ワー​​カー スレッド A は引き続き動作します。睡眠。ワーカー スレッド A が最終的に戻ってきて、「トランザクションに参加できません」というエラーが表示されます。その後、メッセージの処理を再開します。これは、メッセージが最終的にエラー キューに入るまで、両方のワーカー スレッドで続行されます。

public void Handle(RequestDataMessage message)
    {
        Logger.Info("==========================================================================");
        Logger.InfoFormat("Received request {0}.", message.DataId);
        Logger.InfoFormat("String received: {0}.", message.String);
        Logger.InfoFormat("Header 'Test' = {0}.", message.GetHeader("Test"));
        Logger.InfoFormat(Thread.CurrentPrincipal != null ? Thread.CurrentPrincipal.Identity.Name : string.Empty);

        var response = Bus.CreateInstance<DataResponseMessage>(m => 
        { 
            m.DataId = message.DataId;
            m.String = message.String;
        });

        response.CopyHeaderFromRequest("Test");
        response.SetHeader("1", "1");
        response.SetHeader("2", "2");

        Thread.Sleep(new TimeSpan(0, 2, 0));



        Logger.Info("========== Thread continued ===========");
        Logger.InfoFormat("Received request {0}.", message.DataId);
        Logger.InfoFormat("String received: {0}.", message.String);
        Logger.InfoFormat("Header 'Test' = {0}.", message.GetHeader("Test"));
        Logger.InfoFormat(Thread.CurrentPrincipal != null ? Thread.CurrentPrincipal.Identity.Name : string.Empty);

        Bus.Reply(response); //Try experimenting with sending multiple responses
    }

これは既知の問題ですか、それとも、発生するこの種の集中的な再処理を回避するためにコードを設計する方法はありますか?

4

2 に答える 2

4

一般に、トランザクションをそれほど長く保留することはお勧めできません。これは、そのトランザクションに参加しているすべてのリソースがロックされていることを意味します。ハンドラーで行われている作業を分割するようにしてください。ハンドラーが処理する作業の種類は? その仕事を分割することは可能ですか?

他のスレッドの問題について: スレッド A がスリープしている間にスレッド B が同じメッセージを処理している場合、バージョン 3.3.8 またはバージョン 4.0.2 でその動作が見られません。

ここに私のハンドラがあります:

    public void Handle(ProcessSomething message)
    {
        Console.WriteLine("{0} -- Thread {1} Handling MessageId {2}", DateTime.Now, Thread.CurrentThread.ManagedThreadId, Bus.CurrentMessageContext.Id);

        var response = Bus.CreateInstance<DataResponseMessage>(m =>
        {
            m.DataId = message.DataId;
            m.String = message.String;
        });

        Thread.Sleep(new TimeSpan(0, 1, 10));
        Console.WriteLine("{0} -- Thread {1} contiuned after sleep MessageId {2}", DateTime.Now, Thread.CurrentThread.ManagedThreadId, Bus.CurrentMessageContext.Id);

        Bus.Reply(response); //Try experimenting with sending multiple responses
    }

4 つのメッセージをキューに入れ、スレッド数 / MaxConcurrency を 2 に設定し、machine.config の maxtimeout を 1 分に設定し、ハンドラーを 1 分 10 秒間スリープするように設定しました。

Bus.Reply は、トランザクションへの参加に失敗します。トランザクションのタイムアウトのため。これにより例外が発生し、メッセージが再試行されます。同じハンドラー内で送信されるすべてのメッセージは、同じトランザクションの一部です。したがって、このメッセージは失敗し、再試行されます。私が見ているのは、スレッド 'A' が Msmq メッセージ ID 'm1' を処理し、スリープ状態で同じメッセージ 'm1' を続行し、応答しようとするとスローすることです。スレッド「A」がスリープしている間にスレッド「B」が「m1」を取得していることはわかりません。

乾杯。

于 2013-08-09T18:23:46.830 に答える
1

トランザクションのタイムアウトに関する同様の問題がありましたが、すべてのトランザクションのデフォルトのタイムアウトを変更したくないため、NServiceBus のトランザクション構成を変更するだけになりました。

Configure.Transactions.Advanced(x => x.DefaultTimeout(TimeSpan.FromMinutes(3)));

ただし、これは理想的ではなく、よりモジュール化された作業に分割することをお勧めします。

于 2014-10-01T11:08:31.377 に答える