def effective_n(trace_dict):
def get_vhat(x):
# number of chains is last dim (-1)
# chain samples are second to last dim (-2)
num_samples = x.shape[-2]
# Calculate between-chain variance
B = num_samples * np.var(np.mean(x, axis=-2), axis=-1, ddof=1)
# Calculate within-chain variance
W = np.mean(np.var(x, axis=-2, ddof=1), axis=-1)
# Estimate of marginal posterior variance
Vhat = W * (num_samples - 1) / num_samples + B / num_samples
return Vhat
def get_neff(x, Vhat):
num_chains = x.shape[-1]
num_samples = x.shape[-2]
negative_autocorr = False
t = 1
rho = np.ones(num_samples)
# Iterate until the sum of consecutive estimates of autocorrelation is
# negative
while not negative_autocorr and (t < num_samples):
variogram = np.mean((x[t:, :] - x[:-t, :])**2)
rho[t] = 1. - variogram / (2. * Vhat)
if not t % 2:
negative_autocorr = sum(rho[t - 1:t + 1]) < 0
t += 1
return min(num_chains * num_samples,
int(num_chains * num_samples / (1. + 2 * rho[1:t].sum())))
n_eff = {}
for var in trace_dict:
x = trace_dict[var]
# make sure to handle scalars correctly - add extra dim if needed
if len(x.shape) == 2:
is_scalar = True
else:
is_scalar = False
# now we are going to transpose all dims - makes the loop below
# easier by moving the axes of the variable to the front instead
# of the chain and sample axes
x = x.transpose()
Vhat = get_vhat(x)
# get an array the same shape as the var
_n_eff = np.zeros(x.shape[:-2])
# iterate over tuples of indices of the shape of var
for tup in np.ndindex(*list(x.shape[:-2])):
_n_eff[tup] = get_neff(x[tup], Vhat[tup])
# we could be using np.squeeze here, but we don't want to squeeze
# out dummy dimensions that a user inputs
if is_scalar:
n_eff[var] = _n_eff[0]
else:
# make sure to transpose the dims back
n_eff[var] = np.transpose(_n_eff)
return n_eff