Asynchronous transforms on streams in Highland.js Asynchronous transforms on streams in Highland.js mongoose mongoose

Asynchronous transforms on streams in Highland.js


I don't know much about Q or Highland. But this seems like a straightforward use case for the transform function on querystreams.

var stream = Sender.find({}).stream({ transform: manipulate })function manipulate(document) {    // do stuff here    return document;}stream.on("data", function(document) {    stream.pause()    document.save(function(error) {        // error handle, maybe stream.destroy([err]) if you want it to stop immediately        stream.resume();    });});stream.on("error", function(err){    //error handle});stream.on("close", function(){    console.log("hopefully this worked for you");});

The transform function will run on the document prior to emitting the 'data' event. Once the transform function has done its stuff, it's return value is sent to the 'data' function. Then you just pause/save/resume.


Instead of a Promise, you can use Highland's async function capabilities: http://highlandjs.org/#async. Mongoose also returns a Promise, so you could wrap that with Highland instead of the async function style but still avoid adding Q.

I would recommend using .flatMap() instead of .map() and .series() to flatten those streams back into one document stream. Then adding .done() can also be used to create a Thunk instead of using .resume() combined with the 'done' event listener.

Honestly, not 100% sure why your are having issues with the 'done' event being called.

var sender_stream, set_fields, save, sender_deferred;sender_stream = Sender.find({}).stream();save = function save(document) {    return _(function(push, next) {        document.save(function(err, result) {            push(err, document);            push(null, _.nil);        });    });};set_fields = function setFields(sender) {    // set some fields on sender...    return sender;};sender_deferred = Q.defer();_(sender_stream)    .map(setFields)    .flatMap(save)    .done(function() {        sender_deferred.resolve();    });