3 時間ごとに 20 GB のデータ ファイルなどのパターンを Bigtable に一括読み込みする最適な方法は何ですか? Dataflow はこれに適していますか?
Dataflow を使用した Bigtable の一括読み込みに関する問題は..
Dataflow QPS が Bigtable (5 ノード) の QPS と一致していないようです。Dataflow を使用して 20 GB のファイルを bigtable に読み込もうとしています。bigtable に取り込むのに 4 時間かかります。また、実行中にこの警告が表示され続けます..
{
"code" : 429,
"errors" : [ {
"domain" : "global",
"message" : "Request throttled due to project QPS limit being reached.",
"reason" : "rateLimitExceeded"
} ],
"message" : "Request throttled due to project QPS limit being reached.",
"status" : "RESOURCE_EXHAUSTED"
}.
コード:
// CloudBigtableOptions is one way to retrieve the options. It's not
// required.
CloudBigtableOptions options = PipelineOptionsFactory.fromArgs(btargs.toArray(new String[btargs.size()]))
.withValidation().as(CloudBigtableOptions.class);
// CloudBigtableTableConfiguration contains the project, zone, cluster
// and table to connect to.
CloudBigtableTableConfiguration config = CloudBigtableTableConfiguration.fromCBTOptions(options);
Pipeline p = Pipeline.create(options);
// This sets up serialization for Puts and Deletes so that Dataflow can
// potentially move them through the network.
CloudBigtableIO.initializeForWrite(p);
p.apply(TextIO.Read.from(inpath)).apply(ParDo.of(new CreatePutsFn(columns, delim)))
.apply(CloudBigtableIO.writeToTable(config));
p.run();
CreatePutsFn:
@Override
public void processElement(DoFn<String, Mutation>.ProcessContext c) throws Exception {
String[] vals = c.element().split(this.delim);
for (int i = 0; i < columns.length; i++) {
if (i != keyPos && vals[i].trim() != "") {
c.output(new Put(vals[keyPos].getBytes()).addColumn(FAMILY, Bytes.toBytes(columns[i].toLowerCase()),
Bytes.toBytes(vals[i])));
}
}
}
ここで何か助けていただければ幸いです。ありがとう