
Hadoop 1 input file = 1 output file, map-only


If you turn off speculative execution, there is nothing stopping you from manually creating the output folder structure and files in your mapper, and writing the records to them directly (ignoring the output context / collector).
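
To disable speculative execution for map tasks, you can set the relevant property on the job configuration before submission. A minimal sketch - the key shown is the Hadoop 1 name; Hadoop 2 renamed it to mapreduce.map.speculative:

Configuration conf = new Configuration();
// stop two attempts of the same map task running (and writing) concurrently
conf.setBoolean("mapred.map.tasks.speculative.execution", false);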

For example, extending the snippet above (the setup method), you could do something like the following. This is basically what MultipleOutputs does, but it assumes speculative execution is turned off, to avoid file collisions where two attempts of the same map task try to write to the same output file:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MultiOutputsMapper extends
        Mapper<LongWritable, Text, NullWritable, NullWritable> {
    protected String filenameKey;
    private RecordWriter<Text, Text> writer;
    // instantiate up front so map() doesn't write nulls
    private Text outputValue = new Text();
    private Text outputKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // operate on the input record
        // ...

        // write to output file using writer rather than context
        writer.write(outputKey, outputValue);
    }

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        InputSplit split = context.getInputSplit();
        Path path = ((FileSplit) split).getPath();

        // extract parent folder and filename
        filenameKey = path.getParent().getName() + "/" + path.getName();

        // base output folder
        final Path baseOutputPath = FileOutputFormat.getOutputPath(context);
        // output file name
        final Path outputFilePath = new Path(baseOutputPath, filenameKey);

        // We need to override the getDefaultWorkFile path to stop the file
        // being created in the _temporary/taskid folder
        TextOutputFormat<Text, Text> tof = new TextOutputFormat<Text, Text>() {
            @Override
            public Path getDefaultWorkFile(TaskAttemptContext context,
                    String extension) throws IOException {
                return outputFilePath;
            }
        };

        // create a record writer that will write to the desired output subfolder
        writer = tof.getRecordWriter(context);
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        writer.close(context);
    }
}
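
For reference, here's a minimal driver sketch showing how the mapper might be wired into a map-only job - the class name MultiOutputsDriver and the argument handling are illustrative, not part of the original snippet:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultiOutputsDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // speculative execution must be off (see above)
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);

        Job job = new Job(conf, "one output file per input file");
        job.setJarByClass(MultiOutputsDriver.class);
        job.setMapperClass(MultiOutputsMapper.class);

        // map-only: no reducers, so the mapper's own writers produce the final output
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}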

Some points for consideration:

  • Are the customerx/yyyy-MM-dd paths files, or folders of files? If they are folders of files then you'll need to amend this accordingly - the implementation above assumes one file per date, named yyyy-MM-dd
  • You may wish to look into LazyOutputFormat to prevent empty map output files being created - see the sketch below
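
On the last point, LazyOutputFormat wraps the real output format and defers creating the output file until the first record is actually written, so the default (unused) part-m-xxxxx files never appear. The driver-side wiring looks roughly like this:

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// instead of job.setOutputFormatClass(TextOutputFormat.class)
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);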