2

3 時間ごとに 20 GB のデータ ファイルなどのパターンを Bigtable に一括読み込みする最適な方法は何ですか? Dataflow はこれに適していますか?

Dataflow を使用した Bigtable の一括読み込みに関する問題は..

Dataflow QPS が Bigtable (5 ノード) の QPS と一致していないようです。Dataflow を使用して 20 GB のファイルを bigtable に読み込もうとしています。bigtable に取り込むのに 4 時間かかります。また、実行中にこの警告が表示され続けます..

{
  "code" : 429,
  "errors" : [ {
    "domain" : "global",
    "message" : "Request throttled due to project QPS limit being reached.",
    "reason" : "rateLimitExceeded"
  } ],
  "message" : "Request throttled due to project QPS limit being reached.",
  "status" : "RESOURCE_EXHAUSTED"
}.

コード:

// CloudBigtableOptions is one way to retrieve the options. It's not
// required.
CloudBigtableOptions options = PipelineOptionsFactory.fromArgs(btargs.toArray(new String[btargs.size()]))
    .withValidation().as(CloudBigtableOptions.class);

// CloudBigtableTableConfiguration contains the project, zone, cluster
// and table to connect to.
CloudBigtableTableConfiguration config = CloudBigtableTableConfiguration.fromCBTOptions(options);

Pipeline p = Pipeline.create(options);

// This sets up serialization for Puts and Deletes so that Dataflow can
// potentially move them through the network.
CloudBigtableIO.initializeForWrite(p);

p.apply(TextIO.Read.from(inpath)).apply(ParDo.of(new CreatePutsFn(columns, delim)))
    .apply(CloudBigtableIO.writeToTable(config));

p.run();

CreatePutsFn:

@Override
public void processElement(DoFn<String, Mutation>.ProcessContext c) throws Exception {
    String[] vals = c.element().split(this.delim);
    for (int i = 0; i < columns.length; i++) {
        if (i != keyPos && vals[i].trim() != "") {
            c.output(new Put(vals[keyPos].getBytes()).addColumn(FAMILY, Bytes.toBytes(columns[i].toLowerCase()),
                    Bytes.toBytes(vals[i])));
        }
    }
}

ここで何か助けていただければ幸いです。ありがとう

4

1 に答える 1

3

この問題を解決できました。目的の結果を得るために、次の 3 つのことを行いました。これで、このジョブが実行され、(20 Gb) ファイルのデータが約 15 分で取り込まれます。以前は 4 ~ 5 時間実行されていました。

  1. このジョブは、Data-flow を使用して 3 分で 20 億の put リクエストを作成しますが、行のすべての列をバッチ処理することで、現在は 4,000 万のリクエストに削減されています。
    public void processElement(DoFn<String, Mutation>.ProcessContext c) throws Exception {
        String[] vals = c.element().split(this.delim);
        Put put = new Put(vals[keyPos].getBytes());
        for (int i = 0; i < columns.length; i++) {
            if (i != keyPos && vals[i].trim() != "") {
                put.addColumn(FAMILY, Bytes.toBytes(columns[i].toLowerCase()), Bytes.toBytes(vals[i]));

            }
        }
        c.output(put);
    }
  1. クライアント書き込みバッファのプロパティを追加しました config.toHBaseConfig().set("hbase.client.write.buffer", "200971520”);

  2. Bigtable が QPS の上限に達したことについては正しかった。そのため、一括読み込み操作中に、一時的にクラスター サイズを (3 ノードから) 10 ノードに増やしました。

于 2015-11-30T18:40:29.757 に答える