bugcy013
7/7/2014 - 3:11 AM

nfdumpexport.py

#!/usr/local/python2.7.2/bin/python
# $Id$
# Author:  <cmgreen@uab.edu>
# Purpose: Run nfdump for flow exports so searches can happen in Splunk for at least the IP hit
# Created: Wed Aug 28 08:50:27 CDT 2013

import sys
import unittest
import argparse
import subprocess, datetime
import tempfile
import os

DEBUG = 0

class NfExport(object):
    """Export the unique addresses found in nfcapd capture files.

    Capture files are read from a ``.../<year>/<month>/<day>/`` directory
    tree below ``indir``; one text file per time window is written to
    ``outdir``.
    """

    def __init__(self, indir, outdir):
        """Remember the input tree root and the output directory."""
        self.INDIR = indir
        self.OUTDIR = outdir

    @staticmethod
    def _debug(message):
        """Write ``message`` to stderr when the module-level DEBUG flag is set."""
        if DEBUG:
            sys.stderr.write(message + "\n")

    def nfdump_uniq(self, fout, file_param):
        """Run ``nfdump | uniq | sort -u`` and write the result to ``fout``.

        fout       -- writable file-like object; receives one stripped line
                      per line of pipeline output.
        file_param -- value passed to ``nfdump -R`` (as produced by
                      input_walk: ``dir/first_file:last_file``).

        The exit status of the child processes is not checked.
        """
        nfdump = subprocess.Popen(
            ('/usr/local/bin/nfdump', '-R', file_param, '-o', 'fmt:%da'),
            stdout=subprocess.PIPE)

        uniq = subprocess.Popen(('uniq',),
                                stdin=nfdump.stdout,
                                stdout=subprocess.PIPE)
        # Drop our copy of the pipe so nfdump gets SIGPIPE if uniq exits early.
        nfdump.stdout.close()

        # universal_newlines gives text lines on both Python 2 and Python 3.
        sort = subprocess.Popen(('sort', '-u'),
                                stdin=uniq.stdout,
                                stdout=subprocess.PIPE,
                                universal_newlines=True)
        uniq.stdout.close()

        try:
            # Read until EOF instead of polling the process: sort emits its
            # output just before it exits, so a poll()-driven loop can stop
            # while lines are still waiting in the pipe.
            for line in sort.stdout:
                fout.write(line.strip() + "\n")
        finally:
            sort.stdout.close()
            # Reap every child so no zombie processes are left behind.
            for proc in (nfdump, uniq, sort):
                proc.wait()

    def run(self, start_date, force=False, end_date=None, span=240):
        """Export every window returned by input_walk to its own file.

        start_date -- datetime; day directories before it are skipped.
        force      -- when true, regenerate output files that already exist.
        end_date   -- optional datetime; day directories after it are skipped.
        span       -- window length in minutes, forwarded to input_walk.

        Output files are named ``flowip-YYYY-mm-dd-HH-MM.txt`` after the
        timestamp of the window's last capture file.
        """
        for inpath, dt in self.input_walk(start_date, end_date, span):
            sys.stdout.write("%s %s\n" % (inpath, dt))
            outpath = os.path.join(self.OUTDIR,
                                   dt.strftime("flowip-%Y-%m-%d-%H-%M.txt"))
            if os.path.exists(outpath) and not force:
                continue

            # The context manager guarantees the file is flushed and closed
            # even if the pipeline raises.
            with open(outpath, "w") as fout:
                self.nfdump_uniq(fout, inpath)

    def input_walk(self, start_date, end_date=None, span=240):
        """Yield ``(path, timestamp)`` for each window of capture files.

        Walks the tree below ``self.INDIR`` and, in every leaf directory
        whose last three path components parse as year/month/day, groups the
        sorted file names into windows.

        start_date -- datetime; directories dated before it are skipped.
        end_date   -- optional datetime; directories dated after it are
                      skipped.
        span       -- window length in minutes (default 240, i.e. 4 hours;
                      one day is 1440 minutes).  It is converted to a file
                      count assuming one capture file per 5 minutes.

        ``path`` is ``<dir>/<first_file>:<last_file>`` and ``timestamp`` is
        the datetime parsed from the text after the first "." in the last
        file name (format ``%Y%m%d%H%M``).

        Adjacent windows share their boundary file, and trailing files that
        do not fill a whole window are not yielded.  Directories and files
        whose names cannot be parsed are skipped.
        """
        # Files per window.
        # NOTE(review): the floor is 5 *files* (25 minutes of 5-minute
        # captures), whereas the earlier description promised a floor of
        # 5 minutes -- confirm which one is intended before changing it.
        increment = max(int(span) // 5, 5)

        for root, dirs, files in os.walk(self.INDIR):
            # Sorting in place makes os.walk descend in a stable order.
            dirs.sort()
            if dirs:
                # Only leaf directories are treated as day directories.
                continue

            dirparts = root.split("/")
            try:
                dir_date = datetime.datetime(year=int(dirparts[-3]),
                                             month=int(dirparts[-2]),
                                             day=int(dirparts[-1]))
            except (ValueError, IndexError):
                # Not a year/month/day directory: skip it rather than abort
                # the whole export.
                self._debug("skipping(not a dated directory) %s" % root)
                continue

            if dir_date < start_date:
                self._debug("skipping(too early) %s" % root)
                continue

            if end_date and dir_date > end_date:
                self._debug("skipping(too late) %s" % root)
                continue

            # Sorted names are taken to be in capture order, one file per
            # 5 minutes.
            files.sort()
            for i in range(0, len(files) - increment, increment):
                first_file = files[i]
                last_file = files[i + increment]

                idx = last_file.find(".")
                if idx < 0:
                    self._debug("Cannot parse time from %s/%s" % (root, last_file))
                    continue

                try:
                    t = datetime.datetime.strptime(last_file[idx + 1:],
                                                   "%Y%m%d%H%M")
                except ValueError:
                    # Suffix is not a timestamp: treat it like a name
                    # without a dot.
                    self._debug("Cannot parse time from %s/%s" % (root, last_file))
                    continue

                yield (root + "/" + first_file + ":" + last_file, t)
        
        
if __name__ == '__main__':
    # Export everything captured since midnight 30 days ago, using the
    # default 4-hour windows and skipping outputs that already exist.
    exporter = NfExport("/data2/nfsen/upstream1/upstream1", "/data2/nfexport/")

    thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=30)
    # Truncate to the start of that day so whole day directories are compared.
    cutoff = thirty_days_ago.replace(hour=0, minute=0, second=0, microsecond=0)

    exporter.run(cutoff)