1

HBase と Phoenix は初めてです。Apache Phoenix を使用して MapReduce を使用して、複数の列ファミリーの HBase テーブルにデータを挿入しようとしています。

これは、Phoenix によって作成された私の HBase テーブルです。

CREATE TABLE defect (planning_folder_id varchar(12) NOT NULL, artifact_id VARCHAR(12) NOT NULL, data.category VARCHAR, data.root_cause VARCHAR, association.artifact_id VARCHAR(12) CONSTRAINT PK PRIMARY KEY (planning_folder_id, artifact_id));

上記の Phoenix create table 構文から、テーブルは単純に次のようになります。

--------------------------------------------------------------------------
| planning_folder_id | artifact_id | data:category | data:root_cause | association:artifact_id | 
------------------------------------------------------------------------------------------------
     plan1234        |   artf1234  |     cat_a      |      cause_a   |       artf2345
                                                                             artf5678
                                                                             artf8987
------------------------------------------------------------------------------------------------
     plan6765        |   artf5454  |     cat_b      |      cause_a   |       artf2222
                                                                             artf7643
                                                                             artf2345
------------------------------------------------------------------------------------------------

ご覧のとおり、各アーティファクトには、列ファミリー、関連付け、および修飾子の artifact_id によって識別される関連する多数のアーティファクトがあります。

私の質問に戻ると、データを読み取り、上記のテーブルに入力する mapreduce ジョブを書きたいと思います。

これが私が持っているものです

マッパー

public class PhoenixMapper<K> extends Mapper<LongWritable, Text, K, DefectWritable> {

    private Parser parser = new MyParser();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String document = value.toString();
        try {
            Artifact artf = parser.parse(document);
            DefectWritable defect = new DefectWritable();
            defect.setPlanningFolderId(artf.getPlanningFolderId());
            defect.setArtifactId(artf.getId());
            defect.setRootCause(artf.getRootCause());
            defect.setCategory(artf.getCategory());


            // How to insert this into the hbase table
            defect.setAssociations(artf.getAssociations());

            context.write(null, defect);
        } catch (ParserConfigurationException | SAXException e) {
            e.printStackTrace();
        }
    }
}

Defect Writable (テーブルに書き込む)

public class DefectWritable implements DBWritable {

    private String planningFolderId;
    private String artifactId;
    private String rootCause;
    private String category;
    private String[] associations;

    // getters/setters ignored

    @Override
    public void write(PreparedStatement pstmt) throws SQLException {
        pstmt.setString(1, planningFolderId);
        pstmt.setString(2, artifactId);
        pstmt.setString(3, category);
        pstmt.setString(4, rootCause);
        // what to do with "associations"?
    }

    @Override
    public void readFields(ResultSet rs) throws SQLException {
        planningFolderId = rs.getString("PLANNING_FOLDER_ID");
        artifactId = rs.getString("ARTIFACT_ID");
        category = rs.getString("CATEGORY");
        rootCause = rs.getString("ROOT_CAUSE");

    // what to do with association:artifact_id Array associationArray = rs.getArray("ASSOCIATION"); ?

    }
}

データインポーター

    public class PhoenixDataImporter extends Configured implements Tool {
    private static final String DOCUMENT_START_TAG = "<artifact>";
    private static final String DOCUMENT_END_TAG = "</artifact>";
    private static final String TABLE_DEFECT = "DEFECT"; 

    @Override
    public int run(String[] args) throws Exception {

        Configuration conf = getConf();
        conf.set("xmlinput.start", DOCUMENT_START_TAG);
        conf.set("xmlinput.end", DOCUMENT_END_TAG);

        Job job = Job.getInstance(conf, getClass().getSimpleName());
        job.setJarByClass(getClass());
        job.setInputFormatClass(XMLInputFormat.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);
        job.setMapOutputValueClass(DefectWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(PhoenixMapper.class);
        job.setNumReduceTasks(0);
        PhoenixMapReduceUtil.setOutput(job, TABLE_DEFECT, "PLANNING_FOLDER_ID,ARTIFACT_ID,CATEGORY,ROOT_CAUSE");
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main( String[] args ) throws Exception {
        int exitCode = ToolRunner.run(HBaseConfiguration.create(), new PhoenixDataImporter(), args);
        System.exit(exitCode);
    }
}

コードは現在、PLANNING_FOLDER_ID、ARTIFACT_ID、CATEGORY、ROOT_CAUSE 列にデータを挿入します。一部のアーティファクトには複数の関連アーティファクトが存在する可能性があるため、 ASSOCIATION:ARTIFACT_ID に挿入する方法がわかりません。私の現在のソリューションはhttps://phoenix.apache.org/phoenix_mr.htmlに基づいています。

誰でもこれで私を助けることができますか?私はhbaseを初めて使用するので、私の現在のテーブル設計についてコメントしてもらえますか? テーブル (association:artifact_id 用の別のテーブル) を分離し、クエリ時に結合することを考えました。しかし、私はパフォーマンスを負担します。

不明な点がある場合は、以下にコメントしてください:)

前もって感謝します

ペラナット

4

0 に答える 0