Processing JSON using java Mapreduce
First, the JSON objects you are trying to work with are not available for you. To solve this:
- Go here and download as zip: https://github.com/douglascrockford/JSON-java
- Extract to your sources folder in subdirectory org/json/*
Next, the first line of your code makes a package "org.json", which is incorrect, you shold create a separate package, for instance "my.books".
Third, using combiner here is useless.
Here's the code I ended up with, it works and solves your problem:
package my.books;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;import org.json.*;import javax.security.auth.callback.TextInputCallback;public class CombineBooks { public static class Map extends Mapper<LongWritable, Text, Text, Text>{ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String author; String book; String line = value.toString(); String[] tuple = line.split("\\n"); try{ for(int i=0;i<tuple.length; i++){ JSONObject obj = new JSONObject(tuple[i]); author = obj.getString("author"); book = obj.getString("book"); context.write(new Text(author), new Text(book)); } }catch(JSONException e){ e.printStackTrace(); } } } public static class Reduce extends Reducer<Text,Text,NullWritable,Text>{ public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ try{ JSONObject obj = new JSONObject(); JSONArray ja = new JSONArray(); for(Text val : values){ JSONObject jo = new JSONObject().put("book", val.toString()); ja.put(jo); } obj.put("books", ja); obj.put("author", key.toString()); context.write(NullWritable.get(), new Text(obj.toString())); }catch(JSONException e){ e.printStackTrace(); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); if (args.length != 2) { System.err.println("Usage: CombineBooks <in> <out>"); System.exit(2); } Job job = new Job(conf, "CombineBooks"); job.setJarByClass(CombineBooks.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }}
Here's the folder structure of my project:
srcsrc/mysrc/my/bookssrc/my/books/CombineBooks.javasrc/orgsrc/org/jsonsrc/org/json/zipsrc/org/json/zip/BitReader.java...src/org/json/zip/None.javasrc/org/json/JSONStringer.javasrc/org/json/JSONML.java...src/org/json/JSONException.java
Here's the input
[localhost:CombineBooks]$ hdfs dfs -cat /example.txt{"author":"author1", "book":"book1"}{"author":"author1", "book":"book2"}{"author":"author1", "book":"book3"}{"author":"author2", "book":"book4"}{"author":"author2", "book":"book5"}{"author":"author3", "book":"book6"}
The command to run:
hadoop jar ./bookparse.jar my.books.CombineBooks /example.txt /test_output
Here's the output:
[pivhdsne:CombineBooks]$ hdfs dfs -cat /test_output/part-r-00000{"books":[{"book":"book3"},{"book":"book2"},{"book":"book1"}],"author":"author1"}{"books":[{"book":"book5"},{"book":"book4"}],"author":"author2"}{"books":[{"book":"book6"}],"author":"author3"}
You can use on of the three options to put the org.json.*
classes into your cluster:
- Pack the
org.json.*
classes into your jar file (can easily be done using GUI IDE). This is the option I used in my answer - Put the jar file containing
org.json.*
classes on each of the cluster nodes into one of the CLASSPATH directories (see yarn.application.classpath) - Put the jar file containing
org.json.*
into HDFS (hdfs dfs -put <org.json jar> <hdfs path>
) and usejob.addFileToClassPath
call for this jar file to be available for all of the tasks executing your job on the cluster. In my answer you should addjob.addFileToClassPath(new Path("<jar_file_on_hdfs_location>"));
to themain