# Pinball score for probabilistic forecasting
"""
Calculate the PINBALL LOSS SCORE (Quantile Score) for data in a pandas DataFrame
Input: df(q1,q2,...,qn,real) for different time steps (index)
Output: input df (rows containing NaN removed) with a pinball score column included
"""
## Calculate average Pinball Loss score for any df of data
def calculate_pinball_score(DF):
    """Add a per-row average pinball loss (quantile score) column to a forecast table.

    Each column whose label contains a number is treated as a quantile
    forecast; the number is the quantile level, either as a fraction
    (``"q0.5"``) or as a percentage (``"q50"``, ``"q97.5"``, ``10``).
    The observed value is read from the column labelled ``"real"`` when
    present, otherwise from the last column whose label has no digits.
    An existing ``"pinball"`` column is ignored as input and overwritten.

    Args:
        DF: pandas DataFrame with one row per time step, one column per
            quantile forecast and one column with the observed value.

    Returns:
        A new DataFrame holding the rows of ``DF`` that contain no NaN,
        with an added ``"pinball"`` column: the mean pinball loss over all
        quantile columns for that row. The caller's frame is not modified.

    Raises:
        ValueError: if no observed-value column can be identified, or if
            a used column cannot be converted to float.

    Side effects:
        Prints a warning for every column that contains NaN and prints
        the overall mean score.
    """
    import re
    import numpy as np

    def quantile_from_label(label):
        """Return the raw number encoded in a column label, or None if it has no digits."""
        text = str(label)
        # A label holding exactly one decimal number keeps its decimal point
        # ("q97.5" -> 97.5) instead of being collapsed into the integer 975.
        decimal = re.fullmatch(r"\D*(\d+\.\d+)\D*", text)
        if decimal:
            return float(decimal.group(1))
        # Otherwise fall back to concatenating every digit found in the label.
        digits = re.sub(r"\D", "", text)
        return float(int(digits)) if digits else None

    # Report NaN counts per column (computed once), then drop incomplete rows.
    nan_counts = DF.isnull().sum()
    for column, count in zip(nan_counts.index.values, nan_counts.values):
        if count > 0:
            print("WARNING: There any NaN values in '%s': %s (they will be deleted..)" % (column, count))
    # Explicit copy: the new column must never be written into a view of the input.
    DF = DF.dropna().copy()

    # Nothing to score: keep the output shape (empty frame plus the new column).
    if len(DF.index) == 0:
        print("Pinball Loss Score: %s" % float("nan"))
        DF["pinball"] = np.array([], dtype=float)
        return DF

    # Classify the columns once instead of once per row.
    column_by_raw = {}
    observed_candidates = []
    for column in DF.columns:
        if column == "pinball":
            # Output of a previous call; it is neither a forecast nor the observation.
            continue
        raw = quantile_from_label(column)
        if raw is None:
            observed_candidates.append(column)
        else:
            # Keyed by the parsed number, so a later duplicate label wins.
            column_by_raw[raw] = column

    if "real" in observed_candidates:
        real_column = "real"
    elif observed_candidates:
        real_column = observed_candidates[-1]
    else:
        raise ValueError("no observed-value column found (expected a column such as 'real')")

    raw_levels = sorted(column_by_raw)
    if raw_levels:
        # Values above 1 are percentages. A raw value of exactly 1 is kept as
        # quantile 1.0, which is how such labels have always been interpreted here.
        levels = np.array([raw / 100.0 if raw > 1 else raw for raw in raw_levels], dtype=float)
        observed = np.asarray(DF[real_column], dtype=float)
        forecasts = np.asarray(DF[[column_by_raw[raw] for raw in raw_levels]], dtype=float)
        # Pinball loss: q * (real - pred) when real >= pred, else (1 - q) * (pred - real).
        miss = observed[:, None] - forecasts
        losses = np.where(miss >= 0, miss * levels, -miss * (1.0 - levels))
        row_scores = losses.mean(axis=1)
    else:
        # No quantile columns: the per-row average is undefined.
        row_scores = np.full(len(DF.index), np.nan)

    # Display the total average, store the per-row score and return.
    print("Pinball Loss Score: %s" % np.mean(row_scores))
    DF["pinball"] = row_scores
    return DF