
Scalding TypedPipe API External Operations pattern


Scala extension methods are implemented using implicit classes. You give the compiler the ability to convert a TypedPipe into a wrapper class that contains your external operations:

import com.twitter.scalding.TypedPipe
import com.twitter.scalding._
import cascading.flow.FlowDef

class MyJob(args: Args) extends Job(args) {

  implicit class MyOperationsWrapper(val self: TypedPipe[Double]) extends MyOperations with Serializable

  val pipe = TypedPipe.from(TypedTsv[Double](args("input")))

  val result = pipe
    .operation1
    .operation2(x => x*2)
    .write(TypedTsv[Double](args("output")))
}

trait MyOperations {

  def self: TypedPipe[Double]

  def operation1(implicit fd: FlowDef): TypedPipe[Double] =
    self.map { x =>
      println(s"Input: $x")
      x / 100
    }

  def operation2(datafn: Double => Double)(implicit fd: FlowDef): TypedPipe[Double] =
    self.map { x =>
      val result = datafn(x)
      println(s"Result: $result")
      result
    }
}

import org.apache.hadoop.util.ToolRunner
import org.apache.hadoop.conf.Configuration

object MyRunner extends App {
  ToolRunner.run(new Configuration(), new Tool, (classOf[MyJob].getName :: "--local" ::
    "--input" :: "doubles.tsv" ::
    "--output" :: "result.tsv" :: args.toList).toArray)
}

Regarding how to manage types across the pipes, my recommendation would be to work out some basic types that make sense for your domain and model them as case classes. To use your example, I would rename the method convertTextToEither to extractEvents:

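case class LogInput(l: Long, text: Text)
case class Event(data: String)

// Defined inside the operations trait for TypedPipe[LogInput], where self is the wrapped pipe.
// isEvent (LogInput => Boolean) and getEvent (Text => Event) are assumed to be defined elsewhere.
def extractEvents: TypedPipe[Event] =
  self.filter(line => isEvent(line))
      .map(line => getEvent(line.text))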

Then you would have one operations trait per type:

  • LogInputOperations for LogInput types
  • EventOperations for Event types
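A minimal sketch of how that split could look, following the same wrapper pattern as MyOperations. The names LogHelpers, PipeSyntax, countByData and the bodies of isEvent and getEvent are hypothetical stand-ins for illustration only, and Text is assumed to be Hadoop's org.apache.hadoop.io.Text:

```scala
import com.twitter.scalding.TypedPipe
import org.apache.hadoop.io.Text

case class LogInput(l: Long, text: Text)
case class Event(data: String)

// Hypothetical helpers: replace with your own parsing logic.
object LogHelpers {
  def isEvent(line: LogInput): Boolean =
    line.text.toString.startsWith("EVENT")

  def getEvent(text: Text): Event =
    Event(text.toString.stripPrefix("EVENT").trim)
}

// Operations that only make sense on a pipe of raw log lines.
trait LogInputOperations {
  def self: TypedPipe[LogInput]

  def extractEvents: TypedPipe[Event] =
    self.filter(LogHelpers.isEvent)
        .map(line => LogHelpers.getEvent(line.text))
}

// Operations that only make sense on a pipe of events.
trait EventOperations {
  def self: TypedPipe[Event]

  // Illustrative operation: number of events per distinct payload.
  def countByData: TypedPipe[(String, Long)] =
    self.groupBy(_.data).size.toTypedPipe
}

// One implicit wrapper per element type; import PipeSyntax._ where needed.
object PipeSyntax {
  implicit class LogInputWrapper(val self: TypedPipe[LogInput])
    extends LogInputOperations with Serializable

  implicit class EventWrapper(val self: TypedPipe[Event])
    extends EventOperations with Serializable
}
```

With PipeSyntax._ imported, a TypedPipe[LogInput] called logs could be chained as logs.extractEvents.countByData. The compiler picks the wrapper from the element type, so calling countByData directly on a pipe of LogInput would not compile.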


I am not sure what problem you see with the snippet you showed, or why you think it is "less clean". It looks fine to me.

As for your question about unit testing jobs that use the typed API, take a look at JobTest. It seems to be just what you are looking for.
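As a rough sketch, a JobTest for the MyJob class above might look like the following. It assumes a recent Scalding release and a ScalaTest 3.0-style WordSpec with Matchers (newer ScalaTest versions use different package names). The file names "inputFile" and "outputFile" and the sample values are made up for the test; no real files are read or written:

```scala
import com.twitter.scalding._
import org.scalatest.{Matchers, WordSpec}

class MyJobTest extends WordSpec with Matchers {
  "MyJob" should {
    JobTest(new MyJob(_))
      .arg("input", "inputFile")
      .arg("output", "outputFile")
      // In-memory replacement for the TypedTsv source the job reads.
      .source(TypedTsv[Double]("inputFile"), List(100.0, 250.0))
      // Captures whatever the job writes to its TypedTsv sink.
      .sink[Double](TypedTsv[Double]("outputFile")) { out =>
        "divide each value by 100 and then double it" in {
          // operation1: x / 100, operation2: x * 2
          out.toSet shouldBe Set(2.0, 5.0)
        }
      }
      .run
      .finish()
  }
}
```

The source and sink passed to JobTest have to match the ones the job constructs from its arguments, which is why the same TypedTsv[Double] type and path names appear in both places.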