0

ハンドラーで例外がスローされないのに、NServiceBusがメッセージをX回再結合するという奇妙な問題が発生しました。NHibernateセッションとNSBアンビアントトランザクションを扱ういくつかの情報があります。エラーがスローされないので、私は問題を100%確信しておらず、したがって、何をすべきかを実際に決定することはできません。

私はNSBをCastleWindsorで次のように構成しました。

IWindsorContainer container = new WindsorContainer(new XmlInterpreter());
container.Install(new ContainerInstaller());
container.Install(new UnitOfWorkInstaller(AppDomain.CurrentDomain.BaseDirectory, Castle.Core.LifestyleType.Scoped));
container.Install(new FactoryInstaller(AppDomain.CurrentDomain.BaseDirectory));
container.Install(new RepositoryInstaller(AppDomain.CurrentDomain.BaseDirectory));

Configure.With()
    .CastleWindsorBuilder(container)
    .FileShareDataBus(Properties.Settings.Default.DataBusFileSharePath)
    .MsmqTransport()
        .IsTransactional(true)
        .PurgeOnStartup(false)
    .UnicastBus()
         .LoadMessageHandlers()
         .ImpersonateSender(false)
     .JsonSerializer();

レジスタは、次のUnitOfWorkInstallerように作業単位(NHibernateセッション)を登録します。

public void Install(IWindsorContainer container, IConfigurationStore store)
{
    var fromAssemblyDescriptor = AllTypes.FromAssemblyInDirectory(new AssemblyFilter(_installationPath));
    container.Register(fromAssemblyDescriptor
        .IncludeNonPublicTypes()
        .Pick()
        .If(t => t.GetInterfaces().Any(i => i == typeof(IUnitOfWork)) && t.Namespace.StartsWith("Magma"))
        .WithService.AllInterfaces()
        .Configure(con => con.LifeStyle.Is(_lifeStyleType).UsingFactoryMethod(k => k.Resolve<IUnitOfWorkFactory>().Create())));
}

したがって、メッセージが到着するたびに、すべてのリポジトリで同じ作業単位が使用されます。現在のトランザクションを手動でロールバックするとエラーが発生することを読みました(理由はわかりません)。また、NSBがすべてのトランスポートメッセージに対して子コンテナーを作成し、この子コンテナーがメッセージの処理後に破棄されることも知っています。問題は、子コンテナーが廃棄されるときに、作業単位が次のように廃棄されることです。

    public void Dispose()
    {
        if (!_isDisposed)
        {
            DiscardSession();
            _isDisposed = true;
        }
    }

    private void DiscardSession()
    {
        if (_transaction != null && _transaction.IsActive)
        {
            _transaction.Dispose();
        }
        if (Session != null)
        {
            Session.Dispose();
        }
    }

私のハンドラーは次のように構成されています:(_unitOfWorkはコンストラクターの依存関係として渡されます)

    public void Handle(<MessageType> message)
    {
        using (_unitOfWork)
        {
            try
            {
                // do stuff
                _unitOfWork.Commit();
            }
            catch (Exception ex)
            {
                _unitOfWork.Rollback();

                // rethrow so the message stays in the queue
                throw;
            }
        }
    }

作業単位をコミットしないと(セッションをフラッシュしてトランザクションをコミットする)、メッセージが最大再試行回数を超えて再試行されたというエラーが発生したことがわかりました。

したがって、NHibernateセッションとその作成および廃棄方法に関連しているように見えますが、作業単位内で作成されているため、セッションファクトリを実際に使用することはできません。IMessageModuleを使用してセッションを作成および破棄できることを読みましたが、そもそもエラーの原因がわからないため、これが正しい方法であるかどうかはわかりません。

要約すると:

  • スコープ付きの作業単位を使用しているので、それを使用するすべてのハンドラーの依存関係が同じインスタンスを共有します(子コンテナーへのthx、ところで:子コンテナーがすべての一時的なものを処理すると考えて、作業単位を一時的なものとして設定しましたそのコンテナ内でシングルトンとしてオブジェクトを作成しましたが、作業単位が共有されていないことがわかりました。そのため、スコープとして設定されています)

  • ハンドラーをusing(_unitOfWork) { }ステートメントでラップして、各処理の後に作業単位を破棄しています。

  • 作業単位が破棄されると、NHibernateセッションも破棄されます

  • を明示的に呼び出さないとCommit_unitOfWorkメッセージは最大再試行回数を超えて再試行し、エラーがスローされます。

この動作の原因は何ですか?IMessageModuleがこれに対する答えですか?

4

1 に答える 1

0

少し絞り込んだと思います... NHibernateのセッションがNSBトランザクションスコープに参加していたため、、、、およびをすべて削除しusing(_unitOfWork)、NSBにトランザクションのコミットまたはロールバックの作業を任せました。_unitOfWork.Commit()_unitOfWork.Rollback()TransactionScope

Session.Transactionまた、NHibernateセッションのトランザクション()を使用する代わりに、NHibernateセッションのトランザクション()を使用し始めましたSession.BeginTransaction()。違いがわかるように、UoW実装をコピーして貼り付けました(古いコードはコメントにあります)。

変更が何かを説明しているかどうかはわかりませんが、セッションのトランザクションを使用し、トランザクションのコミット内で処理されているためフラッシュを削除すると、問題が解決したようです...Commit順番にメソッドを明示的に呼び出す必要はありませんメッセージが正常に処理されるようになります。これが私のUoW実装です:

public class NHibernateUnitOfWork : INHibernateUnitOfWork
{
    //private ITransaction _transaction;
    private bool _isDisposed;
    private bool _isInError;

    public ISession Session { get; protected set; }

    public NHibernateUnitOfWork(ISession session)
    {
        Contract.Requires(session != null, "session");
        Session = session;

        //_transaction = Session.BeginTransaction();

        // create a new transaction as soon as the session is available
        Session.BeginTransaction();
        _isDisposed = false;
        _isInError = false;
    }

    public void MarkCreated(Object entity)
    {
        // assert stuff

        try
        {
            Session.SaveOrUpdate(entity);
            //Session.Flush();
        }
        catch (HibernateException ex)
        {
            HandleError();
            throw;
        }
    }

    public void MarkUpdated(Object entity)
    {
        // assert stuff

        try
        {
            Session.Update(entity);
            //Session.Flush();
        }
        catch (HibernateException ex)
        {
            HandleError();
            throw;
        }
    }

    public void MarkSavedOrUpdated(Object entity)
    {
        // assert stuff

        try
        {
            Session.SaveOrUpdate(entity);
            //Session.Flush();
        }
        catch (HibernateException)
        {
            HandleError();
            throw;
        }
    }

    public void MarkDeleted(Object entity)
    {
        // assert stuff

        try
        {
            Session.Delete(entity);
            //Session.Flush();
        }
        catch (HibernateException ex)
        {
            HandleError();
            throw;
        }
    }

    public void Commit()
    {
        // assert stuff

        try
        {
            //Session.Flush();
            //_transaction.Commit();
            Session.Transaction.Commit();
        }
        catch (HibernateException ex)
        {
            HandleError();
            throw;
        }
    }

    public void Rollback()
    {
        // assert stuff

        try
        {
            //if (!_transaction.WasRolledBack)
            //{
            //    _transaction.Rollback();
            //}
            Session.Transaction.Rollback();
        }
        catch (HibernateException ex)
        {
            HandleError();
            throw;
        }
    }

    public void Dispose()
    {
        if (!_isDisposed)
        {
            DiscardSession();
            _isDisposed = true;
        }
    }

    private void DiscardSession()
    {
        //if (_transaction != null && _transaction.IsActive)
        //{
        //    _transaction.Dispose();
        //}
        if (Session != null)
        {
            try
            {
                // rollback all uncommitted changes
                if (Session.Transaction != null && Session.Transaction.IsActive)
                {
                    Session.Transaction.Rollback();
                }
                //Session.Clear();
                Session.Close();
            }
            catch (Exception)
            { }
            finally
            {
                Session.Dispose();
            }
        }
    }

    private void HandleError()
    {
        _isInError = true;
        //if (_transaction != null && _transaction.IsActive)
        //{
        //    _transaction.Rollback();
        //}
        if (Session.Transaction != null && Session.Transaction.IsActive)
        {
            Session.Transaction.Rollback();
        }
    }

    // assert methods
}

これは意味がありますか?そもそもエラーの原因はまだわかりませんが、トランザクションスコープが終了する前にNHibernateセッションを破棄することに関係しているようです。

于 2012-04-12T15:36:32.320 に答える