scubamut
6/23/2019 - 11:11 AM

ZIPLINE PIPELINE STRATEGY WITH A LARGE NUMBER OF FACTORS

"""
This is a template algorithm on Zipline for you to adapt and fill in.
"""

from zipline.api import attach_pipeline, pipeline_output
from zipline import run_algorithm
from zipline.api import symbols, get_datetime, schedule_function
from zipline.utils.events import date_rules, time_rules
from zipline.pipeline import Pipeline
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.factors import CustomFactor, Returns, DailyReturns
from zipline.pipeline.filters import StaticAssets
import pandas as pd
import numpy as np
from scipy.stats.mstats import winsorize
from sklearn import preprocessing
from datetime import datetime
import pytz

WINDOW_LENGTH = 5
WIN_LIMIT = 0


# Flag used for the first WINDOW_LENGTH days, during which the algo is "only" initialising its buffers. One can avoid that by using a second pipeline, which is called only at initialization and computes the alphas for the entire window... But I have not yet found a good solution for this!

def preprocess(a):
    """Demean, winsorize and standardise an array of raw factor values.

    A float64 copy of ``a`` is made, infinite entries are turned into NaN,
    the NaN-aware mean is subtracted and any remaining NaN is replaced by
    zero.  The result is winsorized with ``WIN_LIMIT`` on both tails and
    then passed through ``sklearn.preprocessing.scale``.
    """
    values = a.astype(np.float64)
    # Infinities would poison the mean, so treat them like missing data.
    values[np.isinf(values)] = np.nan
    centred = np.nan_to_num(values - np.nanmean(values))
    clipped = winsorize(centred, limits=[WIN_LIMIT, WIN_LIMIT])
    return preprocessing.scale(clipped)


def make_factor():
    """Define the alpha factor classes and return them keyed by name.

    Returns:
        dict mapping ``'Direction'`` and ``'mean_rev'`` to the corresponding
        ``CustomFactor`` subclasses (the classes themselves, not instances).
    """

    class Direction(CustomFactor):
        # Negated sum over the window of (close - open) / close, then
        # normalised across the asset axis by preprocess().
        inputs = [USEquityPricing.open, USEquityPricing.close]
        window_length = 21
        window_safe = True

        def compute(self, today, assets, out, open, close):
            intraday_move = (close - open) / close
            summed = np.nansum(-intraday_move, axis=0)
            out[:] = preprocess(summed)

    class mean_rev(CustomFactor):
        # Compares, for every look-back from 10 rows up to the full window,
        # the mean "typical price" with the latest one, and its reciprocal.
        inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close]
        window_length = 30
        window_safe = True

        def compute(self, today, assets, out, high, low, close):
            typical = (high + low + close) / 3

            n_rows = close.shape[0]
            n_cols = close.shape[1]

            forward = np.zeros(n_cols)
            inverse = np.zeros(n_cols)
            latest = typical[-1, :]

            for lookback in range(10, n_rows + 1):
                ratio = np.nanmean(typical[-lookback:, :], axis=0) / latest
                # Each term is weighted by the cross-sectional sum of itself.
                forward += np.nansum(ratio) * ratio
                ratio = 1.0 / ratio
                inverse += np.nansum(ratio) * ratio

            out[:] = preprocess(forward - inverse)

    return {
        'Direction': Direction,
        'mean_rev': mean_rev,
    }


class Factor_N_Days_Ago(CustomFactor):
    """Output row 0 of the window of its single input.

    The caller chooses ``window_length``; with a window of N rows the value
    written is the first of those N rows (presumably the oldest one, i.e.
    the input as it was N-1 rows ago -- confirm against zipline's window
    ordering).
    """

    def compute(self, today, assets, out, input_factor):
        first_row = input_factor[0]
        out[:] = first_row


def initialize(context):
    """
    Called once at the start of the algorithm.

    Creates the empty alpha buffer on ``context``, schedules ``rebalance``
    (one hour after the open) and ``record_vars`` (at the close) for every
    day, attaches the 'pipeline' and 'pipeinit' pipelines, and sets the
    first-trading-day flag read by ``before_trading_start``.
    """

    context.alphas = pd.DataFrame()

    # The file imports schedule_function, date_rules and time_rules directly;
    # there is no `algo` module alias in scope, so they are called unqualified.

    # Rebalance every day, 1 hour after market open.
    schedule_function(
        rebalance,
        date_rules.every_day(),
        time_rules.market_open(hours=1),
    )

    # Record tracking variables at the end of each day.
    schedule_function(
        record_vars,
        date_rules.every_day(),
        time_rules.market_close(),
    )

    # Create our dynamic stock selector.
    attach_pipeline(make_pipeline(), 'pipeline')
    attach_pipeline(make_pipeinit(), 'pipeinit')

    context.first_trading_day = True
    context.factor_name_list = make_factor().keys()


def make_pipeinit():
    """Build the seeding pipeline holding every factor at every lag.

    For each factor from ``make_factor()`` and each ``days_ago`` in
    ``WINDOW_LENGTH - 1 .. 0`` a column named ``"<factor>-<days_ago>"`` is
    added, wrapping the masked factor in ``Factor_N_Days_Ago`` with a window
    of ``days_ago + 1`` rows.
    """
    # NOTE(review): QTradableStocksUS is not imported in the visible part of
    # this file -- confirm where it is expected to come from.
    lagged_columns = {}
    for name, factor_cls in make_factor().items():
        for days_ago in reversed(range(WINDOW_LENGTH)):
            column_label = '{}-{}'.format(name, days_ago)
            lagged_columns[column_label] = Factor_N_Days_Ago(
                [factor_cls(mask=QTradableStocksUS())],
                window_length=days_ago + 1,
                mask=QTradableStocksUS(),
            )

    return Pipeline(columns=lagged_columns, screen=QTradableStocksUS())


def make_pipeline():
    """Build the daily pipeline with one column per factor.

    Each factor is instantiated with the universe as its ``mask`` so that the
    cross-sectional normalisation done inside ``compute`` (via ``preprocess``)
    runs over the same set of assets as the masked factors created in
    ``make_pipeinit``.  Without the mask, today's values would be normalised
    over a different asset set than the seeded lag columns they are later
    concatenated with.
    """
    # NOTE(review): QTradableStocksUS is not imported in the visible part of
    # this file -- confirm where it is expected to come from.
    universe = QTradableStocksUS()
    columns = {
        name: factor_cls(mask=universe)
        for name, factor_cls in make_factor().items()
    }
    return Pipeline(columns=columns, screen=universe)


def before_trading_start(context, data):
    """Maintain ``context.alphas``, a rolling buffer of factor values.

    ``context.alphas`` is a DataFrame indexed by (stock, factor) whose
    integer columns are lags: 0 holds the most recently fetched values and
    ``WINDOW_LENGTH - 1`` the oldest retained ones.

    On the first call the whole buffer is seeded from the 'pipeinit'
    pipeline, whose column labels have the form ``"<factor>-<days_ago>"``.
    On later calls the oldest lag is dropped, the remaining lags are shifted
    by one, and the output of 'pipeline' becomes lag 0.

    Args:
        context: algorithm state; reads/writes ``first_trading_day`` and
            ``alphas``.
        data: unused.
    """
    if context.first_trading_day:
        seeded = pipeline_output("pipeinit").astype('float32').stack()
        seeded.index.names = ['stock', 'alphas']
        seeded = seeded.reset_index(level=['alphas', 'stock'])

        # Split "<factor>-<days_ago>" on the LAST dash so that a factor name
        # containing a dash would still parse correctly.
        pieces = [label.rpartition('-') for label in seeded['alphas'].values]
        seeded['factor'] = pd.Series(
            np.array([name for name, _, _ in pieces], dtype='object'),
            index=seeded.index,
        )
        seeded['day'] = pd.Series(
            np.array([int(lag) for _, _, lag in pieces], dtype='int'),
            index=seeded.index,
        )

        seeded = seeded.drop('alphas', axis=1)
        seeded = seeded.set_index(['stock', 'factor', 'day'])
        # Column 0 holds the stacked values; pivot the lag level into columns.
        context.alphas = seeded[0].unstack(level=2)

        context.first_trading_day = False
    else:
        today = pipeline_output("pipeline").astype('float32')
        today = today.stack().to_frame()  # single column labelled 0
        today.index.names = ['stock', 'factor']

        # Drop the oldest lag and age the remaining ones by one day.  The
        # bounds follow WINDOW_LENGTH instead of being fixed at 5.
        history = context.alphas.drop([WINDOW_LENGTH - 1], axis=1)
        history.columns = list(range(1, WINDOW_LENGTH))
        context.alphas = pd.concat([today, history], axis=1)


def rebalance(context, data):
    """
    Order-execution hook registered through schedule_function() in
    initialize().  Currently a placeholder that does nothing.
    """
    return None


def record_vars(context, data):
    """
    End-of-day recording hook registered through schedule_function() in
    initialize().  Currently a placeholder that does nothing.
    """
    return None


def handle_data(context, data):
    """
    Per-bar hook; intentionally empty.

    Note that the __main__ section of this file does not pass this function
    to run_algorithm().
    """
    return None
    


if __name__ == "__main__":
    # Back-test window (UTC) and starting capital.
    start = datetime(2013, 1, 1, 0, 0, 0, 0, pytz.utc)
    end = datetime(2013, 1, 10, 0, 0, 0, 0, pytz.utc)
    #     end = datetime.today().replace(tzinfo=timezone.utc)
    capital_base = 100000

    result = run_algorithm(start=start, end=end, initialize=initialize,
                           capital_base=capital_base,
                           before_trading_start=before_trading_start,
                           bundle='US_stock_bundle')

    # Show the last three rows.  This must live inside the guard: `result`
    # only exists when the back-test has actually been run, and a bare
    # expression would not display anything when executed as a script.
    print(result[-3:])
# https://www.quantopian.com/posts/alpha-combination-via-clustering
# adapted for Zipline

"""
This is a template algorithm on Zipline for you to adapt and fill in.
"""

from zipline.api import attach_pipeline, pipeline_output
from zipline import run_algorithm
from zipline.api import symbols, get_datetime, schedule_function
from zipline.utils.events import date_rules, time_rules
from zipline.pipeline import Pipeline
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.factors import CustomFactor, Returns, DailyReturns
from zipline.pipeline.filters import StaticAssets
import pandas as pd
import numpy as np
from scipy.stats.mstats import winsorize
from sklearn import preprocessing
from datetime import datetime
import pytz

WINDOW_LENGTH = 5
WIN_LIMIT = 0


# Flag used for the first WINDOW_LENGTH days, during which the algo is "only" initialising its buffers. One can avoid that by using a second pipeline, which is called only at initialization and computes the alphas for the entire window... But I have not yet found a good solution for this!

def preprocess(a):
    """Return a demeaned, winsorized and scaled float64 version of ``a``.

    Steps, in order: copy to float64, replace infinities by NaN, subtract
    the NaN-aware mean, replace NaN by zero, winsorize both tails with
    ``WIN_LIMIT``, and finally apply ``sklearn.preprocessing.scale``.
    """
    as_float = a.astype(np.float64)
    infinite = np.isinf(as_float)
    as_float[infinite] = np.nan  # so that nanmean ignores them
    demeaned = as_float - np.nanmean(as_float)
    filled = np.nan_to_num(demeaned)
    bounded = winsorize(filled, limits=[WIN_LIMIT, WIN_LIMIT])
    return preprocessing.scale(bounded)


def make_factor():
    """Define the alpha factor classes and return them keyed by name.

    Returns:
        dict mapping ``'Direction'`` and ``'mean_rev'`` to the corresponding
        ``CustomFactor`` subclasses (the classes themselves, not instances).
    """

    class Direction(CustomFactor):
        # Negated sum over the window of (close - open) / close, then
        # normalised across the asset axis by preprocess().
        inputs = [USEquityPricing.open, USEquityPricing.close]
        window_length = 21
        window_safe = True

        def compute(self, today, assets, out, open, close):
            relative_move = (close - open) / close
            total = np.nansum(-relative_move, axis=0)
            out[:] = preprocess(total)

    class mean_rev(CustomFactor):
        # Compares, for every look-back from 10 rows up to the full window,
        # the mean "typical price" with the latest one, and its reciprocal.
        inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close]
        window_length = 30
        window_safe = True

        def compute(self, today, assets, out, high, low, close):
            typical = (high + low + close) / 3

            n_rows = close.shape[0]
            n_cols = close.shape[1]

            forward = np.zeros(n_cols)
            inverse = np.zeros(n_cols)
            latest = typical[-1, :]

            for lookback in range(10, n_rows + 1):
                ratio = np.nanmean(typical[-lookback:, :], axis=0) / latest
                # Each term is weighted by the cross-sectional sum of itself.
                forward += np.nansum(ratio) * ratio
                ratio = 1.0 / ratio
                inverse += np.nansum(ratio) * ratio

            out[:] = preprocess(forward - inverse)

    return {
        'Direction': Direction,
        'mean_rev': mean_rev,
    }


class Factor_N_Days_Ago(CustomFactor):
    """Output row 0 of the window of its single input.

    The caller chooses ``window_length``; with a window of N rows the value
    written is the first of those N rows (presumably the oldest one, i.e.
    the input as it was N-1 rows ago -- confirm against zipline's window
    ordering).
    """

    def compute(self, today, assets, out, input_factor):
        first_row = input_factor[0]
        out[:] = first_row


def initialize(context):
    """
    One-time algorithm set-up.

    Stores the ETF universe filter and an empty alpha buffer on ``context``,
    schedules ``rebalance`` and ``record_vars`` for every day, attaches the
    'pipeline' and 'pipeinit' pipelines, and sets the first-trading-day flag
    together with the list of factor names.
    """
    sector_etfs = ('XLY', 'XLP', 'XLE', 'XLF', 'XLV',
                   'XLI', 'XLB', 'XLK', 'XLU')
    context.etf_universe = StaticAssets(symbols(*sector_etfs))
    context.alphas = pd.DataFrame()

    # Daily rebalance, one hour after the open.
    schedule_function(rebalance,
                      date_rules.every_day(),
                      time_rules.market_open(hours=1))

    # Daily bookkeeping at the close.
    schedule_function(record_vars,
                      date_rules.every_day(),
                      time_rules.market_close())

    # Both pipelines are restricted to the ETF universe stored above.
    attach_pipeline(make_pipeline(context), 'pipeline')
    attach_pipeline(make_pipeinit(context), 'pipeinit')

    context.first_trading_day = True
    context.factor_name_list = make_factor().keys()


def make_pipeinit(context):
    """Build the seeding pipeline holding every factor at every lag.

    For each factor from ``make_factor()`` and each ``days_ago`` in
    ``WINDOW_LENGTH - 1 .. 0`` a column named ``"<factor>-<days_ago>"`` is
    added, wrapping the masked factor in ``Factor_N_Days_Ago`` with a window
    of ``days_ago + 1`` rows.  ``context.etf_universe`` is used as mask and
    as screen.
    """
    etf_filter = context.etf_universe

    lagged_columns = {}
    for name, factor_cls in make_factor().items():
        for days_ago in reversed(range(WINDOW_LENGTH)):
            column_label = '{}-{}'.format(name, days_ago)
            lagged_columns[column_label] = Factor_N_Days_Ago(
                [factor_cls(mask=etf_filter)],
                window_length=days_ago + 1,
                mask=etf_filter,
            )

    return Pipeline(columns=lagged_columns, screen=etf_filter)


def make_pipeline(context):
    """Build the daily pipeline with one column per factor.

    Each factor is instantiated with ``context.etf_universe`` as its ``mask``
    so that the cross-sectional normalisation done inside ``compute`` (via
    ``preprocess``) runs over the same set of assets as the masked factors
    created in ``make_pipeinit``.  Without the mask, today's values would be
    normalised over a different asset set than the seeded lag columns they
    are later concatenated with.

    Args:
        context: algorithm state providing ``etf_universe``.

    Returns:
        Pipeline screened by the universe, with columns named after the
        factors.
    """
    universe = context.etf_universe
    columns = {
        name: factor_cls(mask=universe)
        for name, factor_cls in make_factor().items()
    }
    return Pipeline(columns=columns, screen=universe)


def before_trading_start(context, data):
    """Maintain ``context.alphas``, a rolling buffer of factor values.

    ``context.alphas`` is a DataFrame indexed by (stock, factor) whose
    integer columns are lags: 0 holds the most recently fetched values and
    ``WINDOW_LENGTH - 1`` the oldest retained ones.

    On the first call the whole buffer is seeded from the 'pipeinit'
    pipeline, whose column labels have the form ``"<factor>-<days_ago>"``.
    On later calls the oldest lag is dropped, the remaining lags are shifted
    by one, and the output of 'pipeline' becomes lag 0.

    Args:
        context: algorithm state; reads/writes ``first_trading_day`` and
            ``alphas``.
        data: unused.
    """
    if context.first_trading_day:
        seeded = pipeline_output("pipeinit").astype('float32').stack()
        seeded.index.names = ['stock', 'alphas']
        seeded = seeded.reset_index(level=['alphas', 'stock'])

        # Split "<factor>-<days_ago>" on the LAST dash so that a factor name
        # containing a dash would still parse correctly.
        pieces = [label.rpartition('-') for label in seeded['alphas'].values]
        seeded['factor'] = pd.Series(
            np.array([name for name, _, _ in pieces], dtype='object'),
            index=seeded.index,
        )
        seeded['day'] = pd.Series(
            np.array([int(lag) for _, _, lag in pieces], dtype='int'),
            index=seeded.index,
        )

        seeded = seeded.drop('alphas', axis=1)
        seeded = seeded.set_index(['stock', 'factor', 'day'])
        # Column 0 holds the stacked values; pivot the lag level into columns.
        context.alphas = seeded[0].unstack(level=2)

        context.first_trading_day = False
    else:
        today = pipeline_output("pipeline").astype('float32')
        today = today.stack().to_frame()  # single column labelled 0
        today.index.names = ['stock', 'factor']

        # Drop the oldest lag and age the remaining ones by one day.  The
        # bounds follow WINDOW_LENGTH instead of being fixed at 5.
        history = context.alphas.drop([WINDOW_LENGTH - 1], axis=1)
        history.columns = list(range(1, WINDOW_LENGTH))
        context.alphas = pd.concat([today, history], axis=1)


def rebalance(context, data):
    """
    Order-execution hook registered through schedule_function() in
    initialize().  Currently a placeholder that does nothing.
    """
    return None


def record_vars(context, data):
    """
    End-of-day recording hook registered through schedule_function() in
    initialize().  Currently a placeholder that does nothing.
    """
    return None


def handle_data(context, data):
    """
    Per-bar hook; intentionally empty.

    Note that the __main__ section of this file does not pass this function
    to run_algorithm().
    """
    return None


if __name__ == "__main__":
    # Back-test configuration: a short UTC window over the ETF bundle.
    capital_base = 100000
    start = datetime(2013, 1, 1, tzinfo=pytz.utc)
    end = datetime(2013, 1, 10, tzinfo=pytz.utc)

    result = run_algorithm(
        start=start,
        end=end,
        initialize=initialize,
        capital_base=capital_base,
        before_trading_start=before_trading_start,
        bundle='etfs_bundle',
    )

    # Print the final three rows of the result.
    print(result[-3:])