pbojinov
1/20/2015 - 12:36 AM

Send CloudTrail events to CloudSearch with AWS Lambda

Send CloudTrail events to CloudSearch with AWS Lambda

console.log('Loading event');
// Domain-specific CloudSearch *document service* endpoint used for
// uploadDocuments, typically of the form
// "doc-<domain>-<id>.<region>.cloudsearch.amazonaws.com" (shown on the
// domain's dashboard). It is read from the CLOUDSEARCH_ENDPOINT environment
// variable so this file is valid JavaScript; replace the lookup with a string
// literal if you prefer to hard-code the endpoint.
var CLOUDSEARCH_ENDPOINT = process.env.CLOUDSEARCH_ENDPOINT;

var async = require('async');
var jpath = require('json-path')
var zlib = require('zlib');
var aws = require('aws-sdk');
// AWS service clients, built once when the module loads and shared by the
// functions below. Each client pins the service API version it targets.
var s3 = new aws.S3({ apiVersion: '2006-03-01' });
var csd = new aws.CloudSearchDomain({
    apiVersion: '2013-01-01',
    endpoint: CLOUDSEARCH_ENDPOINT
});

// Maps each CloudSearch field name (the key) to the json-path expression that
// extracts its value from a single CloudTrail record (the value).
// Path syntax: https://github.com/flitbit/json-path
// Declared (not an implicit global) and frozen because it is a constant table.
var MAPPING = Object.freeze({
    "aws_region": "#/awsRegion",
    "error_message": "#/errorMessage",
    "event_id": "#/eventID",
    "event_name": "#/eventName",
    "event_source": "#/eventSource",
    "event_time": "#/eventTime",
    "source_ip_address": "#/sourceIPAddress",
    "user_agent": "#/userAgent",
    "user_identity_type": "#/userIdentity/type",
    "user_identity_arn": "#/userIdentity/arn",
    "user_identity_account_id": "#/userIdentity/accountId",
    "user_identity_user_name": "#/userIdentity/userName"
});

// Build one "add" operation for a CloudSearch JSON document batch.
// Batch format: http://docs.aws.amazon.com/cloudsearch/latest/developerguide/preparing-data.html
//
// @param {string} id - Document ID for the batch entry.
// @param {Object} fields - Field name -> value map for the document; the same
//     object is referenced (not copied) by the returned request.
// @returns {{type: string, id: string, fields: Object}} the batch operation.
function create_cs_request(id, fields) {
    return {
        type: 'add',
        id: id,
        fields: fields
    };
}

// Download a gzipped JSON object from S3, gunzip and parse it, and return the
// list resolved by the json-path "#/Records[*]".
//
// @param {string} bucket - S3 bucket name.
// @param {string} key - S3 object key, passed to getObject as-is.
// @param {function(Error, Array)} cb - Node-style callback. It receives the
//     records on success, or the first S3 / gunzip / JSON.parse error.
function get_s3_gz_json(bucket, key, cb) {
    async.waterfall([
        // Fetch the compressed object.
        function(callback) {
            s3.getObject({
                Bucket: bucket,
                Key: key
            }, function(err, data) {
                if (err) {
                    // `data` is not usable on failure; report the error
                    // instead of dereferencing it.
                    return callback(err);
                }
                console.log("Finished collecting S3 Object");
                callback(null, data.Body);
            });
        },
        // Gunzip and parse. Errors are handed to the callback rather than
        // thrown: an exception raised inside the zlib callback would escape
        // the waterfall and never reach `cb`.
        function(gz_json, callback) {
            zlib.gunzip(gz_json, function(err, dezipped) {
                if (err) {
                    return callback(err);
                }
                var json;
                try {
                    json = JSON.parse(dezipped.toString('utf-8'));
                } catch (parseErr) {
                    return callback(parseErr);
                }
                callback(null, json);
            });
        },
        // Extract the records list.
        function(json, callback) {
            var records = jpath.resolve(json, "#/Records[*]");
            console.log("Found the following records", records);
            callback(null, records);
        }
    ], cb);
}

// Convert CloudTrail records into CloudSearch "add" requests.
// Despite its name, this function performs no I/O. For each record it
// extracts every field listed in MAPPING and wraps the result with
// create_cs_request, using the extracted event_id as the document ID.
//
// @param {Array} records - CloudTrail records (null/undefined is treated as
//     an empty list).
// @param {function(Error, Array)} callback - Node-style callback. It receives
//     the requests in the same order as `records`, or the error thrown while
//     building them.
function download_records(records, callback) {
    var requests;
    try {
        requests = (records || []).map(function(record) {
            var fields = {};
            Object.keys(MAPPING).forEach(function(field_name) {
                // jpath.resolve always returns a list; take the first match.
                // A path matching nothing yields undefined, and such fields
                // are dropped when the batch is serialized with JSON.stringify.
                fields[field_name] = jpath.resolve(record, MAPPING[field_name])[0];
            });
            var cs_request = create_cs_request(fields["event_id"], fields);
            console.log("created request", cs_request);
            return cs_request;
        });
    } catch (err) {
        // Route failures through the callback instead of throwing out of the
        // enclosing async pipeline.
        return callback(err);
    }
    callback(null, requests);
}


// Upload a batch of CloudSearch document operations with uploadDocuments.
//
// @param {Array} requests - Batch operations (see create_cs_request).
// @param {function(Error)} callback - Called with the upload error, if any.
//     An empty or missing batch is skipped, and the callback receives null.
//
// NOTE(review): CloudSearch limits a single upload batch to 5 MB; very large
// CloudTrail log files may need to be split into several batches. Confirm
// this against the expected log sizes.
function send_record_requests(requests, callback) {
    if (!requests || requests.length === 0) {
        // Nothing to index (e.g. a log file with no records).
        console.log("No documents to publish");
        return callback(null);
    }
    console.log("Publishing the following documents", requests);
    var params = {
        contentType: 'application/json',
        documents: JSON.stringify(requests)
    };
    csd.uploadDocuments(params, function(err, data) {
        if (!err) {
            console.log("Upload result", data);
        }
        callback(err);
    });
}

exports.handler = function(event, context) {
    console.log('Received event:');
    console.log(JSON.stringify(event, null, ' '));
    // Get the object from the event and show its content type
    var bucket = event.Records[0].s3.bucket.name;
    var key = event.Records[0].s3.object.key;

    var perform_task = async.compose(send_record_requests, download_records, get_s3_gz_json);
    perform_task(bucket, key, function(err, result) {
        if (err) {
            context.done("Error performing task: " + err);
        } else {
            context.done(null, '');
        }
    });

};