# Create a Quantopian pipeline and run it in date-range chunks to avoid running out of memory. Includes the common imports used with Pipeline.
def make_pipeline():
    """Build the example research pipeline.

    Universe: Q500US with closed-end funds removed (share class description
    starting with 'CE').

    Columns:
        momentum_1_mo -- ``Returns`` over a 22-day window of closing prices,
                         computed only for assets in the universe.
        sector_code   -- Morningstar ``Sector`` classifier.

    The pipeline output is screened down to the same universe.
    """
    # Alternative small universe for quick debugging runs:
    # base_universe = Fundamentals.symbol.latest.element_of(['GS', 'AAPL', 'JPM', 'XOM'])
    base_universe = Q500US()
    is_closed_end_fund = Fundamentals.share_class_description.latest.startswith('CE')
    tradable = base_universe & ~is_closed_end_fund

    # Momentum factor example; the mask restricts computation to the universe.
    one_month_momentum = Returns(
        inputs=[USEquityPricing.close],
        window_length=22,
        mask=tradable,
    )

    pipeline_columns = {
        'momentum_1_mo': one_month_momentum,
        'sector_code': Sector(),
    }
    return Pipeline(columns=pipeline_columns, screen=tradable)
# Short date range for a quick single (non-chunked) pipeline run.
start_date = '2017-01-04'
end_date = '2017-01-05'
# Uncomment to run pipeline in one go (only suitable for short ranges).
# The Pipeline imports below must have been executed before this call,
# otherwise run_pipeline / make_pipeline's dependencies are undefined.
# result = run_pipeline(make_pipeline(), start_date, end_date)
# Typical imports for use with Pipeline
from quantopian.pipeline import Pipeline, CustomFactor
from quantopian.research import run_pipeline
# Datasets
from quantopian.pipeline.data.builtin import USEquityPricing
# New way: Fundamentals.my_field:
from quantopian.pipeline.data import Fundamentals
# Factors, Classifiers, and Filters
# New way for classifiers: classifiers.fundamentals
from quantopian.pipeline.factors import AverageDollarVolume, Returns
from quantopian.pipeline.classifiers.fundamentals import Sector
from quantopian.pipeline.filters import QTradableStocksUS, Q500US
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import alphalens as al
def run_pipe_in_chunks(pipe, start_date, end_date,
                       weeks=25, days=0, runner=None):
    """Run pipeline in chunks to avoid memory overload.

    The period from ``start_date`` to ``end_date`` is split into consecutive,
    non-overlapping windows of ``weeks`` weeks plus ``days`` days. Every
    window, including the first, is clamped so it never extends past
    ``end_date``. Each window is run separately and the partial results are
    concatenated.

    --------
    Inputs:
    --------
    pipe: pipeline object
    start_date: string, Timestamp, or datetime object
        a date string or datetime object representing the start of the period
    end_date: string, Timestamp, or datetime object
        a date string or datetime object representing the end of the period
    weeks, days: int
        number of weeks and/or days in each chunk
    runner: callable, optional
        called as ``runner(pipe, start_date=..., end_date=...)`` for each
        chunk; defaults to Quantopian's ``run_pipeline``
    -------
    Return
    --------
    a multi-index dataframe:
        Index=(Date, asset)
        Columns = pipeline columns
    If the chunk results cannot be concatenated, a message is printed and
    the list of per-chunk results is returned instead.

    Raises
    ------
    ValueError
        if end_date is before start_date, or weeks/days describe a negative
        chunk length
    """
    # Resolved lazily so callers (and tests) can supply their own runner.
    if runner is None:
        runner = run_pipeline

    # Convert start and end_date to timestamp
    start_date = pd.Timestamp(start_date)
    end_date = pd.Timestamp(end_date)
    if end_date < start_date:
        raise ValueError("end_date (%s) is before start_date (%s)"
                         % (end_date, start_date))

    step = pd.Timedelta(weeks=weeks, days=days)
    if step < pd.Timedelta(0):
        raise ValueError("weeks/days must not give a negative chunk length")

    chunks = []
    start = start_date
    # '<=' so that a final one-day window starting exactly on end_date is
    # still run instead of being silently dropped.
    while start <= end_date:
        # Clamp every window, including the first, to the requested period.
        end = min(start + step, end_date)
        print("Running Pipeline for %s to %s" % (start, end))
        chunks.append(runner(pipe, start_date=start, end_date=end))
        # The next window starts the day after this one ends, so windows
        # never overlap. Deriving it from ``end`` rather than the last row of
        # the result avoids an IndexError when a chunk has no rows (e.g. a
        # window containing only non-trading days).
        start = end + pd.Timedelta(days=1)

    try:
        final_result = pd.concat(chunks, axis=0)
    except (TypeError, ValueError):
        # Best-effort fallback kept from the original: hand back the pieces.
        print('Concat Failed: Returned List of Dataframes instead of one Dataframe')
        return chunks
    print("Pipeline Computations Complete")
    return final_result
# Long backtest period for the chunked run.
start_date = '2003-01-01'
end_date = '2012-01-01'
# Run the pipeline in 26-week chunks to keep memory usage bounded.
result = run_pipe_in_chunks(
    make_pipeline(),
    start_date=start_date,
    end_date=end_date,
    weeks=26,
    days=0,
)
class MyCustomFactor(CustomFactor):
    """Example custom factor: volatility-adjusted momentum.

    Numerator: price return from the first close in the window to the close
    ``LAG`` rows before the last one.
    Denominator: sample standard deviation (ddof=1, NaNs ignored) of the
    first ``MOMO_PERIOD`` daily returns in the window. With the default
    ``window_length`` these are exactly the returns spanned by the
    numerator, so the most recent ``LAG`` days are excluded from both.
    """

    # Example-specific constants; not every CustomFactor needs them.
    LAG = 21            # trailing rows skipped by the momentum numerator
    MOMO_PERIOD = 252   # number of daily returns in the volatility estimate

    # Default inputs and window_length: MOMO_PERIOD + LAG returns need one
    # more close than that.
    inputs = [USEquityPricing.close]
    window_length = MOMO_PERIOD + LAG + 1

    def compute(self, today, asset_ids, out, close):
        # NOTE(review): assumes close is (window_length, n_assets) with the
        # oldest row first, per the CustomFactor convention -- confirm.
        skip = self.LAG
        lookback = self.MOMO_PERIOD

        earlier, later = close[:-1], close[1:]
        rets = later / earlier - 1
        vol = np.nanstd(rets[:lookback], ddof=1, axis=0)

        momentum = close[-skip - 1] / close[0] - 1
        out[:] = momentum / vol