
Adding partitions to Hive from a MapReduce Job


To do this from within a MapReduce job, I would recommend using Apache HCatalog, a newer project under the Hadoop umbrella.

HCatalog is really an abstraction layer on top of HDFS, so you can write your outputs in a standardized way, be it from Hive, Pig or MapReduce. Where this comes into the picture for you is that you can load data directly into Hive from your MapReduce job using the output format HCatOutputFormat. Below is an example taken from the official website.

A current code example for writing out to a specific partition (a=1, b=1) would go something like this:

Map<String, String> partitionValues = new HashMap<String, String>();
partitionValues.put("a", "1");
partitionValues.put("b", "1");
HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues);
HCatOutputFormat.setOutput(job, info);

And to write to multiple partitions, a separate job has to be kicked off with each of the above, roughly as in the sketch below.
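For illustration, a minimal sketch of that pattern, assuming the same HCatTableInfo/HCatOutputFormat API as above (the list of partition specs and the rest of the job setup are placeholders, not part of the original answer):

List<Map<String, String>> partitionSpecs = new ArrayList<Map<String, String>>();
// ... fill partitionSpecs with one map per target partition, e.g. {a=1,b=1}, {a=1,b=2} ...
for (Map<String, String> partitionValues : partitionSpecs) {
    Job job = new Job(new Configuration(), "load-partition-" + partitionValues);
    // ... set mapper, input format, input path, etc. ...
    HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues);
    HCatOutputFormat.setOutput(job, info);
    job.setOutputFormatClass(HCatOutputFormat.class);
    job.waitForCompletion(true); // one job per static partition
}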

You can also use dynamic partitions with HCatalog, in which case you can load as many partitions as you want in the same job! A rough sketch of that setup follows.
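As a sketch only (this uses the newer OutputJobInfo-based API rather than the HCatTableInfo call above, and package names differ between HCatalog releases), the dynamic-partition setup boils down to passing null for the partition values:

// Passing null as the partition map asks HCatalog to derive the partition
// values from the records themselves (dynamic partitioning).
OutputJobInfo jobInfo = OutputJobInfo.create(dbName, tblName, null);
HCatOutputFormat.setOutput(job, jobInfo);
HCatSchema schema = HCatOutputFormat.getTableSchema(job.getConfiguration());
// Note: as the answers below explain, the partition columns may have to be
// appended to this schema by hand before calling setSchema().
HCatOutputFormat.setSchema(job, schema);
job.setOutputFormatClass(HCatOutputFormat.class);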

I recommend reading further about HCatalog on the official website, which should give you more details if needed.


In reality, things are a little more complicated than that, which is unfortunate because this is (as of now) undocumented in official sources and takes a few days of frustration to figure out.

I've found that I need to do the following to get HCatalog MapReduce jobs to write to dynamic partitions:

In the record-writing phase of my job (usually the reducer), I have to manually add my dynamic partition columns (HCatFieldSchema) to my HCatSchema objects.

The trouble is that HCatOutputFormat.getTableSchema(config) does not actually return the partitioned fields; they need to be added manually:

HCatFieldSchema hfs1 = new HCatFieldSchema("date", Type.STRING, null);
HCatFieldSchema hfs2 = new HCatFieldSchema("some_partition", Type.STRING, null);
schema.append(hfs1);
schema.append(hfs2);
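The record-writing side then has to supply values for those appended columns at the end of each record. Here is a hedged sketch of a reducer doing this, assuming a table with a single regular column; the field layout and values are illustrative, matching the "date" and "some_partition" columns appended above:

public static class MyReducer extends Reducer<Text, Text, WritableComparable<?>, HCatRecord> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            HCatRecord record = new DefaultHCatRecord(3); // regular column + 2 partition columns
            record.set(0, value.toString());  // regular table column
            record.set(1, "2014-01-01");      // dynamic partition "date" (illustrative value)
            record.set(2, "some_value");      // dynamic partition "some_partition" (illustrative value)
            context.write(null, record);
        }
    }
}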


Here's the code for writing into multiple tables with dynamic partitioning in one job using HCatalog; the code has been tested on Hadoop 2.5.0 and Hive 0.13.1:

// ... Job setup, InputFormatClass, etc ...
String dbName = null;
String[] tables = {"table0", "table1"};

job.setOutputFormatClass(MultiOutputFormat.class);
MultiOutputFormat.JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);

List<String> partitions = new ArrayList<String>();
partitions.add(0, "partition0");
partitions.add(1, "partition1");

HCatFieldSchema partition0 = new HCatFieldSchema("partition0", TypeInfoFactory.stringTypeInfo, null);
HCatFieldSchema partition1 = new HCatFieldSchema("partition1", TypeInfoFactory.stringTypeInfo, null);

for (String table : tables) {
    configurer.addOutputFormat(table, HCatOutputFormat.class, BytesWritable.class, HCatRecord.class);
    OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, table, null);
    outputJobInfo.setDynamicPartitioningKeys(partitions);
    HCatOutputFormat.setOutput(configurer.getJob(table), outputJobInfo);
    // getTableSchema() does not include the partition columns, so append them here
    HCatSchema schema = HCatOutputFormat.getTableSchema(configurer.getJob(table).getConfiguration());
    schema.append(partition0);
    schema.append(partition1);
    HCatOutputFormat.setSchema(configurer.getJob(table), schema);
}
configurer.configure();

return job.waitForCompletion(true) ? 0 : 1;

Mapper:

public static class MyMapper extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        HCatRecord record = new DefaultHCatRecord(3); // including the partition columns
        record.set(0, value.toString());
        // partition values must be set after the non-partition fields
        record.set(1, "0"); // partition0=0
        record.set(2, "1"); // partition1=1
        MultiOutputFormat.write("table0", null, record, context);
        MultiOutputFormat.write("table1", null, record, context);
    }
}