3

私はHiveとMapReduceが初めてで、あなたの答えに本当に感謝し、正しいアプローチも提供します.

logs日付とオリジンサーバーでパーティション分割されたハイブに外部テーブルを hdfs の外部の場所で定義しまし/data/logs/た。これらのログ ファイルを取得して分割し、上記のフォルダーに保存する MapReduce ジョブがあります。お気に入り

"/data/logs/dt=2012-10-01/server01/"
"/data/logs/dt=2012-10-01/server02/"
...
...

MapReduce ジョブから、Hive のテーブル ログにパーティションを追加したいと考えています。私は2つのアプローチを知っています

  1. テーブル変更コマンド -- テーブル変更コマンドが多すぎます
  2. 動的パーティションの追加

アプローチ2についてINSERT OVERWRITEは、私にとって選択肢ではない例のみが表示されます。ジョブの終了後にこれらの新しいパーティションをテーブルに追加する方法はありますか?

4

3 に答える 3

3

Map/Reduce ジョブ内からこれを行うには、Hadoop の下でスタンプされた新しいプロジェクトである Apache HCatalog を使用することをお勧めします。

HCatalog は実際には HDFS 上の抽象化レイヤーであるため、Hive、Pig、または M/R からの出力を標準化された方法で書き込むことができます。これが理解できるのは、出力フォーマットを使用して Map/Reduce ジョブから Hive に直接データをロードできることですHCatOutputFormat。以下は、公式ウェブサイトからの例です。

(a=1,b=1) の特定のパーティションを書き出すための現在のコード例は、次のようになります。

Map<String, String> partitionValues = new HashMap<String, String>();
partitionValues.put("a", "1");
partitionValues.put("b", "1");
HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues);
HCatOutputFormat.setOutput(job, info);

また、複数のパーティションに書き込むには、上記のそれぞれで個別のジョブを開始する必要があります。

HCatalog で動的パーティションを使用することもできます。この場合、同じジョブで必要な数のパーティションをロードできます。

上記の Web サイトの HCatalog をさらに読むことをお勧めします。必要に応じて詳細を確認できます。

于 2013-01-11T22:24:07.893 に答える
3

実際には、物事はそれよりも少し複雑です。残念なことに、公式の情報源には記載されておらず (現時点では)、理解するのに数日間のフラストレーションがかかります.

HCatalog Mapreduce ジョブで動的パーティションへの書き込みを行うには、次の手順を実行する必要があることがわかりました。

私の仕事 (通常はリデューサー) のレコード書き込みフェーズで、動的パーティション (HCatFieldSchema) を HCatSchema オブジェクトに手動で追加する必要があります。

問題は、HCatOutputFormat.getTableSchema(config) が実際には分割されたフィールドを返さないことです。手動で追加する必要があります

HCatFieldSchema hfs1 = new HCatFieldSchema("date", Type.STRING, null);
HCatFieldSchema hfs2 = new HCatFieldSchema("some_partition", Type.STRING, null);
schema.append(hfs1);
schema.append(hfs2);
于 2014-04-22T00:06:12.110 に答える
0

HCatalog を使用して 1 つのジョブで動的パーティション分割を使用して複数のテーブルに書き込むためのコードを次に示します。コードは Hadoop 2.5.0、Hive 0.13.1 でテストされています。

// ... Job setup, InputFormatClass, etc ...
String dbName = null;
String[] tables = {"table0", "table1"};

job.setOutputFormatClass(MultiOutputFormat.class);
MultiOutputFormat.JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);

List<String> partitions = new ArrayList<String>();
partitions.add(0, "partition0");
partitions.add(1, "partition1");

HCatFieldSchema partition0 = new HCatFieldSchema("partition0", TypeInfoFactory.stringTypeInfo, null);
HCatFieldSchema partition1 = new HCatFieldSchema("partition1", TypeInfoFactory.stringTypeInfo, null);

for (String table : tables) {
    configurer.addOutputFormat(table, HCatOutputFormat.class, BytesWritable.class, CatRecord.class);

    OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, table, null);
    outputJobInfo.setDynamicPartitioningKeys(partitions);

    HCatOutputFormat.setOutput(
        configurer.getJob(table), outputJobInfo
    );

    HCatSchema schema = HCatOutputFormat.getTableSchema(configurer.getJob(table).getConfiguration());
    schema.append(partition0);
    schema.append(partition1);

    HCatOutputFormat.setSchema(
        configurer.getJob(table),
        schema
    );
}
configurer.configure();

return job.waitForCompletion(true) ? 0 : 1;

マッパー:

public static class MyMapper extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        HCatRecord record = new DefaultHCatRecord(3); // Including partitions
        record.set(0, value.toString());

        // partitions must be set after non-partition fields
        record.set(1, "0"); // partition0=0
        record.set(2, "1"); // partition1=1

        MultiOutputFormat.write("table0", null, record, context);
        MultiOutputFormat.write("table1", null, record, context);
    }
}
于 2014-10-30T14:13:02.017 に答える