aroemen
11/30/2011 - 9:21 PM

CircuitBreakerModule.java

/**
 * Mule Development Kit
 * Copyright 2010-2011 (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * This file was automatically generated by the Mule Development Kit
 */
package com.acmesoft;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mule.api.MuleContext;
import org.mule.api.annotations.Module;
import org.mule.api.annotations.Configurable;
import org.mule.api.annotations.Processor;
import org.mule.api.annotations.param.Payload;
import org.mule.api.config.MuleProperties;
import org.mule.api.context.MuleContextAware;
import org.mule.api.registry.RegistrationException;
import org.mule.api.store.ObjectStore;
import org.mule.api.store.ObjectStoreManager;
import org.mule.message.ExceptionMessage;

import javax.inject.Inject;
import java.util.Date;
import java.util.concurrent.Semaphore;

/**
 * An implementation of Michael Nygard's CircuitBreaker pattern.
 * <p/>
 * The failure count is kept in an {@link ObjectStore} under the key
 * <code>&lt;breakerName&gt;.failureCount</code>, while the moment the breaker
 * tripped is only held in memory by this instance. All access to the failure
 * count is serialised through a single-permit semaphore.
 *
 * @author MuleSoft, Inc.
 */
@Module(name = "circuitbreaker", schemaVersion = "1.0-SNAPSHOT")
public class CircuitBreakerModule implements MuleContextAware {

    protected transient Log logger = LogFactory.getLog(getClass());

    /**
     * The number of failures until the circuit breaker is tripped.
     */
    @Configurable
    private int tripThreshold;

    /**
     * How long to wait, in milliseconds, until the breaker is automatically reset.
     */
    @Configurable
    private long tripTimeout;

    /**
     * The name of this breaker. It prefixes the object-store key of the failure count.
     */
    @Configurable
    private String breakerName;

    /**
     * When the breaker tripped, or <code>null</code> while no trip time is recorded.
     * Volatile because processors may run on different threads.
     */
    private volatile Date breakerTrippedOn;

    /**
     * Single-permit semaphore guarding every read-modify-write of the failure count.
     */
    private final Semaphore objectStoreMutex = new Semaphore(1);

    MuleContext muleContext;

    @Inject
    private ObjectStoreManager objectStoreManager;


    public void setTripThreshold(int tripThreshold) {
        this.tripThreshold = tripThreshold;
    }

    public void setTripTimeout(long tripTimeout) {
        this.tripTimeout = tripTimeout;
    }

    public void setBreakerName(String breakerName) {
        this.breakerName = breakerName;
    }

    public void setMuleContext(MuleContext muleContext) {
        this.muleContext = muleContext;
    }

    /**
     * Lets the payload through while the circuit is closed and rejects it while
     * the circuit is open. Once the breaker has been open for longer than the
     * trip timeout, the failure count is reset and the payload passes again.
     * <p/>
     * {@sample.xml ../../../doc/CircuitBreaker-connector.xml.sample circuitbreaker:filter}
     *
     * @param payload The message payload
     * @return The unchanged payload when the circuit is closed or has just been reset
     * @throws CircuitOpenException when the circuit is open and the trip timeout has not elapsed
     */
    @Processor
    public Object filter(@Payload Object payload) {
        if (getFailureCount() < tripThreshold) {
            return payload;
        }

        // Work on a snapshot so a concurrent reset cannot null the field between
        // the null check and the dereference.
        Date trippedOn = breakerTrippedOn;
        if (trippedOn == null) {
            // The stored count is at or above the threshold but no trip time is
            // known to this instance. Start the timeout now; otherwise the breaker
            // could never be reset.
            breakerTrippedOn = new Date();
            throw new CircuitOpenException();
        }

        if (System.currentTimeMillis() - trippedOn.getTime() > tripTimeout) {
            breakerTrippedOn = null;
            resetFailureCount();
            return payload;
        }

        throw new CircuitOpenException();
    }

    /**
     * Records a failure when the exception carried by the message matches the
     * configured type, and trips the breaker once the failure count reaches the
     * threshold.
     * <p/>
     * {@sample.xml ../../../doc/CircuitBreaker-connector.xml.sample circuitbreaker:trip}
     *
     * @param tripOnException  The canonical class name of the exception type we should trip on.
     * @param exceptionMessage The exception.
     * @return The exception message that was passed in, unchanged
     */
    @Processor
    public Object trip(String tripOnException, @Payload ExceptionMessage exceptionMessage) {

        if (matchesTripException(tripOnException, exceptionMessage)) {
            incrementFailureCount();
            int failures = getFailureCount();
            // Concurrent failures can push the count past the threshold without any
            // thread ever observing equality, so also trip when the count is already
            // above the threshold and no trip time has been recorded yet.
            if (failures == tripThreshold || (failures > tripThreshold && breakerTrippedOn == null)) {
                breakerTrippedOn = new Date();
            }
        }
        return exceptionMessage;
    }

    /**
     * Tells whether the failure carried by the message is of the type named by
     * <code>tripOnException</code>. The cause of the carried exception is compared
     * when there is one; otherwise the exception itself is compared. Missing
     * values never match.
     */
    private boolean matchesTripException(String tripOnException, ExceptionMessage exceptionMessage) {
        if (tripOnException == null || exceptionMessage == null) {
            return false;
        }
        Throwable failure = exceptionMessage.getException();
        if (failure == null) {
            return false;
        }
        Throwable candidate = failure.getCause() != null ? failure.getCause() : failure;
        // getCanonicalName() is null for anonymous and local classes; calling
        // equals on the configured name keeps that case from throwing.
        return tripOnException.equals(candidate.getClass().getCanonicalName());
    }

    /**
     * Looks up the object store that holds the failure count.
     */
    private ObjectStore failureCountStore() {
        ObjectStore objectStore = objectStoreManager.getObjectStore(MuleProperties.OBJECT_STORE_DEFAULT_PERSISTENT_NAME);
        return objectStore;
    }

    /**
     * Builds the object-store key under which this breaker's failure count is kept.
     */
    private String failureCountKey() {
        return String.format("%s.failureCount", breakerName);
    }

    /**
     * Reads the current failure count.
     *
     * @return The stored failure count, or 0 when nothing is stored or the store could not be read
     */
    Integer getFailureCount() {
        // Waiting uninterruptibly guarantees the permit is really held before the
        // store is touched and before release() runs; the thread's interrupt status
        // is preserved by the semaphore.
        objectStoreMutex.acquireUninterruptibly();
        try {
            ObjectStore objectStore = failureCountStore();
            String key = failureCountKey();

            Integer failureCount = 0;
            try {
                if (objectStore.contains(key)) {
                    Integer stored = (Integer) objectStore.retrieve(key);
                    if (stored != null) {
                        failureCount = stored;
                    }
                }
            } catch (Exception e) {
                logger.error("Could not retrieve key from object-store: " + key, e);
            }
            return failureCount;
        } finally {
            // Always hand the permit back, even when the store lookup itself throws.
            objectStoreMutex.release();
        }
    }

    /**
     * Adds one to the stored failure count, starting from 0 when nothing is stored.
     * Store errors are logged and leave the count as the store last had it.
     */
    void incrementFailureCount() {
        objectStoreMutex.acquireUninterruptibly();
        try {
            ObjectStore objectStore = failureCountStore();
            String key = failureCountKey();

            try {
                int failureCount = 0;
                if (objectStore.contains(key)) {
                    Integer stored = (Integer) objectStore.retrieve(key);
                    if (stored != null) {
                        failureCount = stored;
                    }
                    // The existing entry is removed before the new value is stored.
                    objectStore.remove(key);
                }
                objectStore.store(key, failureCount + 1);
            } catch (Exception e) {
                logger.error("Could not retrieve key from object-store: " + key, e);
            }
        } finally {
            objectStoreMutex.release();
        }
    }

    /**
     * Sets the stored failure count back to 0. Store errors are logged.
     */
    void resetFailureCount() {
        objectStoreMutex.acquireUninterruptibly();
        try {
            ObjectStore objectStore = failureCountStore();
            String key = failureCountKey();

            try {
                // The previous value is irrelevant for a reset, so it is not read.
                if (objectStore.contains(key)) {
                    objectStore.remove(key);
                }
                objectStore.store(key, 0);
            } catch (Exception e) {
                logger.error("Could not retrieve key from object-store: " + key, e);
            }
        } finally {
            objectStoreMutex.release();
        }
    }


}