
How to buffer MongoDB inserts during disconnect in node.js?


Inserting 500K elements with insertOne() is a very bad idea. You should instead use bulk operations, which let you insert many documents in a single request (here, batches of 10000, so the whole load takes about 50 requests). To avoid the buffering issue, you can handle it manually (a minimal sketch of the key steps follows the list):

  1. Disable buffering with bufferMaxEntries: 0
  2. Set reconnect properties: reconnectTries: 30, reconnectInterval: 1000
  3. Create a bulk operation and feed it with 10000 items
  4. Pause the XML reader. Try to insert the 10000 items. If it fails, retry every 3000ms until it succeeds
  5. You may face duplicate ID issues if the bulk operation is interrupted during execution, so ignore them (error code: 11000)
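
Before the full script, here is a minimal sketch of steps 1 to 3 in isolation, assuming the 2.x node.js driver API used below (initializeUnorderedBulkOp and flattened connection options; newer drivers replace these with bulkWrite and different option names):

var MongoClient = require('mongodb').MongoClient

MongoClient.connect('mongodb://localhost:27017/test', {
  bufferMaxEntries: 0,     // step 1: fail immediately instead of buffering while disconnected
  reconnectTries: 30,      // step 2: attempt to reconnect 30 times...
  reconnectInterval: 1000  // ...waiting 1000ms between attempts
}, function (err, db) {
  if (err != null) return console.log('connect error: ' + err)
  // step 3: feed a bulk operation, then send everything in one round trip
  var bulk = db.collection('product').initializeUnorderedBulkOp()
  for (var i = 0; i < 10000; i++) {
    bulk.insert({ index: i })
  }
  bulk.execute(function (err, result) {
    console.log(err != null ? 'bulk insert failed: ' + err : '10000 docs saved')
    db.close()
  })
})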

Here is the full sample script:

var fs = require('fs')
var Xml = require('xml-stream')
var MongoClient = require('mongodb').MongoClient
var url = 'mongodb://localhost:27017/test'

MongoClient.connect(url, {
  reconnectTries: 30,
  reconnectInterval: 1000,
  bufferMaxEntries: 0
}, function (err, db) {
  if (err != null) {
    console.log('connect error: ' + err)
  } else {
    var collection = db.collection('product')
    var bulk = collection.initializeUnorderedBulkOp()
    var totalSize = 500001
    var size = 0
    var fileStream = fs.createReadStream('data.xml')
    var xml = new Xml(fileStream)
    xml.on('endElement: product', function (product) {
      bulk.insert(product)
      size++
      // if we have enough products, save them using a bulk insert
      if (size % 10000 === 0) {
        xml.pause()
        bulk.execute(function (err, result) {
          if (err == null) {
            bulk = collection.initializeUnorderedBulkOp()
            console.log('doc ' + (size - 10000) + ' : ' + size + ' saved on first try')
            xml.resume()
          } else {
            console.log('bulk insert failed: ' + err)
            var counter = 0
            var retryInsert = setInterval(function () {
              counter++
              bulk.execute(function (err, result) {
                // treat duplicate key errors (code 11000) as success:
                // they only mean part of the batch was already written
                if (err == null || err.code === 11000) {
                  clearInterval(retryInsert)
                  bulk = collection.initializeUnorderedBulkOp()
                  console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
                  xml.resume()
                } else {
                  console.log('failed after first try: ' + counter, 'error: ' + err)
                }
              })
            }, 3000) // retry every 3000ms until success
          }
        })
      } else if (size === totalSize) {
        // last (partial) batch: flush it and close the connection
        bulk.execute(function (err, result) {
          if (err == null) {
            db.close()
          } else {
            console.log('bulk insert failed: ' + err)
          }
        })
      }
    })
  }
})

Sample log output:

doc 0 : 10000 saved on first try
doc 10000 : 20000 saved on first try
doc 20000 : 30000 saved on first try
[...]
bulk insert failed: MongoError: interrupted at shutdown // mongodb server shutdown
failed after first try: 1 error: MongoError: no connection available for operation and number of stored operation > 0
failed after first try: 2 error: MongoError: no connection available for operation and number of stored operation > 0
failed after first try: 3 error: MongoError: no connection available for operation and number of stored operation > 0
doc 130000 : 140000 saved after 4 tries
doc 140000 : 150000 saved on first try
[...]


I don't know the specifics of the MongoDB driver and its buffer of entries; maybe it only keeps data in certain scenarios.

So I will answer this question with a more general approach that can work with any database.

To summarize, you have two problems:

  1. You are not recovering from failed attempts
  2. The XML stream sends data too fast

To handle the first issue, you need to implement a retry algorithm that ensures many attempts are made before giving up.

To handle the second issue, you need to apply back pressure to the XML stream. You can do that using the pause method, the resume method, and an input buffer, as sketched below.
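
Stripped of the retry logic and the task buffer that the full script below adds, the back pressure idea alone looks like this (MAX_IN_FLIGHT and writeDataToDb are illustrative placeholders, not part of any library):

var MAX_IN_FLIGHT = 100;
var inFlight = 0;
var paused = false;

xml.on('endElement: product', function (product) {
    inFlight++;
    // too many writes pending: stop pulling data from the XML stream
    if (!paused && inFlight >= MAX_IN_FLIGHT) {
        xml.pause();
        paused = true;
    }
    writeDataToDb(product).then(done, done);
    function done() {
        inFlight--;
        // there is room again: let the stream produce more elements
        if (paused && inFlight < MAX_IN_FLIGHT) {
            xml.resume();
            paused = false;
        }
    }
});

The full version below adds an input buffer and wraps each write in an exponential retry: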

var Promise = require('bluebird');
var fs = require('fs');
var Xml = require('xml-stream');

var fileStream = fs.createReadStream('myFile.xml');
var xml = new Xml(fileStream);

// simple exponential retry algorithm based on promises
function exponentialRetry(task, initialDelay, maxDelay, maxRetry) {
    var delay = initialDelay;
    var retry = 0;
    var closure = function() {
        return task().catch(function(error) {
            retry++;
            if (retry > maxRetry) {
                throw error;
            }
            var promise = Promise.delay(delay).then(closure);
            delay = Math.min(delay * 2, maxDelay);
            return promise;
        });
    };
    return closure();
}

var maxPressure = 100;
var currentPressure = 0;
var suspended = false;
var stopped = false;
var buffer = [];

// handle back pressure by storing incoming tasks in the buffer
// pause the xml stream as soon as we have enough tasks to work on
// resume it when the buffer is empty
function writeXmlDataWithBackPressure(product) {
    // closure used to try to start a task
    var tryStartTask = function() {
        // if we have enough tasks running, pause the xml stream
        if (!stopped && !suspended && currentPressure >= maxPressure) {
            xml.pause();
            suspended = true;
            console.log("stream paused");
        }
        // if we have room to run tasks
        if (currentPressure < maxPressure) {
            // if we have a buffered task, start it
            // if not, resume the xml stream
            if (buffer.length > 0) {
                buffer.shift()();
            } else if (!stopped) {
                try {
                    xml.resume();
                    suspended = false;
                    console.log("stream resumed");
                } catch (e) {
                    // the only way to know if you've reached the end of the stream
                    // xml.on('end') can be triggered BEFORE all handlers are called
                    // probably a bug of xml-stream
                    stopped = true;
                    console.log("stream end");
                }
            }
        }
    };

    // push the task to the buffer
    buffer.push(function() {
        currentPressure++;
        // use exponential retry to ensure we will try this operation 100 times before giving up
        exponentialRetry(function() {
            return writeDataToDb(product);
        }, 100, 2000, 100).finally(function() {
            currentPressure--;
            // a task has just finished, let's try to run a new one
            tryStartTask();
        });
    });

    // we've just buffered a task, let's try to run it
    tryStartTask();
}

// write the product to database here :)
function writeDataToDb(product) {
    // the following code is here to create random delays and random failures (just for testing)
    var timeToWrite = Math.random() * 100;
    var failure = Math.random() > 0.5;
    return Promise.delay(timeToWrite).then(function() {
        if (failure) {
            throw new Error();
        }
        return null;
    });
}

xml.on('endElement: product', writeXmlDataWithBackPressure);
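
Note that writeDataToDb above is a stub that simulates random latency and failures. To use this with MongoDB, replace it with a real insert; a minimal sketch, assuming a collection handle obtained from a MongoClient.connect callback as in the first answer (the connection code itself is omitted here):

// 'collection' is assumed to come from MongoClient.connect, as in the first answer
function writeDataToDb(product) {
    // insertOne returns a promise when called without a callback,
    // so a rejected insert will be retried by exponentialRetry
    return collection.insertOne(product);
}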

Play with it and add some console.log calls to understand how it behaves. I hope this helps you solve your issue :)