MapReduce: How to get mapper to process multiple lines?
You could use NLineInputFormat.
With NLineInputFormat you can specify exactly how many lines each mapper receives. For example, if your file has 500 lines and you set the number of lines per mapper to 10, you get 50 mappers (instead of one, assuming the file is smaller than an HDFS block). Note that map() is still called once per line; each mapper simply receives a fixed number of lines.
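To make the arithmetic concrete, here is a small standalone Java sketch (no Hadoop dependency; the class and method names are hypothetical) that groups line indices the way NLineInputFormat assigns lines to splits: every N consecutive lines form one split, and the final split takes whatever is left over. It only mimics the behaviour and is not Hadoop's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: groups lines into splits of at most N lines,
// mirroring how NLineInputFormat behaves (not its actual source code).
public class NLineSplitSketch {

    // Returns [start, end) line-index ranges, one per split (i.e. per mapper).
    public static List<int[]> splitRanges(int totalLines, int linesPerSplit) {
        if (linesPerSplit <= 0) {
            throw new IllegalArgumentException("linesPerSplit must be positive");
        }
        List<int[]> splits = new ArrayList<>();
        for (int start = 0; start < totalLines; start += linesPerSplit) {
            int end = Math.min(start + linesPerSplit, totalLines); // exclusive
            splits.add(new int[] {start, end});
        }
        return splits;
    }

    public static void main(String[] args) {
        // 500 lines, 10 lines per mapper -> 50 splits, i.e. 50 map tasks
        System.out.println(splitRanges(500, 10).size());

        // 503 lines -> 51 splits; the last split holds only the 3 leftover lines
        List<int[]> s = splitRanges(503, 10);
        int[] last = s.get(s.size() - 1);
        System.out.println(s.size() + " splits, last has " + (last[1] - last[0]) + " lines");
    }
}
```

Running main prints 50, followed by "51 splits, last has 3 lines".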
EDIT:
Here is an example for using NLineInputFormat:
Mapper class:
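    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MapperNLine extends Mapper<LongWritable, Text, LongWritable, Text> {

        // Called once per line: key = byte offset of the line in the file,
        // value = the text of the line. Each mapper receives N such lines.
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Identity mapper: pass every record through unchanged
            context.write(key, value);
        }
    }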
Driver class:
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class Driver extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.println("Two parameters are required for Driver: <input dir> <output dir>");
                return -1;
            }

            // Job.getInstance replaces the deprecated new Job(Configuration) constructor
            Job job = Job.getInstance(getConf(), "NLineInputFormat example");
            job.setJarByClass(Driver.class);

            // Hand each mapper a fixed number of lines instead of a whole block
            job.setInputFormatClass(NLineInputFormat.class);
            NLineInputFormat.addInputPath(job, new Path(args[0]));
            // Equivalent to setting "mapreduce.input.lineinputformat.linespermap" to 5
            NLineInputFormat.setNumLinesPerSplit(job, 5);

            // LazyOutputFormat only creates a part file if a task actually writes output
            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.setMapperClass(MapperNLine.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
            // Map-only job: mapper output is written straight to the output directory
            job.setNumReduceTasks(0);

            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new Configuration(), new Driver(), args);
            System.exit(exitCode);
        }
    }
With the input you provided (10 lines, 5 lines per mapper), two mappers are started, so the output of the sample mapper above is written to two files. Each output line is the byte offset of the input line, a tab, and the line itself:

part-m-00000

    0       This is
    8       an arbitrary example file
    34      of 10 lines.
    47      Each line does
    62      not have to be

part-m-00001

    77      of
    80      the same
    89      length or contain
    107     the same
    116     number of words
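The keys in this output are the byte offsets at which each line starts in the input file. The standalone sketch below (hypothetical class name; it assumes the input is exactly the ten lines shown above, UTF-8 encoded with Unix \n line endings, and needs Java 9+ for List.of) recomputes those offsets and groups the lines five per mapper, reproducing the contents of the two part files.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: recomputes the keys a line-oriented record reader emits,
// i.e. the byte offset where each line starts, assuming '\n' line endings.
public class LineOffsetSketch {

    public static List<Long> lineOffsets(List<String> lines) {
        List<Long> offsets = new ArrayList<>();
        long pos = 0;
        for (String line : lines) {
            offsets.add(pos);
            // bytes of the line itself plus one byte for the '\n' terminator
            pos += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Assumed input: the ten lines visible in the job output
        List<String> lines = List.of(
                "This is", "an arbitrary example file", "of 10 lines.",
                "Each line does", "not have to be", "of", "the same",
                "length or contain", "the same", "number of words");
        List<Long> offsets = lineOffsets(lines);

        int linesPerSplit = 5; // same value as in the driver
        for (int i = 0; i < lines.size(); i++) {
            if (i % linesPerSplit == 0) {
                System.out.println("-- mapper " + (i / linesPerSplit));
            }
            System.out.println(offsets.get(i) + "\t" + lines.get(i));
        }
    }
}
```

Since every line ends with a one-byte newline, each offset is simply the previous offset plus the previous line's length plus one (for example 8 + 25 + 1 = 34).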