
Extract binary values from stream with low memory consumption


I solved my own problem. I'm not 100% confident this is the best way to accomplish this, so I'm open to suggestions.

I made a subclass of stream.Transform and implemented the _transform method. I discovered that the next data chunk is only delivered once the _transform callback is invoked. Knowing this, I store that callback as a property and only call it when I need the next chunk.
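In isolation, the trick looks something like this (a stripped-down sketch; HoldingStream is just an illustrative name, and the real class follows below):

const {Transform} = require('stream');

class HoldingStream extends Transform {
    _transform(chunk, enc, callback) {
        // Deliberately don't call callback() here. Node will not
        // deliver the next chunk (or pull more from upstream) until
        // this stored callback is eventually invoked.
        this.currentChunk = chunk;
        this.nextCallback = callback;
    }
}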

getBytes(size) is a method that takes the requested number of bytes from the current chunk (also saved as a property) and invokes the saved callback whenever the next chunk is needed. It works recursively to account for chunks of varying size and requests of varying length.
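As a concrete trace (sizes invented for illustration): if the current chunk holds 8 bytes, pos is 5, and getBytes(10) is called, the method slices off the remaining 3 bytes, awaits the next chunk via doNextCallback(), then recursively calls getBytes(7) against the fresh chunk and concatenates the two buffers. If that chunk is also shorter than 7 bytes, the same path repeats until the request is filled or the stream finishes.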

Then, with a mix of async/await and promises, I was able to keep this entire process asynchronous (afaik) and back-pressured.

const {Transform} = require('stream');
const events = require('events');

class ByteStream extends Transform {
    constructor(options) {
        super(options);
        this.event_emitter = new events.EventEmitter();
        this.hasStarted = false;
        this.hasEnded = false;
        this.currentChunk = null;   // chunk currently being consumed
        this.nextCallback = null;   // _transform callback, held until more data is wanted
        this.pos = 0;               // read position within currentChunk
        this.on('finish', () => {
            this.hasEnded = true;
            this.event_emitter.emit('chunkGrabbed');
        });
    }

    _transform(chunk, enc, callback) {
        // Hold the callback instead of calling it; no further chunks
        // arrive until doNextCallback() invokes it.
        this.pos = 0;
        this.currentChunk = chunk;
        this.nextCallback = callback;
        if (!this.hasStarted) {
            this.hasStarted = true;
            this.event_emitter.emit('started');
        } else {
            this.event_emitter.emit('chunkGrabbed');
        }
    }

    // Release the held callback and resolve once the next chunk
    // (or 'finish') has arrived.
    doNextCallback() {
        return new Promise((resolve) => {
            this.event_emitter.once('chunkGrabbed', resolve);
            this.nextCallback();
        });
    }

    async getBytes(size) {
        if (this.pos + size > this.currentChunk.length) {
            // Not enough left in this chunk: take what remains...
            let bytes = this.currentChunk.slice(this.pos, this.currentChunk.length);
            if (!this.hasEnded) {
                const newSize = size - (this.currentChunk.length - this.pos);
                // ...grab the next chunk...
                await this.doNextCallback();
                if (!this.hasEnded) {
                    // ...and recurse for the remainder, which may itself
                    // span several chunks.
                    this.pos = 0;
                    const recurseBytes = await this.getBytes(newSize);
                    bytes = Buffer.concat([bytes, recurseBytes]);
                }
            }
            return bytes;
        } else {
            const bytes = this.currentChunk.slice(this.pos, this.pos + size);
            this.pos += size;
            return bytes;
        }
    }
}

module.exports = {
    ByteStream: ByteStream
};
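To sanity-check the class outside Express, something like the following should work. The file path, field sizes, and require path are placeholders, not part of the original setup:

const fs = require('fs');
const zlib = require('zlib');
const {ByteStream} = require('./byte_stream'); // wherever the class above is saved

const bStream = new ByteStream({});

bStream.event_emitter.on('started', async () => {
    // Read two fixed-size fields back to back; each await resolves
    // only once enough chunks have arrived to satisfy the request.
    const header = await bStream.getBytes(16);
    const body = await bStream.getBytes(1024);
    console.log(header.length, body.length);
});

fs.createReadStream('./sample.gz')
    .pipe(zlib.createGunzip())
    .pipe(bStream);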

My Express route is now:

apiRoute.route("/convert").post((req, res)=>{    let bStream = new ByteStream({});    let gStream = zlib.createGunzip();    bStream event_emitter.on('started', async () => {        console.log("started!");        let myValue; await bStream.getBytes(60000).then(bytes => {myValue = bytes});        console.log(myValue.length);    });    req    .pipe(gStream)    .pipe(bStream);});

By listening for the 'started' event I can tell when the first chunk has been streamed into bStream. From there, it's just a matter of calling getBytes() with the desired byte count and assigning the awaited value to a variable. It does just what I need, although I haven't done any rigorous testing yet.
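One case worth exercising is the end-of-stream path: if 'finish' fires before a request is satisfied, getBytes() resolves with a shorter buffer than asked for, so a caller can detect truncation by checking the length. For example, inside the 'started' handler:

const bytes = await bStream.getBytes(60000);
if (bytes.length < 60000) {
    // the stream finished before the request was filled
    console.warn(`only got ${bytes.length} of 60000 bytes`);
}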