How can I mapreduce an object with complex subdocuments that relate to each other? [hadoop]

How can I mapreduce an object with complex subdocuments that relate to each other?


The following approach uses the aggregation framework to come up with a solution that is close to the desired output. It depends on a third collection, which can be seen as a merge of the two collections survey and trackings.

First and foremost, suppose you have the following collections with the test documents based on the example in your question:

// Test data for the survey collection: a single survey holding its questions,
// its participants and every answer given (results reference both a user and
// a question by id).
db.survey.insert({
    "_id": 1111,
    "name": "name",
    "questions": [
        { "_id": 1, "text": "a,b, or c?", "type": "multipleChoice", "options": ["a", "b", "c"] },
        { "_id": 2, "text": "what do you think", "type": "freeform" }
    ],
    "participants": [
        { "_id": 1, "name": "user 1" },
        { "_id": 2, "name": "user 2" }
    ],
    "results": [
        { "_id": 123, "userId": 1, "questionId": 1, "answer": "a" },
        { "_id": 124, "userId": 2, "questionId": 1, "answer": "b" },
        { "_id": 125, "userId": 1, "questionId": 2, "answer": "this is some answer" },
        { "_id": 126, "userId": 2, "questionId": 2, "answer": "this is another answer" }
    ]
});

// Test data for the trackings collection: one document per (survey, user)
// pair. The start and end times are stored as plain strings here.
db.trackings.insert([
    {
        "_id": 1,
        "surveyId": 1111,
        "userId": 1,
        "starttime": "2015-05-13 10:46:20.347Z",
        "endtime": "2015-05-13 10:59:20.347Z"
    },
    {
        "_id": 2,
        "surveyId": 1111,
        "userId": 2,
        "starttime": "2015-05-13 10:13:06.176Z",
        "endtime": "2015-05-13 10:46:28.176Z"
    }
]);

To create the third collection (let's call it output_collection), iterate over the trackings collection using the find() cursor's forEach() method, convert the fields holding the date strings to actual ISODate objects, add an array field that stores the matching survey document, and then save the merged object into the third collection. The following demonstrates this operation:

// Build output_collection: one document per tracking entry, enriched with the
// survey it belongs to.
db.trackings.find().forEach(function (tracking) {
    // Replace the date strings with ISODate values so that date arithmetic
    // can be applied to them later on.
    tracking.starttime = ISODate(tracking.starttime);
    tracking.endtime = ISODate(tracking.endtime);

    // Embed the survey(s) whose _id matches this tracking entry as an array.
    tracking.survey = db.survey.find({ "_id": tracking.surveyId }).toArray();

    // save() inserts the document, or replaces an existing one with the
    // same _id.
    db.output_collection.save(tracking);
});

After merging the two collections into output_collection, querying it with db.output_collection.findOne() will yield:

{    "_id" : 1,    "surveyId" : 1111,    "userId" : 1,    "starttime" : ISODate("2015-05-13T10:46:20.347Z"),    "endtime" : ISODate("2015-05-13T10:59:20.347Z"),    "survey" : [         {            "_id" : 1111,            "name" : "name",            "questions" : [                 {                    "_id" : 1,                    "text" : "a,b, or c?",                    "type" : "multipleChoice",                    "options" : [                         "a",                         "b",                         "c"                    ]                },                 {                    "_id" : 2,                    "text" : "what do you think",                    "type" : "freeform"                }            ],            "participants" : [                 {                    "_id" : 1,                    "name" : "user 1"                },                 {                    "_id" : 2,                    "name" : "user 2"                }            ],            "results" : [                 {                    "_id" : 123,                    "userId" : 1,                    "questionId" : 1,                    "answer" : "a"                },                 {                    "_id" : 124,                    "userId" : 2,                    "questionId" : 1,                    "answer" : "b"                },                 {                    "_id" : 125,                    "userId" : 1,                    "questionId" : 2,                    "answer" : "this is some answer"                },                 {                    "_id" : 126,                    "userId" : 2,                    "questionId" : 2,                    "answer" : "this is another answer"                }            ]        }    ]}

You can then apply the aggregation on this collection. The aggregation pipeline should start with four $unwind operator stages, which deconstruct the arrays from the input documents to output a document for each element. Each output document replaces the array with an element value.

The next $project operator stage reshapes each document in the stream, such as by adding a new field duration which calculates the time difference in minutes between the starttime and endtime date fields, and uses the Arithmetic Operators to do the calculation.

After this is the $group operator pipeline stage, which groups the input documents by the "surveyId" key and applies the accumulator expression(s) to each group. It consumes all input documents and outputs one document per distinct group.

So your aggregation pipeline should be something like this:

// Aggregate the merged tracking/survey documents into one summary document
// per survey and write the result to the survey_results collection.
db.output_collection.aggregate([
    // Flatten the embedded survey array and the arrays nested inside it, so
    // every output document carries a single question/participant/result.
    { "$unwind": "$survey" },
    { "$unwind": "$survey.questions" },
    { "$unwind": "$survey.participants" },
    { "$unwind": "$survey.results" },

    // Keep the fields needed downstream and derive the time spent on the
    // survey: date subtraction yields milliseconds, dividing by 60000
    // converts that to minutes.
    {
        "$project": {
            "survey": 1,
            "surveyId": 1,
            "userId": 1,
            "starttime": 1,
            "endtime": 1,
            "duration": {
                "$divide": [
                    { "$subtract": ["$endtime", "$starttime"] },
                    60000
                ]
            }
        }
    },

    // One group per survey: timing statistics plus the distinct questions
    // and answers ($addToSet collapses the duplicates created by $unwind).
    {
        "$group": {
            "_id": "$surveyId",
            "survey": { "$first": "$survey.name" },
            "totalAverageTime": { "$avg": "$duration" },
            "fastestTime": { "$min": "$duration" },
            "slowestTime": { "$max": "$duration" },
            "questions": { "$addToSet": "$survey.questions" },
            "answers": { "$addToSet": "$survey.results" }
        }
    },

    // Persist the grouped documents instead of returning them.
    { "$out": "survey_results" }
]);

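Output of the aggregation (shown inside the result/ok envelope returned when the pipeline is run without the $out stage; db.survey_results.find() returns the same document without that wrapper):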

/* 0 */{    "result" : [         {            "_id" : 1111,            "survey" : "name",            "totalAverageTime" : 23.18333333333334,            "fastestTime" : 13,            "slowestTime" : 33.36666666666667,            "questions" : [                 {                    "_id" : 2,                    "text" : "what do you think",                    "type" : "freeform"                },                 {                    "_id" : 1,                    "text" : "a,b, or c?",                    "type" : "multipleChoice",                    "options" : [                         "a",                         "b",                         "c"                    ]                }            ],            "answers" : [                 {                    "_id" : 126,                    "userId" : 2,                    "questionId" : 2,                    "answer" : "this is another answer"                },                 {                    "_id" : 124,                    "userId" : 2,                    "questionId" : 1,                    "answer" : "b"                },                 {                    "_id" : 125,                    "userId" : 1,                    "questionId" : 2,                    "answer" : "this is some answer"                },                 {                    "_id" : 123,                    "userId" : 1,                    "questionId" : 1,                    "answer" : "a"                }            ]        }    ],    "ok" : 1}

UPDATE

Once the aggregation output has been written to another collection, say survey_results, via the $out pipeline stage, you can apply some native JavaScript functions together with the find() cursor's forEach() method to get the final object:

// Reshape every survey_results document so that each question carries its own
// answers, then drop the flat top-level answers array.
db.survey_results.find().forEach(function (result) {
    result.questions = result.questions.map(function (question) {
        question.answers = result.answers
            .filter(function (answer) {
                return answer.questionId === question._id;
            })
            .map(function (answer) {
                // The owning question is now implied by nesting, so the
                // back-reference is removed from the answer.
                delete answer.questionId;
                return answer;
            });
        return question;
    });

    delete result.answers;
    db.survey_results.save(result);
});

Output:

/* 0 */{    "_id" : 1111,    "survey" : "name",    "totalAverageTime" : 23.18333333333334,    "fastestTime" : 13,    "slowestTime" : 33.36666666666667,    "questions" : [         {            "_id" : 2,            "text" : "what do you think",            "type" : "freeform",            "answers" : [                 {                    "_id" : 126,                    "userId" : 2,                    "answer" : "this is another answer"                },                 {                    "_id" : 125,                    "userId" : 1,                    "answer" : "this is some answer"                }            ]        },         {            "_id" : 1,            "text" : "a,b, or c?",            "type" : "multipleChoice",            "options" : [                 "a",                 "b",                 "c"            ],            "answers" : [                 {                    "_id" : 124,                    "userId" : 2,                    "answer" : "b"                },                 {                    "_id" : 123,                    "userId" : 1,                    "answer" : "a"                }            ]        }    ]}