2

私の目標は、大きな XML ファイルを解析し、XML データに基づいてオブジェクトを DB に保持し、それを迅速に行うことです。XML の解析で問題が発生した場合や、作成されたオブジェクトを検証できない場合にロールバックできるように、操作はトランザクションである必要があります。

Grails Executor プラグインを使用して操作をスレッド化しています。問題は、サービス内で作成した各スレッドが独自のトランザクションとセッションを持っていることです。4 つのスレッドを作成し、1 つが失敗した場合、失敗しなかった 3 つのセッションは既にフラッシュされているか、将来フラッシュされる可能性があります。

おそらく私の問題を解決する「現在の」Hibernateセッションを使用するように各スレッドに指示できるかどうかを考えていました。私が考えていた別の考えは、すべてのセッションがエラーなしで完了したことがわかるまで、すべてのセッションがフラッシュされるのを防ぐことができるということでした。残念ながら、これらのいずれかを行う方法がわかりません。

追加のキャッチもあります。これらの XML ファイルは解析する必要があり、今後も多数作成される予定です。これらの XML ファイルの多くには、以前の XML ファイルが解析されたときに既に作成されていたオブジェクトと同一のオブジェクトが解析時に作成されるデータが含まれています。そのような場合、既存のオブジェクトへの参照を作成する必要があります。これに対処するために、各クラスに一時isUnique変数を追加しました。ここでの質問で概説したように、関係を考慮しないため、Grailsの一意の制約を使用しても機能しません。hasMany

次の例は、実際のものと比べて非常に単純です。私が解析している XML ファイルには、多くの属性を持つ要素が深くネストされています。

次のドメイン クラスを想像してください。

class Foo {
    String ver

    Set<Bar> bars
    Set<Baz> bazs
    static hasMany = [bars: Bar, bazs: Baz]

    boolean getIsUnique() {
        Util.isUnique(this)
    }
    static transients = [
        'isUnique'
    ]

    static constraints = {
        ver(nullable: false)
        isUnique(
            validator: { val, obj ->
                obj.isUnique
            }
        )
    }
}

class Bar {
    String name

    boolean getIsUnique() {
        Util.isUnique(this)
    }
    static transients = [
        'isUnique'
    ]

    static constraints = {
        isUnique(
            validator: { val, obj ->
                obj.isUnique
            }
        )
    }
}


class Baz {
    String name

    boolean getIsUnique() {
        Util.isUnique(this)
    }
    static transients = [
        'isUnique'
    ]

    static constraints = {
        isUnique(
            validator: { val, obj ->
                obj.isUnique
            }
        )
    }
}

そして、これが私のフォルダーUtil.groovyにある私のクラスです。src/groovyこのクラスには、ドメイン クラスのインスタンスが一意であるかどうかを判断したり、既存の同等のインスタンスを取得したりするために使用するメソッドが含まれています。

import org.hibernate.Hibernate

class Util {
    /**
     * Gets the first instance of the domain class of the object provided that
     * is equal to the object provided.
     *
     * @param obj
     * @return the first instance of obj's domain class that is equal to obj
     */
    static def getFirstDuplicate(def obj) {
        def objClass = Hibernate.getClass(obj)
        objClass.getAll().find{it == obj}
    }

    /**
     * Determines if an object is unique in its domain class
     *
     * @param obj
     * @return true if obj is unique, otherwise false
     */
    static def isUnique(def obj) {
        getFirstDuplicate(obj) == null
    }

    /**
     * Validates all of an object's constraints except those contained in the
     * provided blacklist, then saves the object if it is valid.
     *
     * @param obj
     * @return the validated object, saved if valid
     */
    static def validateWithBlacklistAndSave(def obj, def blacklist = null) {
        def propertiesToValidate = obj.domainClass.constraints.keySet().collectMany{!blacklist?.contains(it)?  [it] : []}
        if(obj.validate(propertiesToValidate)) {
            obj.save(validate: false)
        }
        obj
    }
}

そして、XML ファイル "A" が次のようなものであるとします。

    <foo ver="1.0">
        <!-- Start bar section -->
        <bar name="bar_1"/>
        <bar name="bar_2"/>
        <bar name="bar_3"/>
        ...
        <bar name="bar_5000"/>

        <!-- Start baz section -->
        <baz name="baz_1"/>
        <baz name="baz_2"/>
        <baz name="baz_3"/>
        ...
        <baz name="baz_100000"/>
    </foo>

barそして、XML ファイル「B」がこれに似ていると想像してください (1 つの新規追加と 1つの新規追加を除いて、XML ファイル「A」と同じですbaz)。XML ファイル「A」の後に XML ファイル「B」が解析されると、3 つの新しいオブジェクトが作成Barされる必要があります。およびXML ファイルのインポートから既に存在するもの:name = bar_5001Bazname = baz_100001Foover = 2.0barsbazsBarBazA

    <foo ver="2.0">
        <!-- Start bar section -->
        <bar name="bar_1"/>
        <bar name="bar_2"/>
        <bar name="bar_3"/>
        ...
        <bar name="bar_5000"/>
        <bar name="bar_5001"/>

        <!-- Start baz section -->
        <baz name="baz_1"/>
        <baz name="baz_2"/>
        <baz name="baz_3"/>
        ...
        <baz name="baz_100000"/>
        <baz name="baz_100001"/>
    </foo>

そして、これに似たサービス:

class BigXmlFileUploadService {

    // Pass in a 20MB XML file
    def upload(def xml) {
        String rslt = null
        def xsd = Util.getDefsXsd()
        if(Util.validateXmlWithXsd(xml, xsd)) { // Validate the structure of the XML file
            def fooXml = new XmlParser().parseText(xml.getText()) // Parse the XML

            def bars = callAsync { // Make a thread for creating the Bar objects
                def bars = []
                for(barXml in fooXml.bar) { // Loop through each bar XML element inside the foo XML element
                    def bar = new Bar( // Create a new Bar object
                        name: barXml.attribute("name")
                    )
                    bar = retrieveExistingOrSave(bar) // If an instance of Bar that is equal to this one already exists then use it
                    bars.add(bar) // Add the new Bar object to the list of Bars
                }
                bars // Return the list of Bars
            }

            def bazs = callAsync { // Make a thread for creating the Baz objects
                def bazs = []
                for(bazXml in fooXml.baz) { // Loop through each baz XML element inside the foo XML element
                    def baz = new Baz( // Create a new Baz object
                        name: bazXml.attribute("name")
                    )
                    baz = retrieveExistingOrSave(baz) // If an instance of Baz that is equal to this one already exists then use it
                    bazs.add(baz) // Add the new Baz object to the list of Bazs
                }
                bazs // Return the list of Bazs
            }

            bars = bars.get() // Wait for thread then call Future.get() to get list of Bars
            bazs = bazs.get() // Wait for thread then call Future.get() to get list of Bazs

            def foo = new Foo( // Create a new Foo object with the list of Bars and Bazs
                ver: fooXml.attribute("ver")
                bars: bars
                bazs: bazs
            ).save()

            rslt = "Successfully uploaded ${xml.getName()}!"
        } else {
            rslt = "File failed XSD validation!"
        }
        rslt
    }

    private def retrieveExistingOrSave(def obj, def existingObjCache) {
        def dup = Util.getFirstDuplicate(obj)
        obj = dup ?: Util.validateWithBlacklistAndSave(obj, ["isUnique"])
        if(obj.errors.allErrors) {
            log.error "${obj} has errors ${obj.errors}"
            throw new RuntimeException() // Force transaction to rollback
        }
        obj
    }
}

問題は、サービスのuploadメソッド内で発生するすべてのことを単一のセッションで発生したように動作させるにはどうすればよいかということです。

4

2 に答える 2

2

あなたがやろうとしていることができないかもしれません。

まず、Hibernate セッションはスレッドセーフではありません:

セッションは、単一の要求、会話、または単一の作業単位のために一度使用してから破棄する必要がある、安価で非スレッドセーフなオブジェクトです。...

第 2 に、SQL クエリを並行して実行してもあまりメリットがないと思います。PostgreSQL の JDBC ドライバーがどのように機能するかを調べたところ、実際にクエリを実行するすべてのメソッドはsynchronized.

あなたがしていることの最も遅い部分はおそらく XML 処理なので、それを並列化し、単一のスレッドで永続化することをお勧めします。複数のワーカーを作成して XML から読み取り、オブジェクトをある種のキューに追加することができます。次に、セッションを所有し、解析されたオブジェクトを保存する別のワーカーを用意します。

Hibernate のバッチ処理ドキュメント ページも参照してください。各挿入後のフラッシュは、最速の方法ではありません。

最後に、オブジェクトがどのようにマップされているかはわかりませんが、すべての子オブジェクトを保存Foo した後に問題が発生する可能性があります。オブジェクトを のコレクションに追加するfooと、Hibernate がfoo_id各オブジェクトに参照を設定し、挿入したすべてのオブジェクトに対して更新クエリが実行されます。おそらくfoo最初に作成し、baz.setFoo(foo)各挿入の前に実行する必要があります。

于 2013-08-28T02:41:09.533 に答える
1

サービスを最適化して、いくつかの問題点に対処できます。

  • @takteekに同意します。xmlの解析には時間がかかります。したがって、その部分を非同期にすることを計画してください。
  • flush子オブジェクトを作成するたびに必要はありません。最適化については、以下を参照してください。

サービスクラスは次のようになります。

// Pass in a 20MB XML file
def upload(def xml) {
    String rslt = null
    def xsd = Util.getDefsXsd()
    if (Util.validateXmlWithXsd(xml, xsd)) {
        def fooXml = new XmlParser().parseText(xml.getText())

        def foo = new Foo().save(flush: true)

        def bars = callAsync {
            saveBars(foo, fooXml)
        }

        def bazs = callAsync {
            saveBazs(foo, fooXml)
        }

        //Merge the detached instances and check whether the child objects
        //are populated or not. If children are 
        //Can also issue a flush, but we do not need it yet
        //By default domain class is validated as well.
        foo = bars.get().merge() //Future returns foo
        foo = bazs.get().merge() //Future returns foo

        //Merge the detached instances and check whether the child objects
        //are populated or not. If children are 
        //absent then rollback the whole transaction
        handleTransaction {
             if(foo.bars && foo.bazs){
                foo.save(flush: true)
            } else {
                //Else block will be reached if any of 
                //the children is not associated to parent yet
                //This would happen if there was a problem in 
                //either of the thread, corresponding
                //transaction would have rolled back 
                //in the respective sessions. Hence empty associations.

                //Set transaction roll-back only
                   TransactionAspectSupport
                       .currentTransactionStatus()
                       .setRollbackOnly()

                //Or throw an Exception and 
                //let handleTransaction handle the rollback
                throw new Exception("Rolling back transaction")
            }
        }

        rslt = "Successfully uploaded ${xml.getName()}!"
    } else {
        rslt = "File failed XSD validation!"
    }
    rslt
}

def saveBars(Foo foo, fooXml) {
    handleTransaction {
        for (barXml in fooXml.bar) {
            def bar = new Bar(name: barXml.attribute("name"))
            foo.addToBars(bar)
        }
        //Optional I think as session is flushed
        //end of method
        foo.save(flush: true)
    }

    foo
}

def saveBazs(Foo foo, fooXml) {
    handleTransaction {
        for (bazXml in fooXml.baz) {
            def baz = new Baz(name: bazXml.attribute("name"))
            foo.addToBazs(baz)
        }

        //Optional I think as session is flushed
        //end of method
        foo.save(flush: true)
    }

    foo
}

def handleTransaction(Closure clos){
    try {
        clos()
    } catch (e) {
        TransactionAspectSupport.currentTransactionStatus().setRollbackOnly()
    }

    if (TransactionAspectSupport.currentTransactionStatus().isRollbackOnly())
        TransactionAspectSupport.currentTransactionStatus().setRollbackOnly()
}
于 2013-08-28T03:04:15.477 に答える