Hadoop Cascading - create flow with one source, two sinks Hadoop Cascading - create flow with one source, two sinks hadoop hadoop

Hadoop Cascading - create flow with one source, two sinks


You can use the split pattern as mentioned in the Cascading documentation. Here's an example:

public static void main(String[] args) {    // source and sink    Scheme sourceScheme = new TextLine(new Fields("line"));    Tap source = new FileTap(sourceScheme, args[0]);    Fields sinkFields = new Fields("word", "count");    Scheme sinkScheme = new TextLine(sinkFields, sinkFields);    Tap sink_one = new FileTap(sinkScheme, "out-one.txt");    Tap sink_two = new FileTap(sinkScheme, "out-two.txt");    // the pipe assembly    Pipe assembly = new Pipe("wordcount");    String regex = "\\w+";    Function function = new RegexGenerator(new Fields("word"), regex);    assembly = new Each(assembly, new Fields("line"), function);    Aggregator count = new Count(new Fields("count"));    // ...split into two pipes    Pipe countOne = new Pipe("count-one", assembly);    countOne = new GroupBy(countOne, new Fields("word"));    countOne = new Every(countOne, count);    Pipe countTwo = new Pipe("count-two", assembly);    countTwo = new GroupBy(countTwo, new Fields("word"));    countTwo = new Every(countTwo, count);    // create the flow    final List<Pipe> pipes = new ArrayList<Pipe>(2);    pipes.add(countOne);    pipes.add(countTwo);    final Map<String, Tap> sinks = new HashMap<String, Tap>();    sinks.put("count-one", sink_one);    sinks.put("count-two", sink_two);    FlowConnector flowConnector = new LocalFlowConnector();    Flow flow = flowConnector.connect(source, sinks, pipes);    flow.complete();}


The split pattern is in the Cascading User Guide at:http://docs.cascading.org/cascading/2.1/userguide/htmlsingle/#N21362

Another (simpler) example is included in "Cascading for the Impatient", parts 5 & 6:

One point about the code shown above is that it seem to be missing the variable definitions for report1Pipe and report2Pipe. To use a split pattern, each branch requires a name and the names need to be different.

The exception gets thrown because there are two branches which have both inherited the same name from earlier in the pipe assembly. So, for example, those flowDef.addSink(..) calls are ambiguous to the flow planner.

So in "Impatient" part 5, look at how the "D", "DF", and "TF" branches get named within operations.

It may seem a bit counter-intuitive for Cascading to require this naming, but it becomes quite important in large, complex workflows when you're attaching failure traps, debug, etc., to specific branches.

Alternatively, the Cascalog DSL in Clojure is much more declarative, so this gets handled by the language directly -- branches are subqueries, and the traps, etc., get handled within the closure of a subquery.