Storing Apache Hadoop Data Output to Mysql Database
The great example is shown on this blog, I tried it and it goes really well. I quote the most important parts of the code.
At first, you must create a class representing data you would like to store. The class must implement DBWritable interface:
public class DBOutputWritable implements Writable, DBWritable{ private String name; private int count; public DBOutputWritable(String name, int count) { this.name = name; this.count = count; } public void readFields(DataInput in) throws IOException { } public void readFields(ResultSet rs) throws SQLException { name = rs.getString(1); count = rs.getInt(2); } public void write(DataOutput out) throws IOException { } public void write(PreparedStatement ps) throws SQLException { ps.setString(1, name); ps.setInt(2, count); }}
Create objects of previously defined class in your Reducer:
public class Reduce extends Reducer<Text, IntWritable, DBOutputWritable, NullWritable> { protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) { int sum = 0; for(IntWritable value : values) { sum += value.get(); } try { ctx.write(new DBOutputWritable(key.toString(), sum), NullWritable.get()); } catch(IOException e) { e.printStackTrace(); } catch(InterruptedException e) { e.printStackTrace(); } }}
Finally you must configure a connection to your DB (do not forget to add your db connector on the classpath) and register your mapper's and reducer's input/output data types.
public class Main{ public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", // driver class "jdbc:mysql://localhost:3306/testDb", // db url "user", // username "password"); //password Job job = new Job(conf); job.setJarByClass(Main.class); job.setMapperClass(Map.class); // your mapper - not shown in this example job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(Text.class); // your mapper - not shown in this example job.setMapOutputValueClass(IntWritable.class); // your mapper - not shown in this example job.setOutputKeyClass(DBOutputWritable.class); // reducer's KEYOUT job.setOutputValueClass(NullWritable.class); // reducer's VALUEOUT job.setInputFormatClass(...); job.setOutputFormatClass(DBOutputFormat.class); DBInputFormat.setInput(...); DBOutputFormat.setOutput( job, "output", // output table name new String[] { "name", "count" } //table columns ); System.exit(job.waitForCompletion(true) ? 0 : 1); }}