2

次のコードを使用して、データを Bigtable に入力しています。

CloudBigtableScanConfiguration config = new CloudBigtableScanConfiguration.Builder()
                .withConfiguration("clusterId", options.getBigTableClusterId())
                .withProjectId(options.getProject())
                .withInstanceId(options.getBigTableInstanceId())
                .withTableId(options.getOutputBTTable())
                .build();
     Pipeline p = Pipeline.create(options);
     /**
      * Read Data from Big Query
      */
     CloudBigtableIO.initializeForWrite(p);
     p.apply(BigQueryIO.Read.fromQuery(getQuery(options.getDate())))
        .apply(ParDo.of(new DoFn<TableRow, Mutation>() {
           public void processElement(ProcessContext c) {
             Mutation output = convertDataToRow(c.element());
             if (output != null) { 
                 c.output(output); 
                 };
           }

           }))
         .apply(CloudBigtableIO.writeToTable(config));
     p.run();

private static Mutation convertDataToRow(TableRow element) {
     LOG.info("element: "+ element);
     if(element.get("BASM_AID") != null){
         Put obj = new Put(getRowKey(element).getBytes()).addColumn(SEGMENT_FAMILY, SEGMENT_COLUMN_NAME, ((String)element.get("BAS_category")).getBytes() );
                obj.addColumn(USER_FAMILY, AID, ((String)element.get("BASM_AID")).getBytes());
         if(element.get("BASM_segment_id") != null){
                obj.addColumn(SEGMENT_FAMILY, SEGMENT_ID, ((String)element.get("BASM_segment_id")).getBytes());
         }
         if(element.get("BAS_sub_category") != null){
                obj.addColumn(SEGMENT_FAMILY, SUB_CATEGORY, ((String)element.get("BAS_sub_category")).getBytes());
         }
         if(element.get("BAS_name") != null){
                obj.addColumn(SEGMENT_FAMILY, NAME, ((String)element.get("BAS_name")).getBytes());
         }
         if(element.get("BAS_description") != null){
                obj.addColumn(SEGMENT_FAMILY, DESCRIPTION, ((String)element.get("BAS_description")).getBytes());
         }
         if(element.get("BASM_krux_user_id") != null){
             obj.addColumn(USER_FAMILY, KRUX_USER_ID, ((String)element.get("BASM_krux_user_id")).getBytes());
         }
         if(element.get("BAS_last_compute_day") != null){
                obj.addColumn(SEGMENT_FAMILY, LAST_COMPUTE_DAY, ((String)element.get("BAS_last_compute_day")).getBytes());
         }
         if(element.get("BAS_type") != null){
                obj.addColumn(SEGMENT_FAMILY, TYPE, ((String)element.get("BAS_type")).getBytes());
         }      
         if(element.get("BASM_REGID") != null){
                obj.addColumn(USER_FAMILY, REGID, ((String)element.get("BASM_REGID")).getBytes() );
         }
        return obj;
     }else{
         return null;
     }
    }

30 個の Bigtable ノードがあり、私のデータ フロー ジョブは 100 個のワーカーで動作しています。プロセス全体で約 100 億行のデータを処理する必要があります。上記の構成では、ジョブの完了に 1 日以上かかりますが、これは理想的ではありません。

ジョブを少し速く実行できるコード レベルの提案があれば、Bigtable ノードの数を増やすことがオプションの 1 つであることは知っていますが、現在、ノードを増やす必要がない他のオプションを探しています。

4

2 に答える 2