
spark_log_extract.py

# coding: utf-8

# command example:
#   $ spark-submit spark_log_extract.py \
#   --name test \
#   --notblankkeys dn,stm,ev_ac,pg_url \
#   --filterregex ".*(=ac_pl\`|=ac_dl\`).*" \
#   --usegzip \
#   /path/to/source \
#   /path/to/atom \
#   dn,stm,ev_ac,v_title,v_uri,pg_url
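#
# Input records are assumed to be backtick-delimited key=value pairs, e.g.
# (a hypothetical sample; field names are taken from the command example above):
#   dn=site1`stm=1432105683`ev_ac=ac_pl`pg_url=/index.html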

import sys
import re
from pyspark import SparkContext, SparkConf
from optparse import OptionParser


def process(line, keys, not_blank_keys=()):
    """Parse a backtick-delimited key=value record and return the requested
    fields tab-joined in the order of `keys`, or None for unusable records."""
    fields = line.split('`')
    output_lst = [''] * len(keys)
    try:
        for field in fields:
            key, val = field.split('=', 1)
            if key in keys:
                output_lst[keys.index(key)] = val
        # Drop the record if any required key is missing or blank.
        for not_blank_key in not_blank_keys:
            if not output_lst[keys.index(not_blank_key)]:
                return None
    except ValueError:
        # Raised by split() for a field without '=' or by index() for an
        # unknown not-blank key; treat either as an unusable record.
        return None
    return '\t'.join(output_lst)
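
# Illustrative behavior (using the hypothetical sample record shown above):
#   >>> process("dn=site1`stm=1432105683`ev_ac=ac_pl`pg_url=/index.html",
#   ...         keys=['dn', 'stm', 'ev_ac', 'pg_url'], not_blank_keys=['dn'])
#   'site1\t1432105683\tac_pl\t/index.html'
# A record with a field lacking '=' or with a blank required key returns None
# and is dropped by the downstream filter in main().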


def parse_args():
    usage = "usage: %prog [options] input_path output_path keys"
    parser = OptionParser(usage=usage)
    parser.add_option(
        '--name', default='SparkLogExtract', help='name of Spark job')
    parser.add_option('--notblankkeys', default='',
                      help='comma-separated keys whose values must not be blank')
    parser.add_option(
        '--filterregex', help='line filter regex', default='.*')
    parser.add_option('--usegzip', action='store_true', default=False,
                      help='compress output with gzip instead of plain text')
    (options, args) = parser.parse_args()
    if len(args) != 3:
        parser.error('incorrect number of arguments')
    return options, args


def main():
    options, args = parse_args()
    keys = [k.strip() for k in args[2].split(',')]
    # Ignore empty entries so an unset --notblankkeys yields no required keys.
    not_blank_keys = [k.strip() for k in options.notblankkeys.split(',')
                      if k.strip()]
    conf = SparkConf().setAppName(options.name)
    if options.usegzip:
        conf.set(
            'spark.hadoop.mapreduce.output.fileoutputformat.compress', 'true')
        conf.set('spark.hadoop.mapreduce.output.fileoutputformat.compress.codec',
                 'org.apache.hadoop.io.compress.GzipCodec')
        conf.set(
            'spark.hadoop.mapreduce.output.fileoutputformat.compress.type', 'BLOCK')
    sc = SparkContext(conf=conf)
    # Keep lines matching the filter regex, extract the requested fields,
    # drop records rejected by process(), and write the result.
    (sc.textFile(args[0])
       .filter(lambda x: re.match(options.filterregex, x))
       .map(lambda x: process(x, keys, not_blank_keys))
       .filter(lambda x: x)
       .saveAsTextFile(args[1]))

if __name__ == '__main__':
    main()
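
# For a quick local sanity check before running on a cluster (paths below are
# placeholders):
#   $ spark-submit --master "local[2]" spark_log_extract.py \
#       --notblankkeys dn,stm \
#       /tmp/sample_logs /tmp/extracted dn,stm,ev_ac,pg_url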