0

私はSpringバッチを初めて使用し、DBから巨大なデータセットをプルしてファイルに書き込む必要があるSpringバッチジョブを実装しています。以下は、私にとって期待どおりに機能しているサンプルジョブ構成です。

@Bean
public Job customDBReaderFileWriterJob() throws Exception {
    return jobBuilderFactory.get(MY_JOB)
            .incrementer(new RunIdIncrementer())
            .flow(partitionGenerationStep())
            .next(cleanupStep())
            .end()
            .build();
}

@Bean
public Step partitionGenerationStep() throws Exception {
    return stepBuilderFactory
            .get("partitionGenerationStep")
            .partitioner("Partitioner", partitioner())
            .step(multiOperationStep())
            .gridSize(50)
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public Step multiOperationStep() throws Exception {
    return stepBuilderFactory
            .get("MultiOperationStep")
            .<Input, Output>chunk(100)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
}

@Bean
@StepScope
public DBPartitioner partitioner() {
    DBPartitioner dbPartitioner = new DBPartitioner();
    dbPartitioner.setColumn(ID);
    dbPartitioner.setDataSource(dataSource);
    dbPartitioner.setTable(TABLE);
    return dbPartitioner;
}

@Bean
@StepScope
public Reader reader() {
    return new Reader();
}

@Bean
@StepScope
public Processor processor() {
    return new Processor();
}

@Bean
@StepScope
public Writer writer() {
    return new Writer();
}    

@Bean
public Step cleanupStep() {
    return stepBuilderFactory.get("cleanupStep")
            .tasklet(cleanupTasklet())
            .build();
}

@Bean
@StepScope
public CleanupTasklet cleanupTasklet() {
    return new CleanupTasklet();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(10);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("MultiThreaded-");
    return executor;
}    

データ セットが巨大であるため、タスク エグゼキューターのスレッド プール値を 10 に設定し、グリッド サイズを 50 に設定しました。このセットアップでは、10 個のスレッドが一度に 10 個のファイルに書き込み、リーダーはファイルをチャンク単位で読み取るため、リーダー プロセッサとライターフローは複数回反復しています (次のパーティションに移動する前に、10 のグループの場合)。

ここで、1 つのスレッドのすべての反復 (読み取り、処理、書き込み) が完了したら、つまり各パーティションの完了後にファイルを圧縮できるタスクレットを追加したいと思います。

最後に実行するクリーンアップ タスクレットがありますが、そこに圧縮ロジックがあるということは、最初に各パーティションから生成されたすべてのファイルを取得してから圧縮を実行することを意味します。提案してください。

4

1 に答える 1

0

ワーカー ステップmultiOperationStepFlowStepチャンク指向のステップに変更し、その後に圧縮を行う単純なタスクレット ステップを続けることができます。つまり、ワーカー ステップは実際には 2 つのステップを 1 つにまとめたものFlowStepです。

于 2020-11-16T12:30:10.597 に答える