2

私は 3 つの状態を持つサガを持っています。初期、受信行数、完了 -

    public static State Initial { get; set; }
    public static State ReceivingRows { get; set; }
    public static State Completed { get; set; }

BofMessage (Bof = ファイルの先頭) を取得すると、Initial から RecomingRows に遷移します。BofMessage の後、多数の RowMessage を受け取ります。それぞれがフラット ファイル内の行を記述しています。すべての RowMessage が送信されると、EofMessage が送信され、状態が Completed に変わります。観察 -

    static void DefineSagaBehavior()
    {
        Initially(When(ReceivedBof)
            .Then((saga, message) => saga.BeginFile(message))
            .TransitionTo(ReceivingRows));

        During(ReceivingRows, When(ReceivedRow)
            .Then((saga, message) => saga.AddRow(message)));

        During(ReceivingRows, When(ReceivedRowError)
            .Then((saga, message) => saga.RowError(message)));

        During(ReceivingRows, When(ReceivedEof)
            .Then((saga, message) => saga.EndFile(message))
            .TransitionTo(Completed));
    }

これは機能しますが、BofMessage! の前に複数の RowMessage が受信される場合があります。これは、私が送信した順序とは関係ありません。これは、メッセージが受信され、最終的にエラーとしてカウントされることを意味し、最終的にそれらを書き出すデータベースまたはファイルからメッセージが失われる原因となります。

一時的な修正として、すべての公開を行うこのメソッドにスリープ タイマー ハックを少し追加します。</p>

    public static void Publish(
        [NotNull] IServiceBus serviceBus,
        [NotNull] string publisherName,
        Guid correlationId,
        [NotNull] Tuple<string, string> inputFileDescriptor,
        [NotNull] string outputFileName)
    {
        // attempt to load offsets
        var offsetsResult = OffsetParser.Parse(inputFileDescriptor.Item1);
        if (offsetsResult.Result != ParseOffsetsResult.Success)
        {
            // publish an offsets invalid message
            serviceBus.Publish<TErrorMessage>(CombGuid.Generate(), publisherName, inputFileDescriptor.Item2);
            return;
        }

        // publish beginning of file
        var fullInputFilePath = Path.GetFullPath(inputFileDescriptor.Item2);
        serviceBus.Publish<TBofMessage>(correlationId, publisherName, fullInputFilePath);

        // HACK: make sure bof message happens before row messages, or else some row messages won't be received
        Thread.Sleep(5000);

        // publish rows from feed
        var feedResult = FeedParser.Parse(inputFileDescriptor.Item2, offsetsResult.Offsets);
        foreach (var row in feedResult)
        {
            // publish row message, unaligned if applicable
            if (row.Result != ParseRowResult.Success)
                serviceBus.Publish<TRowErrorMessage>(correlationId, publisherName, row.Fields);
            else
                serviceBus.Publish<TRowMessage>(correlationId, publisherName, row.Fields);
        }

        // publish end of file
        serviceBus.Publish<TEofMessage>(correlationId, publisherName, outputFileName);
    }

これは 5 秒のスリープ タイマーであり、非常に醜いハックです。送信した順序でメッセージが届かない理由を誰かに教えてもらえますか? これらのメッセージがデフォルトで順序付けされていない場合、これらのメッセージが正しい順序で送信されるようにすることはできますか?

ありがとうございました!

便宜上、これはhttp://groups.google.com/group/masstransit-discuss/browse_thread/thread/7bd9518a690db4bbからクロスポストされていることに注意してください。

4

1 に答える 1

6

メッセージが任意の順序で配信されることを保証することはできません。コンシューマー側に同時コンシューマーが1つしかないことを確認することで、MTに近づくことができます。それでも、この動作には依存しません(http://docs.masstransit-project.com/en/latest/overview/keyideas.html#ハンドラー)。これにより、消費者は効果的にシングルスレッドになります。

于 2012-05-23T11:26:36.107 に答える