
"""

A random walk Metropolis algorithm with tuning, implemented for a linear model that
predicts price as a function of age.

"""

import numpy as np
from scipy.stats import distributions
# Convenience aliases for random draws and log-densities
rnorm = np.random.normal
runif = np.random.rand
dgamma = distributions.gamma.logpdf
dnorm = distributions.norm.logpdf

# The data: 39 paired observations; price is rescaled to thousands
age = np.array([13, 14, 14, 12, 9, 15, 10, 14, 9, 14, 13, 12, 9, 10, 15, 11,
                15, 11, 7, 13, 13, 10, 9, 6, 11, 15, 13, 10, 9, 9, 15, 14,
                14, 10, 14, 11, 13, 14, 10])
price = np.array([2950, 2300, 3900, 2800, 5000, 2999, 3950, 2995, 4500, 2800,
                  1990, 3500, 5100, 3900, 2900, 4950, 2000, 3400, 8999, 4000,
                  2950, 3250, 3950, 4600, 4500, 1600, 3900, 4200, 6500, 3500,
                  2999, 2600, 3250, 2500, 2400, 3990, 4600, 450, 4700]) / 1000.

# Number of model parameters: intercept a, slope b and error precision tau
n_params = 3


def calc_posterior(a, b, t, y=price, x=age):
    """Calculate the joint log-posterior, given values for a, b and t."""

    # Vague normal priors on a and b
    logp = dnorm(a, 0, 10000) + dnorm(b, 0, 10000)
    # Vague Gamma(0.001, 0.001) prior on the precision t; scipy's gamma is
    # parameterized by shape and scale, so the rate 0.001 becomes scale 1000
    logp += dgamma(t, 0.001, scale=1000)
    # Calculate the mean response
    mu = a + b * x
    # Data likelihood; t is a precision, so the standard deviation is t**-0.5
    logp += np.sum(dnorm(y, mu, t ** -0.5))

    return logp
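
# A quick sanity check (illustrative, not part of the original script):
# calc_posterior(0, 0, 1) evaluates the log-posterior at the sampler's
# starting values and should return a finite float.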


def sample(iterations=10000, tune_interval=100):
    """Run the tuned random walk Metropolis sampler and return the trace."""

    # Initial proposal standard deviations
    prop_sd = [5, 5, 5]
    
    # Initialize trace for parameters
    trace = np.empty((iterations+1, n_params))

    # Set initial values
    trace[0] = 0, 0, 1
    # Initialize acceptance counts
    accepted = [0]*n_params

    # Calculate the joint posterior for the initial values and cache it,
    # so the unchanged state is not re-evaluated at every proposal
    current_log_prob = calc_posterior(*trace[0])
    
    for i in range(iterations):

        if not i % 1000:
            print('Iteration', i)

        # Working copy of the current state; updated in place as
        # proposals are accepted during this sweep
        current_params = trace[i].copy()

        for j in range(n_params):

            # Copy the current state and perturb parameter j
            p = current_params.copy()

            # Propose new value
            if j == 2:
                # A random walk on the log scale keeps tau positive
                theta = np.exp(rnorm(np.log(current_params[j]), prop_sd[j]))
            else:
                theta = rnorm(current_params[j], prop_sd[j])

            # Insert new value
            p[j] = theta

            # Calculate log-posterior with proposed value
            proposed_log_prob = calc_posterior(*p)

            # Log-acceptance ratio
            alpha = proposed_log_prob - current_log_prob

            if j == 2:
                # Hastings correction for the asymmetric log-scale proposal
                alpha += np.log(theta) - np.log(current_params[j])

            # Sample a uniform random variate
            u = runif()

            # Test proposed value
            if np.log(u) < alpha:
                # Accept: update the state and the acceptance count
                current_params[j] = theta
                current_log_prob = proposed_log_prob
                accepted[j] += 1

            # Tune every <tune_interval> iterations
            if not (i + 1) % tune_interval:

                # Calculate acceptance rate over the last interval
                acceptance_rate = accepted[j] / tune_interval

                # Shrink the proposal if acceptance is too low,
                # widen it if acceptance is too high
                if acceptance_rate < 0.2:
                    prop_sd[j] *= 0.9
                elif acceptance_rate > 0.5:
                    prop_sd[j] *= 1.1
                else:
                    print('Parameter %i is tuned!' % j)

                # Reset the count for the next interval
                accepted[j] = 0

        # Record the state after the full sweep over parameters
        trace[i + 1] = current_params

    return trace
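
# Usage sketch: the defaults above can be overridden, e.g.
# sample(iterations=50000, tune_interval=500) runs longer with less
# frequent tuning (values here are illustrative, not from the original).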

if __name__ == '__main__':

    trace = sample()
    print(np.mean(trace, 0))
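
    # A minimal summary sketch (not part of the original script): discard
    # the first half of the trace as burn-in (an assumed cutoff) and report
    # per-parameter posterior means and standard deviations
    burn = len(trace) // 2
    for name, samples in zip(('a', 'b', 'tau'), trace[burn:].T):
        print('%s: mean = %.3f, sd = %.3f' % (name, samples.mean(), samples.std()))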