4

したがって、データベースに書き込むステップに 2 つのパーティションがあります。各パーティションに書き込まれた行数を記録し、合計を取得してログに出力したいと考えています。

Writer で変数を使用し、staticStep Context/Job Context を使用してそれをafterStep()Step Listener に取り込むことを考えていました。しかし、私がそれを試したとき、私は得nullました。close()Reader でこれらの値を取得できます。

これは正しい方法ですか?または、Partition Collector/Reducer/Analyzer を使用する必要がありますか?

Websphere Liberty で Java バッチを使用しています。そして、私はEclipseで開発しています。

4

2 に答える 2

5

ライターで静的変数を使用し、ステップ コンテキスト/ジョブ コンテキストを使用して、ステップ リスナーの afterStep() で取得することを考えていました。しかし、試してみるとnullになりました。

この時点でItemWriterはすでに破棄されている可能性がありますが、わかりません。

これは正しい方法ですか?

はい、それで十分です。ただし、バッチ ランタイムはパーティションごとにStepContextクローンを維持するため、すべてのパーティションで合計行数が共有されるようにする必要があります。むしろ使用する必要がありますJobContext

PartitionCollectorPartitionAnalyzerを使用することも良い選択だと思います。インターフェイスPartitionCollectorcollectPartitionData()には、そのパーティションからのデータを収集するメソッドがあります。収集されると、バッチ ランタイムはこのデータをPartitionAnalyzerに渡してデータを分析します。があることに注意してください

  • ステップごとに N PartitionCollector (パーティションごとに 1 つ)
  • ステップごとの N StepContext (パーティションごとに 1)
  • ステップごとに 1 つの PartitionAnalyzer

書き込まれたレコードは、 StepContextのを介して渡すことができますtransientUserDataStepContextは独自のステップ パーティション用に予約されているため、一時的なユーザー データが他のパーティションによって上書きされることはありません。


実装は次のとおりです。

MyItemWriter :

@Inject
private StepContext stepContext;

@Override
public void writeItems(List<Object> items) throws Exception {
    // ...
    Object userData = stepContext.getTransientUserData();
    stepContext.setTransientUserData(partRowCount);
}

MyPartitionCollector

@Inject
private StepContext stepContext;

@Override
public Serializable collectPartitionData() throws Exception {

    // get transient user data
    Object userData = stepContext.getTransientUserData();
    int partRowCount = userData != null ? (int) userData : 0;
    return partRowCount;
}

MyPartitionAnalyzer

private int rowCount = 0;

@Override
public void analyzeCollectorData(Serializable fromCollector) throws Exception {
    rowCount += (int) fromCollector;
    System.out.printf("%d rows processed (all partitions).%n", rowCount);
}

参考:JSR352 v1.0 Final Release.pdf

于 2016-06-19T14:51:04.733 に答える
3

受け入れられた回答に少し代替案を提供し、コメントを追加さ​​せてください。

PartitionAnalyzer バリアント - analyzeStatus() メソッドを使用する

もう 1 つの手法はanalyzeStatus、各パーティション全体の最後にのみ呼び出され、パーティション レベルの終了ステータスが渡されるものを使用することです。

public void analyzeStatus(BatchStatus batchStatus, String exitStatus) 

対照的に、上記の回答を使用analyzeCollectorDataすると、各パーティションの各チャンクの最後に呼び出されます。

例えば

public class MyItemWriteListener extends AbstractItemWriteListener {

@Inject
StepContext stepCtx;

@Override
public void afterWrite(List<Object> items) throws Exception {
    // update 'newCount' based on items.size()
    stepCtx.setExitStatus(Integer.toString(newCount));
}

明らかに、これは終了ステータスを他の目的で使用していない場合にのみ機能します。任意のアーティファクトから終了ステータスを設定できます (ただし、この自由は追跡する必要があるもう 1 つのことかもしれません)。

コメント

この API は、JVM 間で個々のパーティションをディスパッチする実装を容易にするように設計されています (たとえば、Liberty では、ここで確認できます)。ただし、静的を使用すると単一の JVM に結び付けられるため、推奨される方法ではありません。

また、JobContextStepContextの両方が、バッチで見られる「スレッドローカル」のような方法で実装されていることに注意してください。

于 2016-06-20T15:49:28.167 に答える