1

誰かが以前にこれに遭遇したことがあるかどうか疑問に思います:

コマンドを処理し、ハンドラーでイベントをイベントストア(joliver)に保存します。
ディスパッチ直後に、同じコマンドのハンドラーが再度処理されます。
コマンドのGUIDが同じであるため、同じコマンドを知っています。

5回試行した後、nservicebusは、最大再試行のためにコマンドが失敗したと言います。
したがって、明らかにコマンドは失敗しましたが、何が失敗したのかはわかりません。ディスパッチャの内容をtrycatchに入れましたが、エラーはありません。コードがディスパッチャを終了した後、イベントハンドラは、何かエラーが発生したかのように常に起動します。

コードをトレースすると、イベントがデータベースに保存され(行が表示されます)、ディスパッチャーが実行され、[ディスパッチ]列がtrueに設定されます。その後、ハンドラーがコマンドを再度処理し、プロセスが繰り返され、別の行が挿入されます。 commitsテーブルに追加します。

何が失敗しているのでしょうか?イベントストアのどこかに成功フラグを設定していませんか?イベントストアをnServicebusから切り離すと、両方とも期待どおりに実行され、再試行や失敗は発生しません。

ディスパッチャ:

    public void Dispatch(Commit commit)
    {
        for (var i = 0; i < commit.Events.Count; i++)
        {
            try
            {
                var eventMessage = commit.Events[i];
                var busMessage = (T)eventMessage.Body;
                //bus.Publish(busMessage);

            }
            catch (Exception ex)
            {
                throw ex;
            }
        }
    }

Wireup.Init()

    private static IStoreEvents WireupEventStore()
    {
        return Wireup.Init()
            .LogToOutputWindow()
            .UsingSqlPersistence("EventStore")
            .InitializeStorageEngine()
            .UsingBinarySerialization()
            //.UsingJsonSerialization()
            //    .Compress()
            //.UsingAsynchronousDispatchScheduler()
            //    .DispatchTo(new NServiceBusCommitDispatcher<T>())

           .UsingSynchronousDispatchScheduler()
               .DispatchTo(new DelegateMessageDispatcher(DispatchCommit))

            .Build();
    }
4

1 に答える 1

1

保存時にトランザクションスコープを開いていましたが、閉じたことはありませんでした。

    public static void Save(AggregateRoot root)
    {
        // we can call CreateStream(StreamId) if we know there isn't going to be any data.
        // or we can call OpenStream(StreamId, 0, int.MaxValue) to read all commits,
        // if no commits exist then it creates a new stream for us.
        using (var scope = new TransactionScope())
        using (var eventStore = WireupEventStore())
        using (var stream = eventStore.OpenStream(root.Id, 0, int.MaxValue))
        {
            var events = root.GetUncommittedChanges();
            foreach (var e in events)
            {
                stream.Add(new EventMessage { Body = e });
            }

            var guid = Guid.NewGuid();
            stream.CommitChanges(guid);
            root.MarkChangesAsCommitted();

            scope.Complete(); // <-- missing this
        }
    }
于 2013-03-25T20:18:20.343 に答える