
Processing JSON using Java MapReduce


First, the org.json classes you are trying to use are not available on your build path. To solve this:

  1. Go here and download the repository as a zip: https://github.com/douglascrockford/JSON-java
  2. Extract it into your sources folder, under the subdirectory org/json/*

Next, the first line of your code declares the package "org.json", which is incorrect; you should create a separate package, for instance "my.books".

Third, using a combiner here is useless.
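To see why: a combiner must read and write the map output types, which are (Text, Text) in this job, while the Reduce class below writes (NullWritable, Text), so it cannot be reused as a combiner. A hypothetical combiner for this job (the class name Combine is my own, for illustration) could do nothing more than pass records through:

// Hypothetical combiner: its input and output types must both match the
// map output (Text, Text), so the most it could do in this job is pass
// every (author, book) pair straight through -- no aggregation is possible.
public static class Combine extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text val : values) {
            context.write(key, val);
        }
    }
}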

Here's the code I ended up with; it works and solves your problem:

package my.books;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.json.*;

public class CombineBooks {

    // Mapper: parses each input line as a JSON object and emits (author, book).
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String author;
            String book;
            String line = value.toString();
            // TextInputFormat delivers one line per call, so this loop
            // normally runs exactly once.
            String[] tuple = line.split("\\n");
            try {
                for (int i = 0; i < tuple.length; i++) {
                    JSONObject obj = new JSONObject(tuple[i]);
                    author = obj.getString("author");
                    book = obj.getString("book");
                    context.write(new Text(author), new Text(book));
                }
            } catch (JSONException e) {
                e.printStackTrace();
            }
        }
    }

    // Reducer: collects all books of one author into a JSON array and
    // emits a single JSON object per author.
    public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            try {
                JSONObject obj = new JSONObject();
                JSONArray ja = new JSONArray();
                for (Text val : values) {
                    JSONObject jo = new JSONObject().put("book", val.toString());
                    ja.put(jo);
                }
                obj.put("books", ja);
                obj.put("author", key.toString());
                context.write(NullWritable.get(), new Text(obj.toString()));
            } catch (JSONException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: CombineBooks <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "CombineBooks");
        job.setJarByClass(CombineBooks.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
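One side note on the code above: new Job(conf, ...) is deprecated on Hadoop 2.x and later. It still works, but if your cluster runs a newer version, the factory method is the idiomatic replacement:

// Replaces the deprecated Job constructor on Hadoop 2.x+:
Job job = Job.getInstance(conf, "CombineBooks");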

Here's the folder structure of my project:

src
src/my
src/my/books
src/my/books/CombineBooks.java
src/org
src/org/json
src/org/json/zip
src/org/json/zip/BitReader.java
...
src/org/json/zip/None.java
src/org/json/JSONStringer.java
src/org/json/JSONML.java
...
src/org/json/JSONException.java

Here's the input:

[localhost:CombineBooks]$ hdfs dfs -cat /example.txt
{"author":"author1", "book":"book1"}
{"author":"author1", "book":"book2"}
{"author":"author1", "book":"book3"}
{"author":"author2", "book":"book4"}
{"author":"author2", "book":"book5"}
{"author":"author3", "book":"book6"}

The command to run:

hadoop jar ./bookparse.jar my.books.CombineBooks /example.txt /test_output

Here's the output:

[pivhdsne:CombineBooks]$ hdfs dfs -cat /test_output/part-r-00000
{"books":[{"book":"book3"},{"book":"book2"},{"book":"book1"}],"author":"author1"}
{"books":[{"book":"book5"},{"book":"book4"}],"author":"author2"}
{"books":[{"book":"book6"}],"author":"author3"}

You can use one of three options to put the org.json.* classes onto your cluster:

  1. Pack the org.json.* classes into your jar file (this can easily be done using a GUI IDE). This is the option I used in my answer.
  2. Put the jar file containing the org.json.* classes on each of the cluster nodes, into one of the CLASSPATH directories (see yarn.application.classpath).
  3. Put the jar file containing org.json.* into HDFS (hdfs dfs -put <org.json jar> <hdfs path>) and use the job.addFileToClassPath() call to make this jar available to all of the tasks executing your job on the cluster. With the code from my answer, you would add job.addFileToClassPath(new Path("<jar_file_on_hdfs_location>")); to main, as shown in the sketch below.
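Here is a minimal sketch of option 3, to go into main right after the Job is created; the HDFS path is a placeholder for wherever you actually uploaded the jar:

// Option 3: ship the org.json jar from HDFS to every task's classpath.
// Replace the placeholder with the real HDFS path used in `hdfs dfs -put`.
Job job = new Job(conf, "CombineBooks");
job.setJarByClass(CombineBooks.class);
job.addFileToClassPath(new Path("<jar_file_on_hdfs_location>"));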