Create Scalding Source like TextLine that combines multiple files into single mappers
You get the idea in your question, so here is what possibly is a solution for you.
Create your own input format that extends the CombineFileInputFormat and uses your own custom RecordReader. I am showing you Java code, but you could easily convert it to scala if you want.
import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.FileSplit;import org.apache.hadoop.mapred.InputSplit;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.LineRecordReader;import org.apache.hadoop.mapred.RecordReader;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.lib.CombineFileInputFormat;import org.apache.hadoop.mapred.lib.CombineFileRecordReader;import org.apache.hadoop.mapred.lib.CombineFileSplit;public class CombinedInputFormat<K, V> extends CombineFileInputFormat<K, V> { public static class MyKeyValueLineRecordReader implements RecordReader<LongWritable,Text> { private final RecordReader<LongWritable,Text> delegate; public MyKeyValueLineRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx) throws IOException { FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx), split.getLength(idx), split.getLocations()); delegate = new LineRecordReader(conf, fileSplit); } @Override public boolean next(LongWritable key, Text value) throws IOException { return delegate.next(key, value); } @Override public LongWritable createKey() { return delegate.createKey(); } @Override public Text createValue() { return delegate.createValue(); } @Override public long getPos() throws IOException { return delegate.getPos(); } @Override public void close() throws IOException { delegate.close(); } @Override public float getProgress() throws IOException { return delegate.getProgress(); } } @Override public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { return new CombineFileRecordReader(job, (CombineFileSplit) split, reporter, (Class) MyKeyValueLineRecordReader.class); }}
Then you need to extend the TextLine class and make it use your own input format you just defined (Scala code from now on).
import cascading.scheme.hadoop.TextLineimport cascading.flow.FlowProcessimport org.apache.hadoop.mapred.{OutputCollector, RecordReader, JobConf}import cascading.tap.Tapimport com.twitter.scalding.{FixedPathSource, TextLineScheme}import cascading.scheme.Schemeclass CombineFileTextLine extends TextLine{ override def sourceConfInit(flowProcess: FlowProcess[JobConf], tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], conf: JobConf) { super.sourceConfInit(flowProcess, tap, conf) conf.setInputFormat(classOf[CombinedInputFormat[String, String]]) }}
Create a scheme for the for your combined input.
trait CombineFileTextLineScheme extends TextLineScheme{ override def hdfsScheme = new CombineFileTextLine().asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]}
Finally, create your source class:
case class CombineFileMultipleTextLine(p : String*) extends FixedPathSource(p :_*) with CombineFileTextLineScheme
If you want to use a single path instead of multiple ones, the change to your source class is trivial.
I hope that helps.