Spring Data MongoDB Lookup with Pipeline Aggregation


Building upon the info given by @dnickless, I was able to solve this. I'll post the complete solution in the hopes it helps someone else in the future.

I'm using mongodb-driver:3.6.4

First, I had to create a custom aggregation operation class so that I could pass a custom JSON MongoDB query into the aggregation. This lets me use a pipeline inside $lookup, which is not supported with the driver version I am using.

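import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;

// Wraps a raw JSON aggregation stage so it can be used alongside the built-in operations.
public class CustomProjectAggregationOperation implements AggregationOperation {

    private final String jsonOperation;

    public CustomProjectAggregationOperation(String jsonOperation) {
        this.jsonOperation = jsonOperation;
    }

    @Override
    public Document toDocument(AggregationOperationContext aggregationOperationContext) {
        // Parse the JSON stage and let the context map field names and values.
        return aggregationOperationContext.getMappedObject(Document.parse(jsonOperation));
    }
}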

Now that we have the ability to pass a custom JSON query into our mongodb spring implementation, all that is left is to plug those values into a TypedAggregation query.

public List<FulfillmentChannel> getFulfillmentChannels(
    String SOME_VARIABLE_STRING_1,
    String SOME_VARIABLE_STRING_2) {

    AggregationOperation match = Aggregation.match(
            Criteria.where("dayOfWeek").is(SOME_VARIABLE_STRING_1));

    AggregationOperation match2 = Aggregation.match(
            Criteria.where("deliveryZipCodeTimings").ne(Collections.emptyList()));

    String query =
            "{ $lookup: { " +
                    "from: 'deliveryZipCodeTiming'," +
                    "let: { location_id: '$fulfillmentLocationId' }," +
                    "pipeline: [{" +
                    "$match: {$expr: {$and: [" +
                    "{ $eq: ['$fulfillmentLocationId', '$$location_id']}," +
                    "{ $eq: ['$zipCode', '" + SOME_VARIABLE_STRING_2 + "']}]}}}," +
                    "{ $project: { _id: 0, zipCode: 1, cutoffTime: 1 } }]," +
                    "as: 'deliveryZipCodeTimings'}}";

    TypedAggregation<FulfillmentChannel> aggregation = Aggregation.newAggregation(
            FulfillmentChannel.class,
            match,
            new CustomProjectAggregationOperation(query),
            match2
    );

    AggregationResults<FulfillmentChannel> results =
        mongoTemplate.aggregate(aggregation, FulfillmentChannel.class);
    return results.getMappedResults();
}
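One thing to watch in the query string above: the zip code is concatenated straight into the JSON, so a value containing a quote or backslash would break the stage (or change its meaning). Below is a minimal sketch of one way to guard against that using only the JDK. The class name LookupJson and its methods are hypothetical helpers, not part of Spring Data or the Mongo driver; the stage itself is the same one as above, with the value embedded as an escaped, double-quoted JSON literal.

```java
// Hypothetical helper (not part of Spring Data or the Mongo driver).
public final class LookupJson {

    private LookupJson() {}

    /** Returns value as a double-quoted JSON string literal with special characters escaped. */
    public static String quote(String value) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters must be written as unicode escapes.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    /** Builds the $lookup stage from the answer above, with the zip code escaped. */
    public static String lookupStage(String zipCode) {
        return "{ $lookup: { "
                + "from: 'deliveryZipCodeTiming', "
                + "let: { location_id: '$fulfillmentLocationId' }, "
                + "pipeline: [{ $match: { $expr: { $and: ["
                + "{ $eq: ['$fulfillmentLocationId', '$$location_id'] }, "
                + "{ $eq: ['$zipCode', " + quote(zipCode) + "] }] } } }, "
                + "{ $project: { _id: 0, zipCode: 1, cutoffTime: 1 } }], "
                + "as: 'deliveryZipCodeTimings' } }";
    }
}
```

With this, new CustomProjectAggregationOperation(LookupJson.lookupStage(SOME_VARIABLE_STRING_2)) could replace the hand-built query. Escaping only protects the JSON syntax: inside $expr, a string value that starts with $ is still read as a field path, so untrusted input may additionally need wrapping in $literal.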


The drivers are pretty much always a little bit behind the current language features that MongoDB provides - hence some of the latest and greatest features are simply not nicely accessible through the API yet. I am afraid this is one of those cases and you'll need to resort to using strings. Kind of like so (untested):

AggregationOperation match = Aggregation.match(Criteria.where("dayOfWeek").is("SOME_VARIABLE_STRING_1"));
AggregationOperation match2 = Aggregation.match(Criteria.where("deliveryZipCodeTimings").ne(Collections.emptyList()));

String query = "{ $lookup: { from: 'deliveryZipCodeTiming', let: { location_id: '$fulfillmentLocationId' }, "
        + "pipeline: [{ $match: { $expr: { $and: [ { $eq: ['$fulfillmentLocationId', '$$location_id']}, "
        + "{ $eq: ['$zipCode', 'SOME_VARIABLE_STRING_2']} ]} } }, "
        + "{ $project: { _id: 0, zipCode: 1, cutoffTime: 1 } }], as: 'deliveryZipCodeTimings' } }";

// newAggregation() only accepts AggregationOperation instances, so the parsed stage is wrapped in one.
// Assumption: Spring Data MongoDB 2.x or later, where toDocument() is the only abstract method.
AggregationOperation lookup = context -> context.getMappedObject(Document.parse(query));

Aggregation.newAggregation(match, lookup, match2);


I would like to add my own solution, which in some respects repeats the solutions posted before.

Mongo driver v3.x

For Mongo driver v3.x I came to the following solution:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.util.JSON;
import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;

public class JsonOperation implements AggregationOperation {

    private List<Document> documents;

    public JsonOperation(String json) {
        Object root = JSON.parse(json);

        documents = root instanceof BasicDBObject
                    ? Collections.singletonList(new Document(((BasicDBObject) root).toMap()))
                    : ((BasicDBList) root).stream().map(item -> new Document((Map<String, Object>) ((BasicDBObject) item).toMap())).collect(Collectors.toList());
    }

    @Override
    public Document toDocument(AggregationOperationContext context) {
        // Not necessary to return anything as we override toPipelineStages():
        return null;
    }

    @Override
    public List<Document> toPipelineStages(AggregationOperationContext context) {
        return documents;
    }
}

and then, provided that the aggregation steps are given in some resource aggregations.json:

[
  {
    $match: {
      "userId": "..."
    }
  },
  {
    $lookup: {
      let: {
        ...
      },
      from: "another_collection",
      pipeline: [
        ...
      ],
      as: "things"
    }
  },
  {
    $sort: {
      "date": 1
    }
  }
]

one can use the above class as follows:

import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation;

Collection<ResultDao> results = mongoTemplate.aggregate(
        newAggregation(new JsonOperation(resourceToString("aggregations.json", StandardCharsets.UTF_8))),
        "some_collection",
        ResultDao.class
).getMappedResults();

Mongo driver v4.x

As the JSON class was removed in Mongo driver v4, I have rewritten the class as follows:

import java.util.Collections;
import java.util.List;

import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;

public class JsonOperation implements AggregationOperation {

    private List<Document> documents;

    private static final String DUMMY_KEY = "dummy";

    public JsonOperation(String json) {
        documents = parseJson(json);
    }

    static final List<Document> parseJson(String json) {
        return (json.startsWith("["))
                    ? Document.parse("{\"" + DUMMY_KEY + "\": " + json + "}").getList(DUMMY_KEY, Document.class)
                    : Collections.singletonList(Document.parse(json));
    }

    @Override
    public Document toDocument(AggregationOperationContext context) {
        // Not necessary to return anything as we override toPipelineStages():
        return null;
    }

    @Override
    public List<Document> toPipelineStages(AggregationOperationContext context) {
        return documents;
    }

    @Override
    public String getOperator() {
        return documents.iterator().next().keySet().iterator().next();
    }
}

but the implementation is now a bit ugly because of the string manipulation. If somebody has a better idea of how to parse an array of objects more elegantly, please edit this post or drop a comment. Ideally there would be a method in the Mongo core that parses either a JSON object or a list (returning BasicDBObject/BasicDBList or Document/List<Document>).
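One fragile spot in the string handling is the json.startsWith("[") check: a resource file that begins with a newline or indentation would be treated as a single object and passed to Document.parse() as-is. A minimal sketch of a more tolerant check follows, using only the JDK. JsonShape and its methods are hypothetical names introduced for illustration; wrapArray() simply mirrors what parseJson() already does with DUMMY_KEY.

```java
// Hypothetical helper for deciding how to parse the JSON text.
public final class JsonShape {

    private JsonShape() {}

    /** Returns true if the first non-whitespace character of json is '['. */
    public static boolean isArray(String json) {
        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (!Character.isWhitespace(c)) {
                return c == '[';
            }
        }
        return false; // empty or whitespace-only input
    }

    /** Wraps a JSON array in an object under the given key so it can be parsed as a document. */
    public static String wrapArray(String json, String key) {
        return "{\"" + key + "\": " + json + "}";
    }
}
```

In parseJson(), the condition would then become JsonShape.isArray(json), and the wrapped string would be JsonShape.wrapArray(json, DUMMY_KEY). Separately, org.bson.BsonArray.parse(String) may be worth a look as a way to avoid the dummy wrapper altogether; that route is untested here.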

Also note that I have skipped the step of mapping the Document instances through the context in the toPipelineStages() method, as it is not necessary in my case. With that step included, the method would look like this:

@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
    return documents.stream().map(document -> context.getMappedObject(document)).collect(Collectors.toList());
}