
Build custom join logic in Cascading ensuring MAP_SIDE only


The best way to solve this problem (that I can think of) is to modify your smaller dataset. You can add a new field (F1DecidingFactor) to the smaller dataset. The value of F1Result should look like this:

Pseudocode

if F1DecidingFactor == "Yes" then
    F1Result = ACTUAL_VALUE
else
    F1Result = "N/A"

Result Table

|F1#Join|F1#Result|F1#DecidingFactor|
|    Yes|        0|             True|
|    Yes|        1|            False|
|     No|        0|              N/A|
|     No|        1|              N/A|

You can do the above in Cascading as well.
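For illustration only, here is a plain-Java sketch of that preprocessing step (it does not use Cascading at all; Java 16+ for records). It assumes a hypothetical small dataset mapping the join keys "0" and "1" to the values "True" and "False", emits one row per combination of deciding factor and join key, and applies the pseudocode above to choose the result. With that input it reproduces the four rows of the table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the preprocessing step; this is NOT the Cascading API.
final class ExpandSmallDataset {

    static final String DEFAULT_VALUE = "N/A";

    // One row of the expanded small dataset.
    record Row(String decidingFactor, String joinKey, String result) {}

    // The pseudocode: keep the actual value only when the deciding factor is "Yes".
    static String f1Result(String decidingFactor, String actualValue) {
        return "Yes".equals(decidingFactor) ? actualValue : DEFAULT_VALUE;
    }

    // Emits one row per (deciding factor, join key) pair, in the map's iteration order.
    static List<Row> expand(Map<String, String> small) {
        List<Row> rows = new ArrayList<>();
        for (String factor : List.of("Yes", "No")) {
            for (Map.Entry<String, String> e : small.entrySet()) {
                rows.add(new Row(factor, e.getKey(), f1Result(factor, e.getValue())));
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Hypothetical small dataset: join key -> actual value (insertion order preserved).
        Map<String, String> small = new LinkedHashMap<>();
        small.put("0", "True");
        small.put("1", "False");
        expand(small).forEach(r ->
                System.out.println(r.decidingFactor() + " | " + r.joinKey() + " | " + r.result()));
    }
}
```

Running `main` prints `Yes | 0 | True`, `Yes | 1 | False`, `No | 0 | N/A` and `No | 1 | N/A`, one per line. In a real flow the same rule would run once over the small dataset, so the large dataset never needs a reduce step for it.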

After this, you can do your map-side join.
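A map-side join works because the small side fits in memory: it is loaded into a hash table and the large side is streamed past it, so no reduce phase is needed. The plain-Java simulation below (not Cascading code; Java 16+) shows that idea with the expanded table keyed on a composite (deciding factor, join key). The record names and sample rows are hypothetical, and, like HashJoin with its default InnerJoin, it drops large-side rows that find no match.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a map-side (hash) join; NOT the Cascading API.
final class MapSideJoinSketch {

    record Key(String decidingFactor, String joinKey) {}   // composite join key
    record LargeRow(String decidingFactor, String input) {}
    record Joined(String decidingFactor, String input, String result) {}

    // The small side is already in memory; each large row costs one hash lookup.
    static List<Joined> join(List<LargeRow> large, Map<Key, String> small) {
        List<Joined> out = new ArrayList<>();
        for (LargeRow row : large) {
            String result = small.get(new Key(row.decidingFactor(), row.input()));
            if (result != null) {   // inner-join semantics: unmatched rows are dropped
                out.add(new Joined(row.decidingFactor(), row.input(), result));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Expanded small dataset (the rows of the table above), held in memory.
        Map<Key, String> small = new HashMap<>();
        small.put(new Key("Yes", "0"), "True");
        small.put(new Key("Yes", "1"), "False");
        small.put(new Key("No", "0"), "N/A");
        small.put(new Key("No", "1"), "N/A");

        // Hypothetical large-side rows: (deciding factor, input).
        List<LargeRow> large = List.of(new LargeRow("Yes", "1"), new LargeRow("No", "0"));
        join(large, small).forEach(System.out::println);
    }
}
```

Because the deciding factor is part of the key, no post-processing step is needed: a "No" row simply finds the precomputed "N/A" entry.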

If modifying the smaller dataset is not possible, there are two other options.

Option 1

Add a new field to each small pipe that is equivalent to your deciding factor (i.e. F1DecidingFactor_RHS = Yes), then include it in your join criteria. Once the join is done, only the rows where this condition matches will have values; for all other rows the joined fields will be null/blank. Sample code:
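To see why the constant field works, here is a plain-Java simulation of the Option 1 join (again not Cascading code; names and data are hypothetical; Java 16+). Every small-side row is keyed as ("Yes", join key), mirroring the Insert of F1DecidingFactor_RHS = "Yes", and each large row is probed with (F1DecidingFactor, F1Input). Because it is a left join, every large row is kept, and rows whose deciding factor is not "Yes" come back with a null result.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of Option 1 (constant key field + left join); NOT the Cascading API.
final class ConstantKeyLeftJoinSketch {

    record Key(String decidingFactor, String joinKey) {}
    record LargeRow(String decidingFactor, String input) {}
    record SmallRow(String join, String result) {}
    record Joined(String decidingFactor, String input, String result) {}

    static List<Joined> leftJoin(List<LargeRow> large, List<SmallRow> small) {
        // Build the in-memory side; the constant "Yes" plays the role of F1DecidingFactor_RHS.
        Map<Key, String> lookup = new HashMap<>();
        for (SmallRow s : small) {
            lookup.put(new Key("Yes", s.join()), s.result());
        }
        List<Joined> out = new ArrayList<>();
        for (LargeRow row : large) {
            // Left join: every large row is kept; the result is null when nothing matches.
            String result = lookup.get(new Key(row.decidingFactor(), row.input()));
            out.add(new Joined(row.decidingFactor(), row.input(), result));
        }
        return out;
    }

    public static void main(String[] args) {
        List<SmallRow> small = List.of(new SmallRow("0", "True"), new SmallRow("1", "False"));
        List<LargeRow> large = List.of(new LargeRow("Yes", "0"), new LargeRow("No", "1"));
        leftJoin(large, small).forEach(System.out::println);
    }
}
```

The null for the "No" row is the "null/blank" value mentioned above; Option 2 shows one way to replace it with a default.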

Main Class

import cascading.operation.Insert;
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Discard;
import cascading.pipe.assembly.Rename;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTestOption1 {

    public StackHashJoinTestOption1() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");
        Fields f1DecidingFactor = new Fields("F1DecidingFactor");
        Fields f2DecidingFactor = new Fields("F2DecidingFactor");
        Fields f1DecidingFactorRhs = new Fields("F1DecidingFactor_RHS");
        Fields f2DecidingFactorRhs = new Fields("F2DecidingFactor_RHS");

        // Composite join keys: (deciding factor, input) on the left, (constant "Yes", join key) on the right
        Fields lhsJoinerOne = f1DecidingFactor.append(f1Input);
        Fields lhsJoinerTwo = f2DecidingFactor.append(f2Input);
        Fields rhsJoinerOne = f1DecidingFactorRhs.append(f1Join);
        Fields rhsJoinerTwo = f2DecidingFactorRhs.append(f2Join);

        Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output");

        // Large pipe fields:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small pipe 1 fields:
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");
        // Add a constant field to small pipe 1. Expected fields:
        // F1Join F1Result F1DecidingFactor_RHS
        rhsOne = new Each(rhsOne, new Insert(f1DecidingFactorRhs, "Yes"), Fields.ALL);

        // Small pipe 2 fields:
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");
        // Add a constant field to small pipe 2 (F2..., not F1..., or the second join has duplicate field names). Expected fields:
        // F2Join F2Result F2DecidingFactor_RHS
        rhsTwo = new Each(rhsTwo, new Insert(f2DecidingFactorRhs, "Yes"), Fields.ALL);

        // Join the first small pipe. Expected fields after join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS
        Pipe resultsOne = new HashJoin(largePipe, lhsJoinerOne, rhsOne, rhsJoinerOne, new LeftJoin());

        // Join the second small pipe. Expected fields after join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS F2Join F2Result F2DecidingFactor_RHS
        Pipe resultsTwo = new HashJoin(resultsOne, lhsJoinerTwo, rhsTwo, rhsJoinerTwo, new LeftJoin());

        // Rename the joined result fields to the names TestFunction reads (F1Output, F2Output)
        Pipe result = new Rename(resultsTwo, new Fields("F1Result", "F2Result"), new Fields("F1Output", "F2Output"));
        // Overwrite the output with the default value wherever the deciding factor is "No"
        result = new Each(result, functionFields, new TestFunction(), Fields.REPLACE);
        // Drop the helper fields that were only needed as join keys
        result = new Discard(result, f1DecidingFactorRhs);
        result = new Discard(result, f2DecidingFactorRhs);
        // result pipe should now hold the expected result
    }
}

Option 2

If you want a default value instead of null/blank, I would suggest doing the HashJoin first with a standard joiner (LeftJoin here), followed by a function that updates the tuples with the appropriate values. Something like:
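The two steps of Option 2 can be simulated in plain Java as follows (not Cascading code; names and data are hypothetical; Java 16+). Step 1 is a left join on the input value alone, and step 2 applies the same rule as TestFunction, overwriting the output with "N/A" whenever the deciding factor is "No".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of Option 2 (plain left join, then a fix-up step); NOT the Cascading API.
final class JoinThenDefaultSketch {

    static final String DECIDING_FACTOR = "No";
    static final String DEFAULT_VALUE = "N/A";

    record LargeRow(String decidingFactor, String input) {}
    record Joined(String decidingFactor, String input, String output) {}

    static List<Joined> joinThenDefault(List<LargeRow> large, Map<String, String> small) {
        List<Joined> out = new ArrayList<>();
        for (LargeRow row : large) {
            // Step 1: left join on the input value alone (null when unmatched).
            String output = small.get(row.input());
            // Step 2: the TestFunction rule; comparing from the constant side is null-safe.
            if (DECIDING_FACTOR.equalsIgnoreCase(row.decidingFactor())) {
                output = DEFAULT_VALUE;
            }
            out.add(new Joined(row.decidingFactor(), row.input(), output));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> small = new HashMap<>();
        small.put("0", "True");
        small.put("1", "False");
        List<LargeRow> large = List.of(new LargeRow("Yes", "1"), new LargeRow("No", "0"));
        joinThenDefault(large, small).forEach(System.out::println);
    }
}
```

The trade-off against Option 1 is that the join itself stays simple, at the cost of an extra per-row function after it.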

Main Class

import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Rename;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTest {

    public StackHashJoinTest() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");
        Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output");

        // Large pipe fields:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small pipe 1 fields:
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // Small pipe 2 fields:
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // Join the first small pipe. Expected fields after join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result
        Pipe resultsOne = new HashJoin(largePipe, f1Input, rhsOne, f1Join, new LeftJoin());

        // Join the second small pipe. Expected fields after join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F2Join F2Result
        Pipe resultsTwo = new HashJoin(resultsOne, f2Input, rhsTwo, f2Join, new LeftJoin());

        // Rename the joined result fields to the names TestFunction reads (F1Output, F2Output)
        Pipe result = new Rename(resultsTwo, new Fields("F1Result", "F2Result"), new Fields("F1Output", "F2Output"));
        // Overwrite the output with the default value wherever the deciding factor is "No"
        result = new Each(result, functionFields, new TestFunction(), Fields.REPLACE);
        // result pipe should now hold the expected result
    }
}

Update Function

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

public class TestFunction extends BaseOperation<Void> implements Function<Void> {

    private static final long serialVersionUID = 1L;
    // Rows whose deciding factor equals this value get the default output
    private static final String DECIDING_FACTOR = "No";
    private static final String DEFAULT_VALUE = "N/A";

    // Expected argument fields: "F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output"
    public TestFunction() {
        // Fields.ARGS: the result has the same fields as the arguments (used with Fields.REPLACE)
        super(Fields.ARGS);
    }

    @Override
    public void operate(@SuppressWarnings("rawtypes") FlowProcess process, FunctionCall<Void> call) {
        // Copy the arguments into a new entry that can be modified
        TupleEntry result = new TupleEntry(call.getArguments());
        // Compare from the constant side so a null deciding factor does not throw
        if (DECIDING_FACTOR.equalsIgnoreCase(result.getString("F1DecidingFactor"))) {
            result.setString("F1Output", DEFAULT_VALUE);
        }
        if (DECIDING_FACTOR.equalsIgnoreCase(result.getString("F2DecidingFactor"))) {
            result.setString("F2Output", DEFAULT_VALUE);
        }
        call.getOutputCollector().add(result);
    }
}


This should solve your problem. Let me know if this helps.