Hadoop 1 input file = 1 output file, map-only
If you turn off speculative execution, there is nothing stopping you from manually creating the output folder structure and files in your mapper, and writing the records to them directly (ignoring the output context / collector).
For example, extending the snippet (setup method), you could do something like the following (which is basically what MultipleOutputs does, but assumes that speculative execution is turned off to avoid file collisions where two attempts of the same map task try to write to the same output file):
    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class MultiOutputsMapper extends
            Mapper<LongWritable, Text, NullWritable, NullWritable> {
        protected String filenameKey;
        private RecordWriter<Text, Text> writer;
        private Text outputKey = new Text();
        private Text outputValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // operate on the input record
            // ...

            // write to the output file using the writer rather than the context
            writer.write(outputKey, outputValue);
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();

            // extract parent folder and filename
            filenameKey = path.getParent().getName() + "/" + path.getName();

            // base output folder
            final Path baseOutputPath = FileOutputFormat.getOutputPath(context);
            // output file name
            final Path outputFilePath = new Path(baseOutputPath, filenameKey);

            // we need to override getDefaultWorkFile to stop the file being
            // created in the _temporary/taskid folder
            TextOutputFormat<Text, Text> tof = new TextOutputFormat<Text, Text>() {
                @Override
                public Path getDefaultWorkFile(TaskAttemptContext context,
                        String extension) throws IOException {
                    return outputFilePath;
                }
            };

            // create a record writer that will write to the desired output subfolder
            writer = tof.getRecordWriter(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            writer.close(context);
        }
    }
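Since this approach relies on speculative execution being off, you would disable it in the job driver. A minimal sketch (the property name below is the Hadoop 1.x name; `MyDriver` and `jobName` are placeholder names for your own driver):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MyDriver {
    public static Job createJob(Configuration conf) throws Exception {
        // make sure only one attempt of each map task runs, so two attempts
        // never race to write the same output file
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);

        Job job = new Job(conf, "jobName");
        job.setMapperClass(MultiOutputsMapper.class);
        job.setNumReduceTasks(0); // map-only job
        return job;
    }
}
```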
Some points for consideration:
- Are the customerx/yyyy-MM-dd paths files or folders of files? If they are folders of files, you'll need to amend accordingly (this implementation assumes that there is one file per date and the file name is yyyy-MM-dd)
- You may wish to look into LazyOutputFormat to prevent empty output map files being created
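On that last point, a sketch of wiring LazyOutputFormat into the driver (assuming the new-API `TextOutputFormat`; since all records go through your own writer, the job's default output file would otherwise be created empty):

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class LazyOutputConfig {
    public static void configure(Job job) {
        // wraps TextOutputFormat so the underlying RecordWriter (and hence
        // the part-m-* file) is only created when a record is actually
        // written through the context - which never happens here
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
    }
}
```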