scubamut
1/30/2019 - 1:51 PM

🟣 Register Bundle

from zipline.data.bundles import register
from zipline.data.bundles.csvdir import csvdir_equities
 
register(
    'etfs_bundle',
    csvdir_equities(
        ['etfs'],
        '/home/scubamut/MEGAsync/10_DATA/daily',
    ),
    calendar_name='NYSE',  # US equities
#     start_session=start_session,
#     end_session=end_session
)

register(
    'funds_bundle',
    csvdir_equities(
        ['funds'],
        '/home/scubamut/MEGAsync/10_DATA/daily',
    ),
    calendar_name='NYSE',  # US equities
#     start_session=start_session,
#     end_session=end_session
)

register(
    'US_stock_bundle',
    csvdir_equities(
        ['US_stock_data'],
        '/home/scubamut/MEGAsync/10_DATA/daily',
    ),
    calendar_name='NYSE',  # US equities
#     start_session=start_session,
#     end_session=end_session
)
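
# If you want to pin a bundle to a fixed date range, the commented-out
# start_session/end_session arguments above can be supplied as UTC timestamps
# (defined before the register() calls, with those two lines uncommented).
# A minimal sketch -- the dates below are placeholders, not values from the
# original setup:

import pandas as pd

start_session = pd.Timestamp('2010-01-04', tz='utc')
end_session = pd.Timestamp('2018-12-31', tz='utc')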
#########################################################################
# US_stock_bundle
###############

# ~/.zipline/extensions.py

#from zipline.data.bundles import register, US_stock_data
#
#register('US_stock_bundle', US_stock_data.US_stock_data, calendar_name='NYSE')

#########################################################################
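
# Once the bundle is registered (here via ~/.zipline/extensions.py), it can be
# ingested from the command line with `zipline ingest -b US_stock_bundle`, or
# from Python. A minimal sketch, shown commented out so it does not run if this
# block itself ends up inside extensions.py:
#
# from zipline.data import bundles
#
# bundles.ingest('US_stock_bundle', show_progress=True)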


import pandas as pd
from os import listdir

# Change the path to where you have your data
try:
    # Running on Google Colab
    import google.colab
    path = '/content/data/daily/US_stock_data/'
except ImportError:
    path = '/home/scubamut1/MEGAsync/10_DATA/daily/US_stock_data/'

"""
The ingest function needs to have this exact signature,
meaning these arguments passed, as shown below.
"""


def US_stock_data(environ,
                      asset_db_writer,
                      minute_bar_writer,
                      daily_bar_writer,
                      adjustment_writer,
                      calendar,
                      start_session,
                      end_session,
                      cache,
                      show_progress,
                      output_dir):
    # Build the list of symbols from the CSV file names in the folder,
    # slicing off the extension: 'example.csv'[:-4] = 'example'
    symbols = [f[:-4] for f in listdir(path) if f.endswith('.csv')]

    if not symbols:
        raise ValueError("No symbols found in folder.")

    # Prepare an empty DataFrame for dividends
    divs = pd.DataFrame(columns=['sid',
                                 'amount',
                                 'ex_date',
                                 'record_date',
                                 'declared_date',
                                 'pay_date']
                        )

    # Prepare an empty DataFrame for splits
    splits = pd.DataFrame(columns=['sid',
                                   'ratio',
                                   'effective_date']
                          )

    # Prepare an empty DataFrame for metadata
    metadata = pd.DataFrame(columns=('start_date',
                                     'end_date',
                                     'auto_close_date',
                                     'symbol',
                                     'exchange'
                                     )
                            )

    # Check valid trading dates, according to the selected exchange calendar
    sessions = calendar.sessions_in_range(start_session, end_session)

    # Get data for all stocks and write to Zipline
    daily_bar_writer.write(
        process_stocks(symbols, sessions, metadata, divs)
    )

    # Write the metadata. Newer zipline versions also require an exchanges
    # frame with a country_code, so pass one alongside the equities metadata
    # (see https://github.com/quantopian/zipline/issues/2517).
    exchange_df = pd.DataFrame(
        data=[['NYSE', 'NYSE', 'US']],
        columns=['exchange', 'canonical_name', 'country_code'],
    )
    asset_db_writer.write(equities=metadata, exchanges=exchange_df)

    # Write splits and dividends
    adjustment_writer.write(splits=splits,
                            dividends=divs)
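
# After ingestion, the bundle can be sanity-checked from a separate session.
# A minimal sketch, shown commented out so it does not run when this module is
# imported during ingestion; 'SPY' is just a placeholder symbol:
#
# from zipline.data import bundles
#
# bundle_data = bundles.load('US_stock_bundle')
# asset = bundle_data.asset_finder.lookup_symbol('SPY', as_of_date=None)
# print(asset)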


"""
Generator function to iterate stocks,
build historical data, metadata 
and dividend data
"""


def process_stocks(symbols, sessions, metadata, divs):
    # Loop the stocks, setting a unique Security ID (SID)
    for sid, symbol in enumerate(symbols):

        print('Loading {}...'.format(symbol))
        # Read the stock data from csv file.
        df = pd.read_csv('{}/{}.csv'.format(path, symbol), index_col=[0], parse_dates=[0])

        # Check first and last date.
        start_date = df.index[0]
        end_date = df.index[-1]

        # Sync to the official exchange calendar
        df = df.reindex(sessions.tz_localize(None))[start_date:end_date]

        # Forward fill missing data
        df.fillna(method='ffill', inplace=True)

        # Drop remaining NaN
        df.dropna(inplace=True)

        # The auto_close date is the day after the last trade.
        ac_date = end_date + pd.Timedelta(days=1)

        # Add a row to the metadata DataFrame. Don't forget to add an exchange field.
        metadata.loc[sid] = start_date, end_date, ac_date, symbol, "NYSE"

        # If there's dividend data, add that to the dividend DataFrame
        if 'dividend' in df.columns:
            # Slice off the days with dividends
            tmp = df[df['dividend'] != 0.0]['dividend']
            div = pd.DataFrame(data=tmp.index.tolist(), columns=['ex_date'])

            # Provide empty columns as we don't have this data for now
            div['record_date'] = pd.NaT
            div['declared_date'] = pd.NaT
            div['pay_date'] = pd.NaT

            # Store the dividends and set the Security ID
            div['amount'] = tmp.tolist()
            div['sid'] = sid

            # Start numbering where we left off last time
            ind = pd.Index(range(divs.shape[0], divs.shape[0] + div.shape[0]))
            div.set_index(ind, inplace=True)

            # Append this stock's dividends to the shared dividends DataFrame
            # in place; rebinding with divs = divs.append(div) would leave the
            # caller's divs empty, so no dividends would ever be written.
            for i, row in div.iterrows():
                divs.loc[i] = row[divs.columns]

        yield sid, df
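
# For reference, a minimal sketch of the layout each per-symbol CSV under
# `path` is expected to have: a date index plus the open/high/low/close/volume
# columns consumed by daily_bar_writer, and the optional 'dividend' column
# handled above. The numbers are made up for illustration only:
#
# example = pd.DataFrame(
#     {
#         'open':     [100.0, 101.0],
#         'high':     [101.5, 102.0],
#         'low':      [99.5, 100.5],
#         'close':    [101.0, 101.5],
#         'volume':   [1000000, 1200000],
#         'dividend': [0.0, 0.25],   # optional; only non-zero rows are used
#     },
#     index=pd.to_datetime(['2019-01-02', '2019-01-03']),
# )
# example.to_csv(path + 'XYZ.csv')   # 'XYZ' is a placeholder symbol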