MichaelJMath
7/12/2017 - 10:18 PM

Create a Pipeline instance in Quantopian and run it in chunks to avoid memory overload. Includes the imports commonly used with Pipeline.

def make_pipeline():
    """Build the example Pipeline.

    The screen is the Q500US universe with closed-end funds removed, where a
    closed-end fund is identified as a security whose
    ``share_class_description`` starts with 'CE'.

    Returns
    -------
    Pipeline with two columns:
        'momentum_1_mo' -- Returns over a 22-row window of close prices
                           (roughly one trading month), masked to the universe
        'sector_code'   -- the Sector() classifier
    """
    base_universe = Q500US()
    # Small hand-picked universe, handy for quick debugging runs:
    # base_universe = Fundamentals.symbol.latest.element_of(['GS', 'AAPL', 'JPM', 'XOM'])
    is_closed_end = Fundamentals.share_class_description.latest.startswith('CE')
    universe = base_universe & ~is_closed_end

    pipeline_columns = dict(
        momentum_1_mo=Returns(inputs=[USEquityPricing.close],
                              window_length=22,
                              mask=universe),
        sector_code=Sector(),
    )
    return Pipeline(columns=pipeline_columns, screen=universe)

start_date = '2017-01-04'
end_date = '2017-01-05'

# Uncomment to run the pipeline in one go (no chunking). It stays commented
# out here because, at this point in the file, run_pipeline and the names
# make_pipeline() relies on have not been imported yet.
# result = run_pipeline(make_pipeline(), start_date, end_date)
# Typical imports for use with Pipeline
from quantopian.pipeline import Pipeline, CustomFactor
from quantopian.research import run_pipeline

# Datasets
from quantopian.pipeline.data.builtin import USEquityPricing

# New way: Fundamentals.my_field: 
from quantopian.pipeline.data import Fundamentals  

# Factors, Classifiers, and Filters
# New way for classifiers: classifiers.fundamentals 
from quantopian.pipeline.factors import AverageDollarVolume, Returns
from quantopian.pipeline.classifiers.fundamentals import Sector 
from quantopian.pipeline.filters import QTradableStocksUS, Q500US

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import alphalens as al
def run_pipe_in_chunks(pipe, start_date, end_date,
                       weeks=25, days=0, run_func=None):
    """Run a pipeline in consecutive date chunks to avoid memory overload.

    Each chunk is computed with a separate call to ``run_func``, and the
    pieces are concatenated at the end.

    Parameters
    ----------
    pipe : Pipeline
        The pipeline to run.
    start_date, end_date : str, Timestamp, or datetime
        First and last day of the whole period, both inclusive. They are
        converted with ``pd.Timestamp``.
    weeks, days : int
        Length of each chunk. The total must not be negative. A zero length
        requests one day per call.
    run_func : callable, optional
        Called as ``run_func(pipe, start_date=..., end_date=...)``. It must
        return a DataFrame whose first index level holds the dates.
        Defaults to Quantopian's ``run_pipeline``.

    Returns
    -------
    DataFrame
        Multi-index frame (Index=(Date, asset), Columns=pipeline columns).
        If concatenation fails, including when no chunk produced any rows,
        the list of per-chunk DataFrames is returned instead.

    Raises
    ------
    ValueError
        If ``weeks``/``days`` describe a negative chunk length.
    """
    if run_func is None:
        run_func = run_pipeline

    start_date = pd.Timestamp(start_date)
    end_date = pd.Timestamp(end_date)

    step = pd.Timedelta(weeks=weeks, days=days)
    if step < pd.Timedelta(0):
        raise ValueError("weeks/days must give a non-negative chunk length")
    one_day = pd.Timedelta(days=1)

    chunks = []
    start = start_date
    # end_date is inclusive, so keep going until start has passed it.
    while start <= end_date:
        # Clamp every chunk, including the first, to the overall period.
        end = min(start + step, end_date)
        print("Running Pipeline for %s to %s" % (start, end))
        result = run_func(pipe, start_date=start, end_date=end)

        if len(result):
            chunks.append(result)
            # Resume the day after the last date actually returned. Output
            # dates may be tz-aware while start/end are naive, so drop the tz.
            last = pd.Timestamp(result.index.get_level_values(0)[-1])
            if last.tzinfo is not None:
                last = last.tz_localize(None)
            start = last + one_day
        else:
            # No rows came back (e.g. presumably no trading days in the
            # range): move past this chunk instead of indexing an empty frame.
            start = end + one_day

    try:
        final_result = pd.concat(chunks, axis=0)
    except Exception as exc:
        # Best effort: hand back the computed chunks rather than lose them
        # (e.g. nothing to concatenate, or not enough memory to combine them).
        print("Concat Failed: Returned List of Dataframes instead of one "
              "Dataframe (%s)" % exc)
        final_result = chunks
    else:
        print("Pipeline Computations Complete")

    return final_result

# Date range for the chunked pipeline run
start_date = '2003-01-01'
end_date = '2012-01-01'

# Run the pipeline over the whole range in 26-week chunks rather than in a
# single run_pipeline call.
result = run_pipe_in_chunks(
    make_pipeline(),
    start_date,
    end_date,
    weeks=26,
    days=0,
)
  
class MyCustomFactor(CustomFactor):
    """Example custom factor: volatility-adjusted momentum.

    For each asset the output is

        (close[-(LAG + 1)] / close[0] - 1) / std(daily returns[:MOMO_PERIOD])

    where std is the NaN-ignoring sample standard deviation (ddof=1). With
    window_length = MOMO_PERIOD + LAG + 1, row -(LAG + 1) is row MOMO_PERIOD,
    so the momentum and the volatility cover the same MOMO_PERIOD-return span.
    The most recent LAG rows are left out of both.
    """
    # Example-specific constants (not something every custom factor needs)
    LAG = 21
    MOMO_PERIOD = 252

    # Default inputs and lookback window
    inputs = [USEquityPricing.close]
    window_length = MOMO_PERIOD + LAG + 1

    def compute(self, today, asset_ids, out, close):
        momentum_end_row = -(self.LAG + 1)
        simple_returns = close[1:] / close[:-1] - 1
        volatility = np.nanstd(simple_returns[:self.MOMO_PERIOD], axis=0, ddof=1)
        momentum = close[momentum_end_row] / close[0] - 1
        out[:] = momentum / volatility