0

チャンク パーティション ステップに読み込まれたレポートの膨大なリストがあります。各レポートはさらに処理され、個別のレポートが生成されます。しかし、パーティション ステップで 50k のレポートをロードすると、サーバーが過負荷になり、速度が大幅に低下します。その代わりに、パーティション ステップで 3k のレポート リストをロードし、それを処理してから、パーティション ステップで別の 3k レポートをロードします。50k レポートが処理されるまで同じことを続けます。

    <step id="genReport" next="fileTransfer">
        <chunk  item-count="1000">
            <reader ref="Reader" >
            </reader>
            <writer
                ref="Writer" >
            </writer>
        </chunk>
      <partition>
            <mapper ref="Mapper">
                <properties >
                    <property name="threadCount" value="#{jobProperties['threadCount']}"/>
                    <property name="threadNumber" value="#{partitionPlan['threadNumber']}"/>
                </properties>
            </mapper>
      </partition>
    </step> 
public PartitionPlan mapPartitions() {
        PartitionPlanImpl partitionPlan = new PartitionPlanImpl();
        int numberOfPartitions = //dao call to load the reports count
        partitionPlan.setThreads(getThreadCount());
        partitionPlan.setPartitions(numberOfPartitions); //This numberOfPartitions is comes from the database, huge size like 20k to 40k
        Properties[] props = new Properties[numberOfPartitions];

        for (int idx = 0; idx < numberOfPartitions; idx++) {
            Properties threadProperties = new Properties();
            threadProperties.setProperty("threadNumber", idx + "");
            GAHReportListData gahRptListData = gahReportListManager.getPageToProcess(); //Data pulled from PriorityBlockingQueue 
            String dynSqlId = gahRptListData.getDynSqlId(); 

            threadProperties.setProperty("sqlId", dynSqlId);
            threadProperties.setProperty("outFile", fileName);

            props[idx] = threadProperties;
        }
        partitionPlan.setPartitionProperties(props);
        return partitionPlan;
    }

パーティション マッパーによって処理されたデータのレポートが 3k になると、次の使用可能なリストを確認する必要があります。使用可能な場合は、次の 3k レポートのセットを処理するためにパーティションをリセットする必要があります。

4

1 に答える 1

1

パーティションをリセットする方法はありません。partitionMapper によって定義されたすべてのパーティションが完了すると、ステップは終了します。すべてを完了するまで、私が推測する最初のステップ(および3番目と4番目)と同じように、分割された2番目のステップを作成できます。それは面倒です。また、JSL でループバックして同じステップを再実行することはできません。

これらのステップを複数同時に実行する分割/フローを作成できますが、フローの数を動的に設定することはできません。それはJSLにあります。そして、あなたの環境がおそらく処理できるよりも多くの並行性が得られるでしょう.

あなたのチャンク リーダー/プロセッサ/ライターは、現在パーティションに割り当てられている 1 つの SQLid の結果を繰り返し処理していると思います。sqlids のリストを作成するには、同じチャンク ループ内で 1 つが終了し、次の 1 つがいつ開始されたかを知る方法が必要だと思います。読者はおそらくリストを管理でき、遷移がいつ発生したかを知ることができます。おそらく、チャンクの終わりが 1 つのレポートの終わりであり、次のレポートに移動する必要があるというライターへのシグナルが必要になるでしょう。それぞれの sqlid が処理するレコードを使い果たしたときにチェックポイントに到達することを期待するのではなく、レポートの最後に確実にチェックポイントを設定できるように、おそらくそのためのカスタム チェックポイント アルゴリズムが必要になるでしょう。

ここで尋ねられた質問に対する答えは「いいえ」であるように思われるため、別のコメントではなく答えとしてこれを入れています。残りは、可能な代替アプローチについての議論です。

于 2019-06-19T20:43:51.220 に答える