kenny-codes
6/10/2016 - 4:01 PM

Streaming from MongoDB to Elasticsearch

Streaming from MongoDB to Elasticsearch

// Uses Mongoose to stream a large collection out of MongoDB and into Elasticsearch,
// using the elasticsearch.js client library in bulk mode.
// The bulk request size must be large enough to avoid request timeout errors (depends on the ES cluster specs).

var mongoose = require('mongoose');

// Mongo connection, schema all configured in mongo.js
var mongo = require('./mongo');

var elasticsearch = require('elasticsearch');
// Elasticsearch client pointed at a local node, logging at 'info' level.
var client = new elasticsearch.Client({
  host: 'localhost:9200',
  log: 'info'
});

// Target index and mapping type for every document that gets indexed.
// Elasticsearch rejects index names that contain uppercase characters,
// so the index name must be all lowercase.
var indexName = 'myindex';
var typeName = 'doc';

// Looks up the model registered under 'myModel' (presumably registered by
// ./mongo -- verify there).
var Model = mongoose.model('myModel');

// Running total of documents read from the MongoDB stream.
var count = 0;
// Start timestamp in milliseconds since the epoch, used to report the duration.
var startTime = Date.now();
// Stream the whole collection; lean() yields plain objects instead of
// full Mongoose documents.
var stream = Model.find().lean().stream();

// Pending bulk body. Every queued document occupies two consecutive entries:
// the action/metadata object followed by the document source.
var bulk = [];

/**
 * Sends everything currently queued in `bulk` to Elasticsearch as a single
 * bulk request and resets the queue.
 *
 * The request is asynchronous: this function returns immediately and the
 * outcome is only logged once the callback fires. Does nothing when the
 * queue is empty.
 */
var sendAndEmptyQueue = function () {
    // An empty bulk body is rejected by Elasticsearch, so skip the request.
    if (bulk.length === 0) {
        return;
    }

    // Detach the batch from the shared queue *before* the async call so the
    // callback reports on the batch that was actually sent, not on whatever
    // `bulk` happens to hold by the time the response arrives.
    var batch = bulk;
    bulk = [];

    // Two entries (action + source) per document.
    var docCount = batch.length / 2;

    client.bulk({
        body: batch
    }, function (err, resp) {
        if (err) {
            console.log(err);
        } else if (resp && resp.errors) {
            // The request itself succeeded but at least one item failed.
            console.log('Bulk request of ' + docCount + ' documents completed with item-level errors');
        } else {
            console.log('Sent ' + docCount + ' documents to Elasticsearch!');
        }
    });
};

// Number of documents to accumulate before flushing a bulk request.
var BATCH_SIZE = 500;

// Drive the export: queue every streamed document, flush a bulk request each
// time BATCH_SIZE documents have been read, and flush the remainder when the
// stream closes.
// Note: bulk requests are fired without waiting for earlier ones to finish.
stream.on('data', function (doc) {
    count += 1;

    // Bulk body format: an action/metadata entry followed by the document source.
    bulk.push({ index: { _index: indexName, _type: typeName, _id: doc._id } });
    bulk.push(doc);

    // Flush on every BATCH_SIZE-th document so each request carries a full batch.
    if (count % BATCH_SIZE === 0) {
        sendAndEmptyQueue();
    }
}).on('error', function (err) {
    // Streams report failures through the 'error' event.
    console.log('MongoDB Stream Error: ' + err);
}).on('close', function () {
    // Send whatever is left over; skip the request when nothing is queued.
    if (bulk.length > 0) {
        sendAndEmptyQueue();
    }
    console.log('Document Count is: ' + count);
    // Time spent reading the stream; the final bulk request may still be in flight.
    console.log('Duration is: ' + (new Date().getTime() - startTime));
});