
MapReduce: How to get mapper to process multiple lines?


You could use NLineInputFormat.

With NLineInputFormat you can specify exactly how many lines go to each mapper. For example, if your file has 500 lines and you set the number of lines per mapper to 10, you get 50 mappers (instead of one, assuming the file is smaller than an HDFS block).
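To make the arithmetic concrete, here is a plain-Java sketch with no Hadoop dependency that mimics how lines are grouped into splits of at most N lines, each line keyed by its byte offset in the file. It is an illustration only, not NLineInputFormat's actual implementation. The class and method names are hypothetical, it assumes UTF-8 text with `\n` line endings, and it needs Java 16+ for the record.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Hadoop-free sketch of NLineInputFormat-style splitting:
 * every split holds at most linesPerSplit consecutive lines, and each line
 * is paired with its byte offset from the start of the file.
 */
public class NLineSplitSketch {

    /** One record as a mapper would see it: (byte offset, line text). */
    public record Line(long offset, String text) {}

    /** Groups lines into splits of at most linesPerSplit lines each. */
    public static List<List<Line>> split(List<String> lines, int linesPerSplit) {
        List<List<Line>> splits = new ArrayList<>();
        List<Line> current = new ArrayList<>();
        long offset = 0;
        for (String text : lines) {
            current.add(new Line(offset, text));
            // Skip past the line's bytes plus its '\n' terminator (assumed Unix line endings).
            offset += text.getBytes(StandardCharsets.UTF_8).length + 1;
            if (current.size() == linesPerSplit) {
                splits.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            splits.add(current); // last split may hold fewer than linesPerSplit lines
        }
        return splits;
    }

    public static void main(String[] args) {
        List<String> input = List.of(
                "This is", "an arbitrary example file", "of 10 lines.",
                "Each line does", "not have to be", "of", "the same",
                "length or contain", "the same", "number of words");
        List<List<Line>> splits = split(input, 5);
        for (int i = 0; i < splits.size(); i++) {
            System.out.println("split " + i + ":");
            for (Line line : splits.get(i)) {
                System.out.println(line.offset() + "\t" + line.text());
            }
        }
    }
}
```

Running `main` groups the ten example lines into two splits with offsets 0, 8, 34, 47, 62 and 77, 80, 89, 107, 116, the same keys that appear in the job output further down; `split(Collections.nCopies(500, "x"), 10)` yields the 50 splits from the 500-line example.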

EDIT:

Here is an example for using NLineInputFormat:

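Mapper class:

```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Identity mapper: with NLineInputFormat the key is the byte offset of the
// line in the input file and the value is the line itself; both are written unchanged.
public class MapperNLine extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // map() is still called once per line; NLineInputFormat only controls
        // how many lines each mapper (input split) receives.
        context.write(key, value);
    }
}
```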

Driver class:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Driver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: Driver <input dir> <output dir>%n");
            return -1;
        }

        // Job.getInstance replaces the deprecated new Job(Configuration) constructor.
        Job job = Job.getInstance(getConf(), "NLineInputFormat example");
        job.setJarByClass(Driver.class);

        // Give each mapper 5 lines
        // (equivalent to setting mapreduce.input.lineinputformat.linespermap=5).
        job.setInputFormatClass(NLineInputFormat.class);
        NLineInputFormat.addInputPath(job, new Path(args[0]));
        NLineInputFormat.setNumLinesPerSplit(job, 5);

        // LazyOutputFormat only creates a part file once a record is written to it.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Map-only job: mapper output goes straight to the part-m-* files.
        job.setMapperClass(MapperNLine.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new Driver(), args);
        System.exit(exitCode);
    }
}
```

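With the input you provided (10 lines) and 5 lines per mapper, two mappers are started, so the output of the sample mapper above is written to two files. Each output line holds the key (the byte offset of the line in the input file), a tab, and the value (the line itself):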

part-m-00000

```
0	This is
8	an arbitrary example file
34	of 10 lines.
47	Each line does
62	not have to be
```

part-m-00001

```
77	of
80	the same
89	length or contain
107	the same
116	number of words
```
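Note that NLineInputFormat changes how the input is split, not how often `map()` is called: each mapper still receives its N lines one record at a time. If the lines must be handled together, one common pattern is to buffer them in `map()` and process the batch in `cleanup(Context)`, which Hadoop's `Mapper.run` calls once after the last record of the split. The plain-Java sketch below only mimics that lifecycle so it runs without Hadoop; `BatchingMapperSketch` and `runSplit` are hypothetical names, not Hadoop API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical mock of the Mapper lifecycle (map once per record, then
 * cleanup once per split) showing how the lines of one split can be
 * buffered and emitted as a single batch. Not Hadoop API.
 */
public class BatchingMapperSketch {

    private final List<String> buffer = new ArrayList<>();
    private final Consumer<List<String>> emit;

    BatchingMapperSketch(Consumer<List<String>> emit) {
        this.emit = emit;
    }

    /** Counterpart of Mapper.map: called once per line; here it only buffers. */
    void map(String line) {
        buffer.add(line);
    }

    /** Counterpart of Mapper.cleanup: called once, after the split's last line. */
    void cleanup() {
        if (!buffer.isEmpty()) {
            emit.accept(List.copyOf(buffer)); // hand on all buffered lines at once
            buffer.clear();
        }
    }

    /** Mimics Mapper.run for one split: map every record, then cleanup. */
    public static List<List<String>> runSplit(List<String> splitLines) {
        List<List<String>> batches = new ArrayList<>();
        BatchingMapperSketch mapper = new BatchingMapperSketch(batches::add);
        for (String line : splitLines) {
            mapper.map(line);
        }
        mapper.cleanup();
        return batches;
    }

    public static void main(String[] args) {
        List<String> firstSplit = List.of(
                "This is", "an arbitrary example file", "of 10 lines.",
                "Each line does", "not have to be");
        // Prints a single batch containing all five lines of the split.
        System.out.println(runSplit(firstSplit));
    }
}
```

In a real `Mapper<LongWritable, Text, ...>` subclass, the same idea means storing `value.toString()` in the buffer (Hadoop reuses the `Text` instance between `map()` calls, so keep a copy rather than the object) and calling `context.write` from an overridden `cleanup(Context)`. Buffering keeps all N lines in memory, which is only sensible for small N.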