この MultipleScanTableInputFormat を使用すると、MultipleScanTableInputFormat.PARTITIONS_PER_REGION_SERVER 構成を使用して、単一のリージョンサーバーに対して実行するマッパーの数を制御できます。このクラスはすべての入力分割をその場所 (regionserver) でグループ化し、RecordReader はマッパーのすべての集約された分割を適切に反復処理します。
これが例です
https://gist.github.com/bbeaudreault/9788499#file-multiplescantableinputformat-Java-L90
単一のマッパーに対して複数の集約された分割を作成した作業
private List<InputSplit> getAggregatedSplits(JobContext context) throws IOException {
final List<InputSplit> aggregatedSplits = new ArrayList<InputSplit>();
final Scan scan = getScan();
for (int i = 0; i < startRows.size(); i++) {
scan.setStartRow(startRows.get(i));
scan.setStopRow(stopRows.get(i));
setScan(scan);
aggregatedSplits.addAll(super.getSplits(context));
}
// set the state back to where it was..
scan.setStopRow(null);
scan.setStartRow(null);
setScan(scan);
return aggregatedSplits;
}
リージョン サーバーごとにパーティションを作成する
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
List<InputSplit> source = getAggregatedSplits(context);
if (!partitionByRegionServer) {
return source;
}
// Partition by regionserver
Multimap<String, TableSplit> partitioned = ArrayListMultimap.<String, TableSplit>create();
for (InputSplit split : source) {
TableSplit cast = (TableSplit) split;
String rs = cast.getRegionLocation();
partitioned.put(rs, cast);
}