manniru
7/27/2015 - 6:59 PM

Extract Gmail messages and insert into Elasticsearch

Extract Gmail messages and insert into Elasticsearch

var fs = require('fs'),
  readline = require('readline'),
  google = require('googleapis'),
  googleAuth = require('google-auth-library'),
  async = require('async'),
  addr = require('email-addresses'),
  elasticsearch = require('elasticsearch');

var client = new elasticsearch.Client({
  host: 'localhost:9200'
});

var weekday = new Array(7);
weekday[0] = "Sunday";
weekday[1] = "Monday";
weekday[2] = "Tuesday";
weekday[3] = "Wednesday";
weekday[4] = "Thursday";
weekday[5] = "Friday";
weekday[6] = "Saturday";

var SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'];
var TOKEN_DIR = (process.env.HOME || process.env.HOMEPATH ||
  process.env.USERPROFILE) + '/.credentials/';
var TOKEN_PATH = TOKEN_DIR + 'gmail-api-quickstart.json';

// Load client secrets from a local file.
fs.readFile('client_secret.json', function processClientSecrets(err, content) {
  if (err) {
    console.log('Error loading client secret file: ' + err);
    return;
  }
  // Authorize a client with the loaded credentials, then call the
  // Gmail API.
  authorize(JSON.parse(content), start);
});

/**
 * Create an OAuth2 client with the given credentials, and then execute the
 * given callback function.
 *
 * @param {Object} credentials The authorization client credentials.
 * @param {function} callback The callback to call with the authorized client.
 */
function authorize(credentials, callback) {
  var clientSecret = credentials.installed.client_secret;
  var clientId = credentials.installed.client_id;
  var redirectUrl = credentials.installed.redirect_uris[0];
  var auth = new googleAuth();
  var oauth2Client = new auth.OAuth2(clientId, clientSecret, redirectUrl);

  // Check if we have previously stored a token.
  fs.readFile(TOKEN_PATH, function (err, token) {
    if (err) {
      getNewToken(oauth2Client, callback);
    } else {
      oauth2Client.credentials = JSON.parse(token);
      callback(oauth2Client);
    }
  });
}

/**
 * Get and store new token after prompting for user authorization, and then
 * execute the given callback with the authorized OAuth2 client.
 *
 * @param {google.auth.OAuth2} oauth2Client The OAuth2 client to get token for.
 * @param {getEventsCallback} callback The callback to call with the authorized
 *     client.
 */
function getNewToken(oauth2Client, callback) {
  var authUrl = oauth2Client.generateAuthUrl({
    access_type: 'offline',
    scope: SCOPES
  });
  console.log('Authorize this app by visiting this url: ', authUrl);
  var rl = readline.createInterface({
    input: process.stdin,
    output: process.stdout
  });
  rl.question('Enter the code from that page here: ', function (code) {
    rl.close();
    oauth2Client.getToken(code, function (err, token) {
      if (err) {
        console.log('Error while trying to retrieve access token', err);
        return;
      }
      oauth2Client.credentials = token;
      storeToken(token);
      callback(oauth2Client);
    });
  });
}

/**
 * Store token to disk be used in later program executions.
 *
 * @param {Object} token The token to store to disk.
 */
function storeToken(token) {
  try {
    fs.mkdirSync(TOKEN_DIR);
  } catch (err) {
    if (err.code != 'EEXIST') {
      throw err;
    }
  }
  fs.writeFile(TOKEN_PATH, JSON.stringify(token));
  console.log('Token stored to ' + TOKEN_PATH);
}

/**
 * Lists the labels in the user's account.
 *
 * @param {google.auth.OAuth2} auth An authorized OAuth2 client.
 */
function listMessages(auth, pageToken, cb) {
  var gmail = google.gmail('v1');
  gmail.users.messages.list({
    auth: auth,
    userId: 'me',
    maxResults: 100,
    pageToken: pageToken
  }, function (err, response) {
    if (err) {
      console.log('The API returned an error: ' + err);
      return cb(err);
    }

    var messages = response.messages;
    var nextToken = response.nextPageToken;
    if (messages.length == 0) {
      console.log('No messages found.');
    } else {
      return cb(null, messages, nextToken);
    }
  });
}

function getEmail(auth, id, cb) {
  var gmail = google.gmail('v1');
  gmail.users.messages.get({
    auth: auth,
    userId: 'me',
    id: id
  }, function (err, response) {
    if (err) {
      console.log('The API returned an error: ' + err);
      return cb(err);
    }
    return cb(null, response.payload.headers);
  });
}

function start(auth, pageToken) {
  console.log('Starting', pageToken);
  async.waterfall([function (callback) {
    listMessages(auth, pageToken, function (err, messages, nextToken) {
      if (err) return callback(err);
      callback(null, messages, nextToken);
    });
  }, function (messages, nextToken, callback) {
    var counter = 0;
    var arr = [];
    messages.forEach(function (m) {
      var from = [];
      var to = [];
      var address;
      var subject;
      var date;
      var spf = false;
      var dkim = false;
      var dmarc = false;
      getEmail(auth, m.id, function (err, headers) {
        console.log('Processing: ', m.id);
        counter++;
        if (err) return callback(err);
        if (headers && headers.length > 0) {
          headers.forEach(function (h) {
            if (h.name === 'To' && h.value.indexOf('@') > -1) {
              address = addr.parseAddressList(h.value);
              if (address && address.length > 0)
                address.forEach(function (t) {
                  to.push(t.address);
                });
            }
            else if (h.name === 'From') {
              address = addr.parseAddressList(h.value);
              if (address && address.length > 0)
                address.forEach(function (f) {
                  from.push(f.address);
                });
            }
            else if (h.name === 'Subject') {
              subject = h.value;
            }
            else if (h.name === 'Date') {
              try {
                date = new Date(h.value).toISOString();
              }
              catch (x) {
                console.log('Could not translate date', x, date);
              }
            }
            else if (h.name === 'Authentication-Results') {
              if (h.value.indexOf('spf=pass') > -1)
                spf = true;
              if (h.value.indexOf('dkim=pass') > -1)
                dkim = true;
              if (h.value.indexOf('dmarc=pass') > -1)
                dmarc = true;
            }
          });
          arr.push({
            id: m.id,
            from: from,
            to: to,
            subject: subject,
            timestamp: date,
            spf: spf,
            dkim: dkim,
            dmarc: dmarc
          });
          if (counter >= messages.length)
            callback(null, arr, nextToken);
        }
        else
          callback(null, arr, nextToken);
      });
    });
  }], function (err, arr, nextToken) {
    pushToElastic(arr);
    start(auth, nextToken);
  });
}

function pushToElastic(arr) {
  var final = [];
  arr.forEach(function (a) {
    final.push({index: {_index: 'gmail', _type: 'message', _id: a.id}});
    final.push({
      timestamp: a.timestamp,
      hourOfDay: new Date(a.timestamp).getHours(),
      dayOfWeek: weekday[new Date(a.timestamp).getDay()],
      subject: a.subject,
      from: a.from,
      to: a.to,
      dmarc: a.dmarc,
      spf: a.spf,
      dkim: a.dkim
    })
  });
  client.bulk({
    body: final
  }, function (err, resp) {
    if (err) console.log(err);
  });
}
[
  {
    "_id": "Gmail-Dashboard",
    "_type": "dashboard",
    "_source": {
      "title": "Gmail Dashboard",
      "hits": 0,
      "description": "",
      "panelsJSON": "[{\"col\":1,\"id\":\"Top-10-Worst-senders\",\"row\":4,\"size_x\":2,\"size_y\":5,\"type\":\"visualization\"},{\"col\":3,\"id\":\"Emails-Date-Histogram\",\"row\":1,\"size_x\":10,\"size_y\":3,\"type\":\"visualization\"},{\"col\":3,\"id\":\"Top-10-Senders\",\"row\":4,\"size_x\":2,\"size_y\":5,\"type\":\"visualization\"},{\"col\":1,\"id\":\"Total-Messages\",\"row\":1,\"size_x\":2,\"size_y\":3,\"type\":\"visualization\"},{\"col\":10,\"id\":\"Day-Of-Week\",\"row\":4,\"size_x\":3,\"size_y\":5,\"type\":\"visualization\"},{\"id\":\"Hour-Of-Day\",\"type\":\"visualization\",\"size_x\":5,\"size_y\":5,\"col\":5,\"row\":4}]",
      "version": 1,
      "timeRestore": false,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\"filter\":[{\"query\":{\"query_string\":{\"analyze_wildcard\":true,\"query\":\"*\"}}}]}"
      }
    }
  },
  {
    "_id": "Top-10-Senders",
    "_type": "visualization",
    "_source": {
      "title": "Top 10 Senders",
      "visState": "{\"type\":\"table\",\"params\":{\"perPage\":10,\"showPartialRows\":false,\"showMeticsAtAllLevels\":false},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"type\":\"terms\",\"schema\":\"bucket\",\"params\":{\"field\":\"from.raw\",\"size\":10,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
      "description": "",
      "version": 1,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\"index\":\"gmail\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
      }
    }
  },
  {
    "_id": "Total-Messages",
    "_type": "visualization",
    "_source": {
      "title": "Total Messages",
      "visState": "{\"type\":\"metric\",\"params\":{\"fontSize\":60},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}}],\"listeners\":{}}",
      "description": "",
      "version": 1,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\"index\":\"gmail\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
      }
    }
  },
  {
    "_id": "Top-10-Worst-senders",
    "_type": "visualization",
    "_source": {
      "title": "Top 10 Worst senders",
      "visState": "{\"type\":\"table\",\"params\":{\"perPage\":10,\"showPartialRows\":false,\"showMeticsAtAllLevels\":false},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"type\":\"terms\",\"schema\":\"bucket\",\"params\":{\"field\":\"from.raw\",\"size\":10,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
      "description": "",
      "version": 1,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\"index\":\"gmail\",\"query\":{\"query_string\":{\"query\":\"dmarc: false AND spf: false AND dkim: false\",\"analyze_wildcard\":true}},\"filter\":[]}"
      }
    }
  },
  {
    "_id": "Emails-Date-Histogram",
    "_type": "visualization",
    "_source": {
      "title": "Emails Date Histogram",
      "visState": "{\"type\":\"histogram\",\"params\":{\"shareYAxis\":true,\"addTooltip\":true,\"addLegend\":true,\"scale\":\"linear\",\"mode\":\"stacked\",\"times\":[],\"addTimeMarker\":false,\"defaultYExtents\":false,\"setYExtents\":false,\"yAxis\":{}},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"type\":\"date_histogram\",\"schema\":\"segment\",\"params\":{\"field\":\"timestamp\",\"interval\":\"auto\",\"customInterval\":\"2h\",\"min_doc_count\":1,\"extended_bounds\":{}}}],\"listeners\":{}}",
      "description": "",
      "version": 1,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\"index\":\"gmail\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
      }
    }
  },
  {
    "_id": "Day-Of-Week",
    "_type": "visualization",
    "_source": {
      "title": "Day Of Week",
      "visState": "{\"type\":\"histogram\",\"params\":{\"shareYAxis\":true,\"addTooltip\":true,\"addLegend\":true,\"scale\":\"linear\",\"mode\":\"stacked\",\"times\":[],\"addTimeMarker\":false,\"defaultYExtents\":false,\"setYExtents\":false,\"yAxis\":{}},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"dayOfWeek\",\"size\":7,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
      "description": "",
      "version": 1,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\"index\":\"gmail\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
      }
    }
  },
  {
    "_id": "Hour-Of-Day",
    "_type": "visualization",
    "_source": {
      "title": "Hour Of Day",
      "visState": "{\"type\":\"histogram\",\"params\":{\"shareYAxis\":true,\"addTooltip\":true,\"addLegend\":true,\"scale\":\"linear\",\"mode\":\"stacked\",\"times\":[],\"addTimeMarker\":false,\"defaultYExtents\":false,\"setYExtents\":false,\"yAxis\":{}},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"hourOfDay\",\"size\":24,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
      "description": "",
      "version": 1,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\"index\":\"gmail\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
      }
    }
  }
]