
Storing Apache Hadoop Data Output to Mysql Database


A great example is given on this blog; I tried it and it works really well. I quote the most important parts of the code below.

First, you must create a class representing the data you would like to store. The class must implement the DBWritable interface (and Writable, since it also serves as the reducer's output key):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBOutputWritable implements Writable, DBWritable {

    private String name;
    private int count;

    public DBOutputWritable(String name, int count) {
        this.name = name;
        this.count = count;
    }

    // Hadoop serialization - not needed when the record only goes to the database
    public void readFields(DataInput in) throws IOException {
    }

    // Populates the fields from a database row (used when reading from the DB)
    public void readFields(ResultSet rs) throws SQLException {
        name = rs.getString(1);
        count = rs.getInt(2);
    }

    public void write(DataOutput out) throws IOException {
    }

    // Binds the fields to the INSERT statement generated by DBOutputFormat
    public void write(PreparedStatement ps) throws SQLException {
        ps.setString(1, name);
        ps.setInt(2, count);
    }
}

Create objects of the previously defined class in your Reducer:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce extends Reducer<Text, IntWritable, DBOutputWritable, NullWritable> {

    protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        try {
            ctx.write(new DBOutputWritable(key.toString(), sum), NullWritable.get());
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
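
The Mapper that feeds this reducer is not shown in the example. Assuming a classic word count, its per-record logic just tokenizes each input line and emits a (word, 1) pair per token. Here is a plain-Java sketch of that logic; the Hadoop types are left out so the snippet runs standalone, but in the real job it would live inside a Mapper<LongWritable, Text, Text, IntWritable> whose map() calls ctx.write(new Text(word), new IntWritable(1)):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;

// Sketch of the word-count map step assumed by the Reduce class above.
public class MapLogic {

    // For one input line, emit a (word, 1) pair per whitespace-separated token.
    public static List<SimpleEntry<String, Integer>> map(String line) {
        List<SimpleEntry<String, Integer>> out = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                out.add(new SimpleEntry<>(token, 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(map("hello hadoop hello"));
        // prints [hello=1, hadoop=1, hello=1]
    }
}
```

The reducer then receives each word with its list of 1s and sums them before writing the (word, sum) pair to the database.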

Finally, you must configure the connection to your database (do not forget to add the database connector JAR to the classpath) and register your mapper's and reducer's input/output data types. Note that DBOutputFormat does not create the output table; it must already exist with matching columns.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class Main {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf,
            "com.mysql.jdbc.Driver",              // driver class
            "jdbc:mysql://localhost:3306/testDb", // db url
            "user",                               // username
            "password");                          // password

        Job job = new Job(conf);
        job.setJarByClass(Main.class);
        job.setMapperClass(Map.class);                 // your mapper - not shown in this example
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);          // mapper's KEYOUT
        job.setMapOutputValueClass(IntWritable.class); // mapper's VALUEOUT
        job.setOutputKeyClass(DBOutputWritable.class); // reducer's KEYOUT
        job.setOutputValueClass(NullWritable.class);   // reducer's VALUEOUT
        job.setInputFormatClass(...);                  // depends on your input source
        job.setOutputFormatClass(DBOutputFormat.class);
        DBInputFormat.setInput(...);                   // only needed when also reading from the DB

        DBOutputFormat.setOutput(
            job,
            "output",                         // output table name
            new String[] { "name", "count" }  // table columns
        );

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}