私は 3 つの状態を持つサガを持っています。初期、受信行数、完了 -
public static State Initial { get; set; }
public static State ReceivingRows { get; set; }
public static State Completed { get; set; }
BofMessage (Bof = ファイルの先頭) を取得すると、Initial から RecomingRows に遷移します。BofMessage の後、多数の RowMessage を受け取ります。それぞれがフラット ファイル内の行を記述しています。すべての RowMessage が送信されると、EofMessage が送信され、状態が Completed に変わります。観察 -
static void DefineSagaBehavior()
{
Initially(When(ReceivedBof)
.Then((saga, message) => saga.BeginFile(message))
.TransitionTo(ReceivingRows));
During(ReceivingRows, When(ReceivedRow)
.Then((saga, message) => saga.AddRow(message)));
During(ReceivingRows, When(ReceivedRowError)
.Then((saga, message) => saga.RowError(message)));
During(ReceivingRows, When(ReceivedEof)
.Then((saga, message) => saga.EndFile(message))
.TransitionTo(Completed));
}
これは機能しますが、BofMessage! の前に複数の RowMessage が受信される場合があります。これは、私が送信した順序とは関係ありません。これは、メッセージが受信され、最終的にエラーとしてカウントされることを意味し、最終的にそれらを書き出すデータベースまたはファイルからメッセージが失われる原因となります。
一時的な修正として、すべての公開を行うこのメソッドにスリープ タイマー ハックを少し追加します。</p>
public static void Publish(
[NotNull] IServiceBus serviceBus,
[NotNull] string publisherName,
Guid correlationId,
[NotNull] Tuple<string, string> inputFileDescriptor,
[NotNull] string outputFileName)
{
// attempt to load offsets
var offsetsResult = OffsetParser.Parse(inputFileDescriptor.Item1);
if (offsetsResult.Result != ParseOffsetsResult.Success)
{
// publish an offsets invalid message
serviceBus.Publish<TErrorMessage>(CombGuid.Generate(), publisherName, inputFileDescriptor.Item2);
return;
}
// publish beginning of file
var fullInputFilePath = Path.GetFullPath(inputFileDescriptor.Item2);
serviceBus.Publish<TBofMessage>(correlationId, publisherName, fullInputFilePath);
// HACK: make sure bof message happens before row messages, or else some row messages won't be received
Thread.Sleep(5000);
// publish rows from feed
var feedResult = FeedParser.Parse(inputFileDescriptor.Item2, offsetsResult.Offsets);
foreach (var row in feedResult)
{
// publish row message, unaligned if applicable
if (row.Result != ParseRowResult.Success)
serviceBus.Publish<TRowErrorMessage>(correlationId, publisherName, row.Fields);
else
serviceBus.Publish<TRowMessage>(correlationId, publisherName, row.Fields);
}
// publish end of file
serviceBus.Publish<TEofMessage>(correlationId, publisherName, outputFileName);
}
これは 5 秒のスリープ タイマーであり、非常に醜いハックです。送信した順序でメッセージが届かない理由を誰かに教えてもらえますか? これらのメッセージがデフォルトで順序付けされていない場合、これらのメッセージが正しい順序で送信されるようにすることはできますか?
ありがとうございました!
便宜上、これはhttp://groups.google.com/group/masstransit-discuss/browse_thread/thread/7bd9518a690db4bbからクロスポストされていることに注意してください。