stevenbeales
12/28/2018 - 3:12 AM

Feature Loader

import pandas as pd
import logging
import pdb
from .features import class_map
from .features import officers_collate

log = logging.getLogger(__name__)


class FeatureLoader():

    def __init__(self, features, 
                       schema_name, 
                       blocks, 
                       labels_config, 
                       labels, 
                       labels_table, 
                       prediction_window, 
                       officer_past_activity_window,
                       timegated_feature_lookback_duration,
                       db_engine):
        '''
        Args:
            feature_blocks (dict): dictionary of feature blocks and list of features to use for the matrix
            schema_name (str) : name of the schema in the db where the features blocks tables are
            blocks (list): name of the feature blocks to use
            labels_config (dict): config file of the conditions for each label
            labels (dict): labels dictionary to use from the config file
            prediction_window (str) : prediction window to use for the label generation
            officer_past_activity_window (str): window for conditioning which officers to use given an as_of_date
        '''

        self.features = features
        self.schema_name = schema_name
        self.blocks = blocks
        self.labels_config = labels_config
        self.labels = labels
        self.labels_table = labels_table
        self.prediction_window = prediction_window
        self.officer_past_activity_window = officer_past_activity_window
        self.timegated_feature_lookback_duration = timegated_feature_lookback_duration
        self.db_engine = db_engine

        self.flatten_label_keys = [item for sublist in self.labels for item in sublist]

    def _block_tables_name(self, block_name):
        '''
        Return the names of the aggregation tables that belong to a feature block.

        The block object is obtained from class_map.lookup_block; each of its
        non-empty prefix attributes yields one '<prefix>_aggregation' name.
        Args:
            block_name (str): name of the feature block to look up
        Returns:
            list of table names, in the fixed prefix order used below
        '''
        block = class_map.lookup_block(block_name,
                                       module=officers_collate,
                                       lookback_durations=self.timegated_feature_lookback_duration,
                                       n_cpus=1)

        prefixes = (
            block.prefix_space_time_lookback,
            block.prefix_sub,
            block.prefix_agg,
            block.prefix_space_time,
            block.prefix_post,
        )

        table_names = []
        for prefix in prefixes:
            # falsy prefixes are skipped
            if prefix:
                table_names.append('{prefix}_aggregation'.format(prefix=prefix))
        return table_names


    def features_list(self):
        return [feature for list_features in self.features_in_blocks().values() for feature in list_features]

    def features_in_blocks(self):
        '''
        Ask the database which of the requested features exist in each block table.

        For every block, the features flagged True in self.features[block] are
        looked up table by table through public.get_active_block_features;
        whatever a table does not provide is searched for in the next table of
        the same block.
        Returns:
            dict mapping table name -> the 'col_avaliable' value returned for that table
        '''
        features_in_blocks = {}
        features_missing = []
        for block in self.blocks:
            block_features = self.features[block]
            # `== True` is kept on purpose: only values equal to True activate a feature
            pending_features = [key for key in block_features if block_features[key] == True]

            for block_table in self._block_tables_name(block):
                if not pending_features:
                    # everything requested for this block has been located
                    break
                # NOTE: the Python list repr is interpolated straight after ARRAY,
                # so feature names are assumed to come from trusted configuration
                query = (""" select * FROM public.get_active_block_features('{schema_name}',
                                                                                '{block_table}',
                                                                                 ARRAY{active_features},
                                                                                 ARRAY{timegated_feature_lookback_duration});"""
                         .format(schema_name=self.schema_name,
                                 block_table=block_table,
                                 active_features=pending_features,
                                 timegated_feature_lookback_duration=self.timegated_feature_lookback_duration))

                # the context manager releases the connection even if the query fails
                with self.db_engine.connect() as connection:
                    rows = [dict(row) for row in connection.execute(query)]
                result_dict = rows[0]

                # 'col_avaliable' (sic) is the column name returned by the SQL function
                features_in_blocks[block_table] = result_dict['col_avaliable']
                # keep going through the rest of features
                pending_features = result_dict['col_missing']

            # Whatever is still pending was not found in any table of THIS block.
            # Tracking it per block avoids reading a result left over from a
            # previous block (or one that was never assigned at all).
            if pending_features:
                features_missing += pending_features

        if not features_missing:
            log.debug('No features are missing')
        else:
            log.debug('These features are missing: {}'.format(features_missing))

        return features_in_blocks
         
    def _tree_conditions(self, nested_dict, parent=[], conditions=[]):
        '''
        Function that returns a list of conditions from the labels config file
        looping recursively through each tree
        Args:
            nested_dict (dict): dictionary for each of the keys in the labels_config file
            parent (list): use in the recursive function to append the parent to each tree
            conditions (list): use in the recursive mode to append all the conditions
        '''
        if isinstance(nested_dict, dict):
            column = nested_dict['COLUMN']
            for value in nested_dict['VALUES']:
                parent_temp = parent.copy()
                if isinstance(value, dict):
                    for key in value.keys():
                        parent_temp.append('{col}:{val}'.format(col=column, val=key))
                        self._tree_conditions(value[key], parent_temp, conditions)
                else:
                    parent_temp.append('{col}:{val}'.format(col=column, val=value))
                    conditions.append('{{{parent_temp}}}'.format(parent_temp=",".join(parent_temp)))
        return conditions

    def _get_event_type_columns(self, nested_dict, list_events=[]):
        if isinstance(nested_dict, dict):
            list_events.append(nested_dict['COLUMN'])
            for val in nested_dict['VALUES']:
                if isinstance(val, dict):
                    for key in val.keys():
                        self._get_event_type_columns(val[key], list_events)
        return list_events

    def get_query_labels(self, as_of_dates_to_use):
        '''
        '''

        # SUBQUERIES of arrays of conditions
        sub_query = []
        event_type_columns = set()
        for key in self.flatten_label_keys:
            condition = key.lower()
            list_conditions = self._tree_conditions(self.labels_config[key], parent=[], conditions=[])
            sub_query.append(" {condition}_table as "
                            "    ( SELECT  "
                            "          unnest(ARRAY{list_conditions}) as {condition}_condition )"
                            .format(condition=condition,
                                    list_conditions=list_conditions))
            # event type
            event_type_columns.update(self._get_event_type_columns(self.labels_config[key], []))

        # JOIN subqueries
        sub_queries = ", ".join(sub_query)
        sub_queries = ("WITH {sub_queries}, "
                       " all_conditions as "
                       "    (SELECT * "
                       "     FROM {cross_joins})"
                       .format(sub_queries=sub_queries,
                               cross_joins=" CROSS JOIN ".join([key.lower() + '_table' for key in self.flatten_label_keys])))

        # CREATE AND AND OR CONDITIONS
        and_conditions = []
        for and_labels in self.labels:
            or_conditions = []
            for label in and_labels:
                or_conditions.append("event_type_array::text[] @> {key}_condition::text[]".format(key=label.lower()))
            and_conditions.append(" OR ".join(or_conditions))
        conditions = " AND ".join('({and_condition})'.format(and_condition=and_condition) for and_condition in and_conditions)

        # QUERY OF AS OF DATES
        query_as_of_dates = (" as_of_dates as ( "
                            "select unnest(ARRAY{as_of_dates}::timestamp[]) as as_of_date) "
                            .format(as_of_dates=as_of_dates_to_use))

        # DATE FILTER
        query_filter = ("group_events as ( "
                        "SELECT officer_id,  "
                        "       event_id, "
                        "       array_agg(event_type::text ||':'|| value::text ORDER BY 1) as event_type_array, "
                        "       min(event_datetime) as min_date, "
                        "       max(event_datetime) filter (where event_type in "
                        "                          (SELECT unnest(ARRAY{event_types}))) as max_date "
                        "FROM features.{labels_table}  "
                        "GROUP BY officer_id, event_id  "
                        "), date_filter as ( "
                        " SELECT  officer_id, "
                        "        as_of_date, "
                        "        event_type_array "
                        " FROM group_events "
                        " JOIN  as_of_dates ON "
                        " min_date > as_of_date and max_date < as_of_date + INTERVAL '{prediction_window}') "
                        .format(event_types=list(event_type_columns),
                                labels_table=self.labels_table,
                                prediction_window=self.prediction_window))

        query_select_labels = (" labels as ( "
                               "  SELECT officer_id, "
                               "        as_of_date, "
                               "        1 as outcome "
                               " FROM date_filter "
                               " JOIN all_conditions ON "
                               "   {conditions} "
                               " GROUP by as_of_date, officer_id)"
                               .format(conditions=conditions))

        # CONCAT all parts of query
        query_labels = ("{sub_queries}, "
                        "{as_of_dates}, "
                        "{query_filter}, "
                        "{query_select}".format(sub_queries=sub_queries,
                                                as_of_dates=query_as_of_dates,
                                                query_filter=query_filter,
                                                query_select=query_select_labels))
        return query_labels

    def get_dataset(self, as_of_dates_to_use):
        '''
        Build the matrix of labels and features for the given as of dates.

        Starts from the master labels (one row per active officer and as of date)
        and left-joins every feature table reported by features_in_blocks().
        Tables whose name contains 'ND' carry no date and are joined on
        officer_id only; all others are joined on officer_id and as_of_date.
        Args:
            as_of_dates_to_use (list): as of dates, interpolated as a SQL ARRAY
        Returns:
            pandas DataFrame indexed by officer_id, missing values replaced by 0,
            with the 'outcome' column placed last
        '''
        features_in_blocks = self.features_in_blocks()
        # Read labels master
        complete_df = self.get_master_labels(as_of_dates_to_use)

        # loop through every table in blocks
        for table_name, features in features_in_blocks.items():
            if not features:
                # an empty SELECT list would be invalid SQL and the table
                # would contribute no columns anyway
                log.debug('Skipping table {}: no features to load'.format(table_name))
                continue

            log.info('Joining table {}!'.format(table_name))
            features_coalesce = ", ".join(['coalesce("{0}",0) as {0}'.format(feature) for feature in features])
            has_no_date = 'ND' in table_name

            # table with no date
            if has_no_date:
                query = ("""SELECT officer_id,
                                   {features_coalesce}
                            FROM {schema}."{table_name}"
                             WHERE officer_id is not null; """
                         .format(features_coalesce=features_coalesce,
                                 schema=self.schema_name,
                                 table_name=table_name))
            else:
                query = ("""SELECT officer_id,
                                   as_of_date::timestamp,
                                  {features_coalesce}
                            FROM {schema}."{table_name}"
                            WHERE as_of_date in (
                                SELECT unnest(ARRAY{as_of_dates}::DATE[]))
                             AND officer_id is not null
                                ;""".format(features_coalesce=features_coalesce,
                                            schema=self.schema_name,
                                            table_name=table_name,
                                            as_of_dates=as_of_dates_to_use))

            # Get the data; the connection is released even when the query fails
            db_conn = self.db_engine.raw_connection()
            try:
                cur = db_conn.cursor(name='cursor_for_loading_matrix')
                cur.execute(query)
                rows = cur.fetchall()
                # column names are read after the fetch, as in the original flow
                col_names = [desc[0] for desc in cur.description]
            finally:
                db_conn.close()

            # Passing the columns to the constructor also works for an empty
            # result, where assigning .columns afterwards raises a length mismatch
            table = pd.DataFrame(rows, columns=col_names)

            join_keys = 'officer_id' if has_no_date else ['officer_id', 'as_of_date']
            complete_df = complete_df.merge(table, on=join_keys, how='left')

        # Set index
        complete_df = complete_df.set_index('officer_id')

        # Zero imputation
        complete_df = complete_df.fillna(0)

        # labels at last
        cols = complete_df.columns.tolist()
        cols.append(cols.pop(cols.index('outcome')))
        complete_df = complete_df.reindex(columns=cols)

        log.info('length of data_set: {}'.format(len(complete_df)))
        log.info('as of dates used: {}'.format(complete_df['as_of_date'].unique()))
        log.info('number of officers with adverse incident: {}'.format(complete_df['outcome'].sum() ))
        return complete_df

    def get_query_features(self):
        table_names = [x for block in self.blocks for x in  self._block_tables_name(block)]  

        #[item for sublist in self.labels for item in sublist]
        # seperate the tables by block that have a date column or not
        table_names_no_date = [x for x in table_names if 'ND' in x]
        table_names_with_date = [x for x in table_names if x not in set(table_names_no_date)]

        # convert features to string for querying while replacing NULL values with ceros in sql
        features_coalesce = ", ".join(['coalesce("{0}",0) as {0}'.format(feature) for feature in self.features_list()])

        query = ""
        if len(table_names) > 0:
            if table_names_with_date:
                query = (""" SELECT officer_id,
                                    as_of_date,
                                    {features_coalesce}
                              FROM {schema}."{block_table}" """.format(features_coalesce=features_coalesce,
                                                                       schema=self.schema_name,
                                                                       block_table= table_names_with_date[0]))
                if len(table_names_with_date) > 1:
                    table_names_with_date = table_names_with_date[1:]
                    for table_name in table_names_with_date:
                        query += (""" FULL OUTER JOIN {schema}."{block_table}" 
                                          USING (officer_id, as_of_date)""".format(schema=self.schema_name,
                                                                                    block_table= table_name))
   
            # check if in the first loop above a table was added
            if len(query) == 0:
                query = (""" SELECT officer_id,
                                    {features_coalesce}
                            FROM {schema}."{block_table}" """.format(features_coalesce=features_coalesce,
                                                                     schema=self.schema_name, 
                                                                     block_table=table_names_no_date[0]))
                if len(table_names_no_date) > 1:
                    table_names_no_date = table_names_no_date[1:]
                    for table_name in table_names_no_date:
                        query += (""" FULL OUTER JOIN {schema}."{block_table}" 
                                           USING (officer_id)""".format(schema=self.schema_name,
                                                                        block_table= table_name)) 
            # Filter by date
            query += """ WHERE as_of_date in ( SELECT as_of_date from as_of_dates) """
            subquery = """ features as ({query})""".format(query=query)

            return subquery
        
    def get_master_labels(self, as_of_dates_to_use):
        '''
        Returns master list of labels for specific as of dates.

        The query combines the CTEs from get_query_labels() with an `active`
        CTE and returns one row per active officer and as of date; officers
        without a matching row in `labels` get outcome 0.
        Args:
            as_of_dates_to_use (list): as of dates passed on to get_query_labels()
        Returns:
            pandas DataFrame with the columns of the final SELECT
            (officer_id, as_of_date, outcome)
        '''

        # We only want to train and test on officers that have been active:
        # at least one event in staging.events_hub inside the past activity
        # window ending at the as of date, and a sworn role that started on or
        # before the as of date.
        # NOTE: it relies on the as_of_dates CTE created in get_query_labels
        active_subquery = ( " officers AS (  "
                         "       SELECT officer_id "
                         "       FROM staging.officers_hub "
                         " ), active AS ( "
                         "       SELECT officer_id, as_of_date "
                         "       FROM as_of_dates as d "
                         "       CROSS JOIN officers as off, "
                         "           LATERAL "
                         "                (SELECT 1 "
                         "                 FROM staging.events_hub e "
                         "                 WHERE off.officer_id = e.officer_id "
                         "                 AND e.event_datetime + INTERVAL '{window}' > d.as_of_date "
                         "                 AND e.event_datetime <= d.as_of_date "
                         "                    LIMIT 1 ) sub_activity, "
                         "            LATERAL "
                         "                (SELECT 1 "
                         "                 FROM staging.officer_roles r "
                         "                 WHERE off.officer_id = r.officer_id "
                         "                 AND r.job_start_date <= d.as_of_date "
                         "                 AND sworn_flag = 1 "
                         "                 LIMIT 1) sub_sworn )"
                         .format(window=self.officer_past_activity_window))

        query_master_labels = (" {labels_subquery}, "
                               " {active_subquery} "
                               " SELECT officer_id, "
                               "        as_of_date, "
                               "        coalesce(outcome,0) as outcome "
                               " FROM active "
                               " LEFT JOIN labels "
                               " USING (as_of_date, officer_id) "
                               .format(labels_subquery=self.get_query_labels(as_of_dates_to_use),
                                       active_subquery=active_subquery))

        # The connection is released even when the query fails
        db_conn = self.db_engine.raw_connection()
        try:
            cur = db_conn.cursor(name='cursor_for_loading_matrix')
            cur.execute(query_master_labels)
            rows = cur.fetchall()
            # column names are read after the fetch, as in the original flow
            col_names = [desc[0] for desc in cur.description]
        finally:
            db_conn.close()

        # Passing the columns to the constructor also works for an empty
        # result, where assigning .columns afterwards raises a length mismatch
        return pd.DataFrame(rows, columns=col_names)

    def get_dataset_old(self, as_of_dates_to_use):
        '''
        This function returns dataset and labels to use for training / testing,
        loading everything with one single query (older variant of get_dataset).
        It is split in two parts:
            - query_features_labels: CTEs joining the features table with the labels table
            - query_active: final SELECT over features_labels that keeps only officers
                            with any activity given the officer_past_activity_window
        Args:
            as_of_dates_to_use (list): as of dates passed on to get_query_labels()
        Returns:
            pandas DataFrame with officer_id, as_of_date, the feature columns and outcome
        '''
        features_list_string = ", ".join(['{}'.format(feature) for feature in self.features_list()])

        # JOIN FEATURES AND LABELS
        query_features_labels = (" {labels_subquery}, "
                                 " {features_subquery}, " 
                                 " features_labels AS ( "
                                 "    SELECT officer_id, "
                                 "           as_of_date, "
                                 "           {features_list}, "
                                 "           coalesce(outcome,0) as outcome "
                                 "    FROM features "
                                 "    LEFT JOIN labels "
                                 "    USING (as_of_date, officer_id)) "
                                 .format(labels_subquery=self.get_query_labels(as_of_dates_to_use),
                                         features_subquery=self.get_query_features(),
                                         features_list=features_list_string))

        # We only want to train and test on officers that have been active (any logged activity in events_hub)
        # NOTE: it uses the features_labels CTE created just above
        query_active =  (""" SELECT officer_id, as_of_date, {features}, outcome """
                        """ FROM features_labels as f, """
                        """        LATERAL """
                        """          (SELECT 1 """
                        """           FROM staging.events_hub e """
                        """           WHERE f.officer_id = e.officer_id """
                        """           AND e.event_datetime + INTERVAL '{window}' > f.as_of_date """
                        """           AND e.event_datetime <= f.as_of_date """
                        """            LIMIT 1 ) sub; """
                        .format(features=features_list_string,
                                window=self.officer_past_activity_window))

        # join both queries together and load data
        query = (query_features_labels + query_active)

        # Get the data; the connection is released even when the query fails
        db_conn = self.db_engine.raw_connection()
        try:
            cur = db_conn.cursor(name='cursor_for_loading_matrix')
            cur.execute(query)
            rows = cur.fetchall()
            # column names are read after the fetch, as in the original flow
            col_names = [desc[0] for desc in cur.description]
        finally:
            db_conn.close()

        # Passing the columns to the constructor also works for an empty
        # result, where assigning .columns afterwards raises a length mismatch
        matrix_df = pd.DataFrame(rows, columns=col_names)

        ## TODO: remove all zero value columns

        log.info('length of data_set: {}'.format(len(matrix_df)))
        log.info('as of dates used: {}'.format(matrix_df['as_of_date'].unique()))
        log.info('number of officers with adverse incident: {}'.format(matrix_df['outcome'].sum() ))
        return matrix_df