Arima Model for TimeSeries forecasting
"""
ARIMA MODEL FITTING
Input:
- y: Pandas Dataframe Series
- seasonal_frequency: supposed seasonal frequency
- strend: str{‘n’,’c’,’t’,’ct’} or iterable, optional
Parameter controlling the deterministic trend polynomial A(t). Can be specified as a string where ‘c’ indicates a constant (i.e. a degree zero component of the trend polynomial), ‘t’ indicates a linear trend with time, and ‘ct’ is both. Can also be specified as an iterable defining the polynomial as in numpy.poly1d, where [1,1,0,1] would denote a + bt + ct^3. Default is to not include a trend component.
- isdisplay: boolean to display or not extra information
- test_range: maximum value of p,d,q ARIMA parameters to be tested into the optimization process
Output: ARIMA model
"""
def arima_fit(y,seasonal_frequency,strend,isdisplay,test_range=1):
import warnings
import itertools
import statsmodels.api as sm
import matplotlib.pyplot as plt
plt.style.use('fivethirtyeight')
warnings.filterwarnings("ignore") # specify to ignore warning messages
import pandas as pd
import numpy as np
""" Parameter Selection for the ARIMA Time Series Model """
try:
# Define the p, d and q parameters to take any value between 0 and 2
p = d = q = range(0, test_range+1)
# Generate all different combinations of p, q and q triplets
pdq = list(itertools.product(p, d, q))
# Generate all different combinations of seasonal p, q and q triplets
seasonal_pdq = [(x[0], x[1], x[2], seasonal_frequency) for x in list(itertools.product(p, d, q))]
if isdisplay:
print('Examples of parameter combinations for Seasonal ARIMA...')
print('SARIMAX: {} x {}'.format(pdq[1], seasonal_pdq[1]))
print('SARIMAX: {} x {}'.format(pdq[1], seasonal_pdq[2]))
print('SARIMAX: {} x {}'.format(pdq[2], seasonal_pdq[3]))
print('SARIMAX: {} x {}'.format(pdq[2], seasonal_pdq[4]))
print('\n')
# initialize
lresults = list()
## check parameter combinations
if isdisplay: print('Optimization of parameter combinations:')
for param in pdq:
for param_seasonal in seasonal_pdq:
try:
mod = sm.tsa.statespace.SARIMAX(y,
order=param,
seasonal_order=param_seasonal,
enforce_stationarity=False,
enforce_invertibility=False)
results = mod.fit()
# display
if isdisplay: print('ARIMA{}x{}{} - AIC:{}'.format(param, seasonal_frequency, param_seasonal, results.aic))
# append
lresults.append([param, param_seasonal, results.aic])
except Exception as e:
print(str(e))
continue
## OPTIMICE
# store in df
RESULTS = pd.DataFrame(np.array(lresults))
RESULTS.columns = ['param','param_seasonal','aic']
# collect aic data
aic = RESULTS.aic.values
# calcule the most optimal parameters
param_opt,param_seasonal_opt,aic_opt = RESULTS[RESULTS.aic==np.min(RESULTS.aic.values)][['param','param_seasonal','aic']].values[0]
if isdisplay: print('\nThe Most Optimal ARIMA %sx%s - AIC:%.3f \n'%(param_opt,param_seasonal_opt,aic_opt))
# clean
del(RESULTS)
except Exception as e:
print('ERROR: getting the most optimal ARIMA parameters')
print(str(e))
return None
""" FIT MODEL """
try:
# build the model
mod = sm.tsa.statespace.SARIMAX(y,
trend = strend,
order=param_opt,
seasonal_order=param_seasonal_opt,
enforce_stationarity=False,
enforce_invertibility=False)
# fit the model
results = mod.fit()
# display results
if isdisplay:print(results.summary())
except:
print('ERROR: fitting ARIMA model')
return None
""" DIAGNOSIS """
if isdisplay:
try:
import matplotlib.pyplot as plt
results.plot_diagnostics(figsize=(15, 12))
plt.show()
except:
print('ERROR: diagnosis of fitting ARIMA model')
return None
# return
return results
"""
ARIMA PREDICTION
Input:
- y: Pandas Dataframe Series
- model: 'sm.tsa.statespace.SARIMAX' object with the fitted model
- istart: pd.to_datetime('2012-01-01') object or integer index of 'y' Dataframe Series when start the prediction period
- iend: pd.to_datetime('2012-01-01') object or integer index of 'y' Dataframe Series when end the prediction period
- isdynamic: boolean
- isdisplay: boolean
"""
def arima_prediction(y,model,istart,iend,isdynamic,isdisplay):
import pandas as pd
import numpy as np
""" PREDICTION """
try:
# prediction
pred = model.get_prediction(start=istart,end=iend, dynamic=isdynamic, full_results=True)
# confidence range
pred_ci = pred.conf_int()
## STORE RESULTS
# store obs
OBS = pd.DataFrame({'date':y.index, 'real':y.values})
OBS = OBS.set_index(['date'])
# store prediction
PRED = pd.DataFrame({'date':pred.predicted_mean.index, 'forecast':pred.predicted_mean.values})
PRED = PRED.set_index(['date'])
# concat
RESULT = pd.concat([OBS,PRED],axis=1)
# calculate number of predicted steps
if isinstance(istart, pd.tslib.Timestamp): npre = len(RESULT[(RESULT.index>=istart) & (RESULT.index<=iend)])
else: npre = iend - istart
except Exception as e:
print('ERROR: ARIMA prediction')
print(str(e))
return None
""" VALIDATION """
## SCORE
# clean results
VALIDA = RESULT.dropna()
# Compute the mean square error
if len(VALIDA)>0:
mse = ((VALIDA.forecast.values - VALIDA.real.values) ** 2).mean()
rmse = np.sqrt(mse)
print('ARIMA SCORES: mse = %.3f rmse = %.3f'%(mse,rmse))
else:
rmse = '--'
if isdisplay:
## PLOT RESULTS
try:
import matplotlib.pyplot as plt
# build object
ax = y.plot(label='observed', figsize=(20, 10))
pred.predicted_mean.plot(label='forecast', ax=ax)
# probabilistic forecast range
ax.fill_between(pred_ci.index,
pred_ci.iloc[:, 0],
pred_ci.iloc[:, 1], color='k', alpha=.25)
# test range
ax.fill_betweenx(ax.get_ylim(), istart, iend,alpha=.1, zorder=-1)
# set labels
ax.set_xlabel('Date')
ax.set_ylabel('Variable')
# set legend
plt.legend(loc='best')
# set title
plt.title('ARIMA FORECAST (rmse = %s)'%rmse,fontsize=28)
# display
plt.show()
except:
pass
## PLOT COMPARE RESULTS
try:
import matplotlib.pyplot as plt
# set previous steps
npredict = npre
# build object
fig, ax = plt.subplots(figsize=(12,6))
# plot
ax.set(title='ARIMA FORECAST (rmse = %s)'%rmse, xlabel='Date', ylabel='Variable')
ax.plot(RESULT.index[-npredict-npre+1:], RESULT.ix[-npredict-npre+1:, 'real'], 'o', label='Observed')
ax.plot(RESULT.index[-npredict-npre+1:], RESULT.ix[-npredict-npre+1:, 'forecast'], 'g', label='Forecast')
# set legend
legend = ax.legend(loc='best')
legend.get_frame().set_facecolor('w')
# display
plot.show()
except:
pass
# return
if len(VALIDA)>0: return VALIDA
else: return RESULT
""" BODY """
if __name__ == "__main__":
## GET DATA
#y = Pandas DataFrame Series
## ANALYSIS
# launch
timeserie_analysis(y)
## FIT MODEL
# input parameters
strend = 'n'
seasonal_frequency = 12
test_range = 1
isdisplay = True
# launch
model = arima_fit(y,seasonal_frequency,strend,isdisplay,test_range=1)
## PREDICTION
# input parameters
isdynamic = True
istart = pd.to_datetime('2011-01-01') #len(y) - 12
iend = pd.to_datetime('2011-12-01') #len(y)
isdisplay = True
# launch
RESULT = arima_prediction(y,model,istart,iend,isdynamic,isdisplay)