First, the org.json classes you are trying to use are not available to your project. To fix this:

  1. Go here and download as zip:
  2. Extract it into the org/json/* subdirectory of your sources folder

Next, the first line of your code declares the package "org.json", which is incorrect. You should create a separate package, for instance "my.books".

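Third, using a combiner here is useless: the job only collects book titles per author, so there is nothing to pre-aggregate on the map side.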

Here's the code I ended up with; it works and solves your problem:

    package my.books;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.json.*;

    public class CombineBooks {

        // Emits one (author, book) pair for every JSON record in the input.
        public static class Map extends Mapper<LongWritable, Text, Text, Text> {
            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String author;
                String book;
                String line = value.toString();
                String[] tuple = line.split("\\n");
                try {
                    for (int i = 0; i < tuple.length; i++) {
                        JSONObject obj = new JSONObject(tuple[i]);
                        author = obj.getString("author");
                        book = obj.getString("book");
                        context.write(new Text(author), new Text(book));
                    }
                } catch (JSONException e) {
                    e.printStackTrace();
                }
            }
        }

        // Collects all books of one author into a single JSON object.
        public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                try {
                    JSONObject obj = new JSONObject();
                    JSONArray ja = new JSONArray();
                    for (Text val : values) {
                        JSONObject jo = new JSONObject().put("book", val.toString());
                        ja.put(jo);
                    }
                    obj.put("books", ja);
                    obj.put("author", key.toString());
                    // NullWritable key: only the JSON text ends up in the output file
                    context.write(NullWritable.get(), new Text(obj.toString()));
                } catch (JSONException e) {
                    e.printStackTrace();
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            if (args.length != 2) {
                System.err.println("Usage: CombineBooks <in> <out>");
                System.exit(2);
            }
            Job job = Job.getInstance(conf, "CombineBooks");
            job.setJarByClass(CombineBooks.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            // Map output types differ from the final output types, so set both.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

In my project, the sources folder contains my/books/CombineBooks.java next to the extracted org/json/* sources.

Here's the input:

    [localhost:CombineBooks]$ hdfs dfs -cat /example.txt
    {"author":"author1", "book":"book1"}
    {"author":"author1", "book":"book2"}
    {"author":"author1", "book":"book3"}
    {"author":"author2", "book":"book4"}
    {"author":"author2", "book":"book5"}
    {"author":"author3", "book":"book6"}

The command to run:

    hadoop jar ./bookparse.jar my.books.CombineBooks /example.txt /test_output

Here's the output:

    [pivhdsne:CombineBooks]$ hdfs dfs -cat /test_output/part-r-00000
    {"books":[{"book":"book3"},{"book":"book2"},{"book":"book1"}],"author":"author1"}
    {"books":[{"book":"book5"},{"book":"book4"}],"author":"author2"}
    {"books":[{"book":"book6"}],"author":"author3"}
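
If you want to check the grouping logic without a cluster, here is a minimal plain-Java sketch of the same data flow (extract author and book, group by author, print one line per author). It is not the Hadoop job itself: the class name LocalCombineBooks and its helpers are made up for illustration, and instead of org.json it uses a regular expression that assumes flat records with no escaped quotes in the values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical local stand-in for the CombineBooks job; no Hadoop or org.json involved.
final class LocalCombineBooks {

    // Assumption: flat records with plain string values and no escaped quotes.
    private static final Pattern FIELD =
            Pattern.compile("\"(author|book)\"\\s*:\\s*\"([^\"]*)\"");

    // "Map" plus "shuffle": pull (author, book) out of each line and group books by author.
    static Map<String, List<String>> groupByAuthor(List<String> lines) {
        Map<String, List<String>> grouped = new TreeMap<>(); // keys sorted, as reducer keys are
        for (String line : lines) {
            String author = null;
            String book = null;
            Matcher m = FIELD.matcher(line);
            while (m.find()) {
                if (m.group(1).equals("author")) {
                    author = m.group(2);
                } else {
                    book = m.group(2);
                }
            }
            if (author == null || book == null) {
                continue; // skip records missing either field
            }
            grouped.computeIfAbsent(author, k -> new ArrayList<>()).add(book);
        }
        return grouped;
    }

    // "Reduce": render one author's books in the same shape as the job output.
    static String toJsonLine(String author, List<String> books) {
        StringBuilder sb = new StringBuilder("{\"books\":[");
        for (int i = 0; i < books.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"book\":\"").append(books.get(i)).append("\"}");
        }
        return sb.append("],\"author\":\"").append(author).append("\"}").toString();
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "{\"author\":\"author1\", \"book\":\"book1\"}",
                "{\"author\":\"author1\", \"book\":\"book2\"}",
                "{\"author\":\"author2\", \"book\":\"book4\"}");
        for (Map.Entry<String, List<String>> e : groupByAuthor(input).entrySet()) {
            System.out.println(toJsonLine(e.getKey(), e.getValue()));
        }
    }
}
```

Note that this sketch keeps the books in input order, while in the real job the order of values inside one reduce call is not guaranteed, which is why book3 comes first in the output above.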

You can use one of three options to make the org.json.* classes available on your cluster:

  1. Pack the org.json.* classes into your jar file (easy to do from a GUI IDE). This is the option I used in my answer.
  2. Put the jar file containing the org.json.* classes on each of the cluster nodes, in one of the CLASSPATH directories (see yarn.application.classpath).
  3. Put the jar file containing org.json.* into HDFS (hdfs dfs -put <org.json jar> <hdfs path>) and call job.addFileToClassPath so the jar is available to every task of your job on the cluster. With the code in my answer, add job.addFileToClassPath(new Path("<jar_file_on_hdfs_location>")); to the main method.
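
Whichever option you pick, one quick way to see whether the classes are actually visible is to try loading one of them by name. The helper below is only a sketch: ClasspathCheck is a made-up name, and it checks only the JVM it runs in, so on a cluster you would have to call isLoadable from inside a task (for instance from the mapper) to learn anything about the task classpath.

```java
// Hypothetical helper: reports whether a class can be loaded by the current JVM.
final class ClasspathCheck {

    static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class the job needs; pass another class name to check that instead.
        String name = args.length > 0 ? args[0] : "org.json.JSONObject";
        System.out.println(name + (isLoadable(name) ? " is" : " is NOT") + " loadable");
    }
}
```

If this reports the class as not loadable in the place where your job code runs, the jar did not reach that classpath and the job would fail as soon as it touches JSONObject.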