
Create Scalding Source like TextLine that combines multiple files into single mappers


You already have the right idea in your question, so here is one possible solution.

Create your own input format that extends CombineFileInputFormat and uses a custom RecordReader. The code below is Java, but it converts easily to Scala if you prefer.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
    import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
    import org.apache.hadoop.mapred.lib.CombineFileSplit;

    public class CombinedInputFormat<K, V> extends CombineFileInputFormat<K, V> {

        // Reads one file of the combined split by delegating to a plain LineRecordReader.
        public static class MyKeyValueLineRecordReader implements RecordReader<LongWritable, Text> {
            private final RecordReader<LongWritable, Text> delegate;

            // CombineFileRecordReader instantiates this class reflectively, once per file index.
            public MyKeyValueLineRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx) throws IOException {
                FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx), split.getLength(idx), split.getLocations());
                delegate = new LineRecordReader(conf, fileSplit);
            }

            @Override
            public boolean next(LongWritable key, Text value) throws IOException {
                return delegate.next(key, value);
            }

            @Override
            public LongWritable createKey() {
                return delegate.createKey();
            }

            @Override
            public Text createValue() {
                return delegate.createValue();
            }

            @Override
            public long getPos() throws IOException {
                return delegate.getPos();
            }

            @Override
            public void close() throws IOException {
                delegate.close();
            }

            @Override
            public float getProgress() throws IOException {
                return delegate.getProgress();
            }
        }

        // Wraps the per-file reader so that one mapper walks through every file in the split.
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
            return new CombineFileRecordReader(job, (CombineFileSplit) split, reporter, (Class) MyKeyValueLineRecordReader.class);
        }
    }
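The per-file reader takes an Integer index because CombineFileRecordReader builds one reader per file in the combined split and drains them one after another. The following is only a minimal stdlib sketch of that chaining idea, not Hadoop code: LineSource, SingleFileSource, ChainedSource and readAll are hypothetical names invented for illustration, and in-memory lists stand in for files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Stdlib-only sketch of the chaining idea; none of these types are Hadoop classes. */
final class ChainedReaderSketch {

    /** Hypothetical stand-in for a per-file reader: returns null once the file is exhausted. */
    interface LineSource {
        String next();
    }

    /** Reads the lines of the single "file" at position idx, like the per-file reader above. */
    static final class SingleFileSource implements LineSource {
        private final Iterator<String> lines;

        SingleFileSource(List<List<String>> files, int idx) {
            this.lines = files.get(idx).iterator();
        }

        @Override
        public String next() {
            return lines.hasNext() ? lines.next() : null;
        }
    }

    /** Hypothetical stand-in for the combining reader: drains one per-file reader after another. */
    static final class ChainedSource implements LineSource {
        private final List<List<String>> files;
        private int idx = 0;         // index of the next file to open
        private LineSource current;  // reader for the file being consumed, or null

        ChainedSource(List<List<String>> files) {
            this.files = files;
        }

        @Override
        public String next() {
            while (true) {
                if (current == null) {
                    if (idx >= files.size()) {
                        return null; // every file has been consumed
                    }
                    current = new SingleFileSource(files, idx++);
                }
                String line = current.next();
                if (line != null) {
                    return line;
                }
                current = null;      // this file is done; move on to the next one
            }
        }
    }

    /** Collects everything a single "mapper" would see for the combined split. */
    static List<String> readAll(List<List<String>> files) {
        LineSource source = new ChainedSource(files);
        List<String> out = new ArrayList<>();
        for (String line = source.next(); line != null; line = source.next()) {
            out.add(line);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> files = Arrays.asList(
                Arrays.asList("a1", "a2"),
                Collections.<String>emptyList(), // an empty file is skipped transparently
                Arrays.asList("b1"));
        System.out.println(readAll(files)); // prints [a1, a2, b1]
    }
}
```

The consumer only ever sees one continuous stream of lines, which is why the rest of the job does not need to know that several files were packed into one split.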

Next, extend Cascading's TextLine scheme so that it uses the input format you just defined (Scala from here on).

    import cascading.scheme.hadoop.TextLine
    import cascading.flow.FlowProcess
    import org.apache.hadoop.mapred.{OutputCollector, RecordReader, JobConf}
    import cascading.tap.Tap
    import com.twitter.scalding.{FixedPathSource, TextLineScheme}
    import cascading.scheme.Scheme

    class CombineFileTextLine extends TextLine {
      override def sourceConfInit(
          flowProcess: FlowProcess[JobConf],
          tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]],
          conf: JobConf): Unit = {
        super.sourceConfInit(flowProcess, tap, conf)
        conf.setInputFormat(classOf[CombinedInputFormat[String, String]])
      }
    }

Create a scheme for your combined input.

    trait CombineFileTextLineScheme extends TextLineScheme {
      override def hdfsScheme =
        new CombineFileTextLine()
          .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
    }

Finally, create your source class:

    case class CombineFileMultipleTextLine(p: String*)
      extends FixedPathSource(p: _*) with CombineFileTextLineScheme

If you want to use a single path instead of multiple ones, the change to your source class is trivial: declare the parameter as p: String and pass it to FixedPathSource(p).

I hope that helps.