0

DataNucleus を ORM マッピングとして、mysql をデータベースとして使用して、Spring コンテキストでアプリケーションを実行しています。

私たちのアプリケーションには、データベースへのデータ フィードの毎日のインポート ジョブがあります。データ フィードのサイズは、約 100 万行の挿入/更新に相当します。インポートのパフォーマンスは最初は非常に良好ですが、時間の経過とともに (実行されるクエリの数が増えるにつれて) 低下し、ある時点でアプリケーションがフリーズするか応答を停止します。アプリケーションが再び応答する前に、ジョブ全体が完了するまで待つ必要があります。

この動作はメモリ リークに非常によく似ており、潜在的な問題を検出するためにコードを注意深く調べましたが、問題は解消されませんでした。ヒープ ダンプから見つかった興味深い点の 1 つは、インポート中に org.datanucleus.ExecutionContextThreadedImpl (または HashSet/HashMap) がメモリの 90% (5GB) を保持していることです。(以下のダンプのスクリーンショットを添付しました)。私のインターネットでの調査によると、このリファレンスは Level1 Cache です (私が正しいかどうかはわかりません)。私の質問は、大規模なインポート中です。level1 キャッシュのサイズを制限/制御するにはどうすればよいですか。インポート中にキャッシュしないように DN に依頼することはできますか?

それが L1 キャッシュでない場合、メモリの問題の考えられる原因は何ですか?

このコードでは、挿入ごとにトランザクションを使用して、データベース内の大量のデータがロックされるのを防ぎます。2000挿入ごとにフラッシュメソッドを呼び出します

一時的な修正として、誰もアプリを使用していないときにインポート プロセスを夜間に実行するようにしました。もちろん、これが永遠に続くわけではありません。少なくとも誰かが私たちを正しい方向に向けて、より多くの調査を行い、修正を見つけることができるようにしてください.

誰かがヒープダンプをデコードする知識を持っているとよいでしょう

あなたの助けは、ここにいる私たち全員にとって非常に高く評価されます. どうもありがとう!

https://s3-ap-southeast-1.amazonaws.com/public-external/datanucleus_heap_dump.png

https://s3-ap-southeast-1.amazonaws.com/public-external/datanucleus_dump2.png

以下のコード - このメソッドの呼び出し元にはトランザクションがありません。このメソッドは呼び出しごとに 1 つのインポート オブジェクトを処理し、これらのオブジェクトを毎日約 100K 処理する必要があります。

@Override
@PreAuthorize("(hasUserRole('ROLE_ADMIN')")
@Transactional(propagation = Propagation.REQUIRED)
public void processImport(ImportInvestorAccountUpdate account, String advisorCompanyKey) {

    ImportInvestorAccountDescriptor invAccDesc = account
            .getInvestorAccount();

    InvestorAccount invAcc = getInvestorAccountByImportDescriptor(
            invAccDesc, advisorCompanyKey);

    try {

        ParseReportingData parseReportingData = ctx
                .getBean(ParseReportingData.class);

        String baseCCY = invAcc.getBaseCurrency();
        Date valueDate = account.getValueDate();
        ArrayList<InvestorAccountInformationILAS> infoList = parseReportingData
                .getInvestorAccountInformationILAS(null, invAcc, valueDate,
                        baseCCY);

        InvestorAccountInformationILAS info = infoList.get(0);

        PositionSnapshot snapshot = new PositionSnapshot();
        ArrayList<Position> posList = new ArrayList<Position>();
        Double totalValueInBase = 0.0;
        double totalQty = 0.0;



        for (ImportPosition importPos : account.getPositions()) {
            Asset asset = getAssetByImportDescriptor(importPos
                    .getTicker());
            PositionInsurance pos = new PositionInsurance();
            pos.setAsset(asset);
            pos.setQuantity(importPos.getUnits());
            pos.setQuantityType(Position.QUANTITY_TYPE_UNITS);
            posList.add(pos);
        }

        snapshot.setPositions(posList);
        info.setHoldings(snapshot);

        log.info("persisting a new investorAccountInformation(source:"
                + invAcc.getReportSource() + ") on " + valueDate
                + " of InvestorAccount(key:" + invAcc.getKey() + ")");
        persistenceService.updateManagementEntity(invAcc);



    } catch (Exception e) {
        throw new DataImportException(invAcc == null ? null : invAcc.getKey(), advisorCompanyKey,
                e.getMessage());
    }

}

4

1 に答える 1