naviat
12/7/2017 - 7:59 AM

A short Lambda Function the can be sent CloudWatch Logs (in the case Flow Logs) and send them to Kinesis Firehose for storage in S3. A full

A short Lambda Function the can be sent CloudWatch Logs (in the case Flow Logs) and send them to Kinesis Firehose for storage in S3. A full writeup can be found on my site http://mlapida.com/thoughts/exporting-cloudwatch-logs-to-s3-lambda

import boto3
import logging
import json
import gzip
from StringIO import StringIO

logger = logging.getLogger()
logger.setLevel(logging.INFO)

client = boto3.client('firehose')

def lambda_handler(event, context):
    
    #capture the CloudWatch log data
    outEvent = str(event['awslogs']['data'])
    
    #decode and unzip the log data
    outEvent = gzip.GzipFile(fileobj=StringIO(outEvent.decode('base64','strict'))).read()
    
    #convert the log data from JSON into a dictionary
    cleanEvent = json.loads(outEvent)
    
    #initiate a list
    s = []
    
    #set the name of the Kinesis Firehose Stream
    firehoseName = 'FlowLogTest'
    
    #loop through the events line by line
    for t in cleanEvent['logEvents']:
        
        #Transform the data and store it in the "Data" field. 
        p={
            #Fields in FlowLogs - [version, accountid, interfaceid, srcaddr, dstaddr, srcport, dstport, protocol, packets, bytes, start, stop, action, logstatus]
            'Data': str(t['extractedFields']['start']) + "," + str(t['extractedFields']['dstaddr']) + "," + str(t['extractedFields']['srcaddr']) + "," + str(t['extractedFields']['packets'])+"\n"
        }
        
        #write the data to our list
        s.insert(len(s),p)
        
        #limit of 500 records per batch. Break it up if you have to.
        if len(s) > 499:
            #send the response to Firehose in bulk
            SendToFireHose(firehoseName, s)
            
            #Empty the list
            s = []
    
    #when done, send the response to Firehose in bulk
    if len(s) > 0:
        SendToFireHose(firehoseName, s)

#function to send record to Kinesis Firehose
def SendToFireHose(streamName, records):
    response = client.put_record_batch(
        DeliveryStreamName = streamName,
        Records=records
    )
    
    #log the number of data points written to Kinesis
    print "Wrote the following records to Firehose: " + str(len(records))