Is Mapreduce job using multiplethreading Is Mapreduce job using multiplethreading hadoop hadoop

Is Mapreduce job using multiplethreading


Is the single mapper using multiple threading in a single machine?

YES. Mapreduce job can use multithreaded mapper(Multiple threads or thread pool running map method) .

  • I have used for better CPU utilization for Map only Hbase jobs...

    MultiThreadedMapper is a good fit if your operation is highly CPU intensive, could increase the speed.

mapper class should extend org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper instead of regular org.apache.hadoop.mapreduce.Mapper .

The Multithreadedmapper has a different implementation of run() method. like below.

run(org.apache.hadoop.mapreduce.Mapper.Context context)

Run the application's maps using a thread pool.

You can set the number of threads within a mapper in MultiThreadedMapper by

MultithreadedMapper.setNumberOfThreads(n); or you can set the property in loading from a property file mapred.map.multithreadedrunner.threads = nand use above setter method(per job basis) to control jobs which are less cpu intensive.

The affect of doing this, you can see in mapreduce counters specially CPU related counters.

Example Code snippet of MultithreadedMapper implementation:

import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import java.io.IOException;import java.util.regex.Pattern;public class MultithreadedWordCount {    // class should be thread safe    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {        public static enum PREPOST { SETUP, CLEANUP }        @Override()        protected void setup(Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws java.io.IOException, java.lang.InterruptedException {            // will be called several times            context.getCounter(PREPOST.SETUP).increment(1);        }        @Override        protected void map(LongWritable key, Text value,                     Context context) throws IOException, InterruptedException {            String[] words = value.toString().toLowerCase().split("[\\p{Blank}[\\p{Punct}]]+");            for (String word : words) {                context.write(new Text(word), new LongWritable(1));            }        }        @Override()        protected void cleanup(Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException {            // will be called several times            context.getCounter(PREPOST.CLEANUP).increment(1);        }    }    public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {        @Override        protected void reduce(Text key, Iterable<LongWritable> values, Context context                        ) throws IOException, InterruptedException {            long sum = 0;            for (LongWritable value: values) {              sum += value.get();            }            context.write(key, new LongWritable(sum));        }    }    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        Job job = new Job();        job.setJarByClass(WordCount.class);        FileInputFormat.addInputPath(job, new Path(args[0]));        FileOutputFormat.setOutputPath(job, new Path(args[1]));        MultithreadedMapper.setMapperClass(job, MultithreadedWordCount.WordCountMapper.class);        MultithreadedMapper.setNumberOfThreads(job, 10);        job.setMapperClass(MultithreadedMapper.class);        job.setCombinerClass(MultithreadedWordCount.WordCountReducer.class);        job.setReducerClass(MultithreadedWordCount.WordCountReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(LongWritable.class);        /* begin defaults */        job.setInputFormatClass(TextInputFormat.class);        job.setOutputFormatClass(TextOutputFormat.class);        /* end defaults */        job.waitForCompletion(true);    }}


Please refer https://hadoop.apache.org/docs/r2.6.3/api/org/apache/hadoop/mapreduce/Mapper.html

Applications may override the run(Context) method to exert greater control on map processing e.g. multi-threaded Mappers etc.

Moreover there is also a MultithreadedMapper . I have never used this though.