lennart-l of LBCT Snippets
11/10/2017 - 2:00 AM

Slow Consumer

Runs on Center node only

import com.navis.framework.esb.server.ESBServerHelper
import com.navis.framework.esb.server.configuration.ESBActiveMQConfigGenerator

import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.ConnectionViewMBean;

import com.navis.framework.business.Roastery;
import com.navis.framework.esb.server.configuration.ESBBrokerFactoryBean;

/**
 * This Groovy script is to be executed on the center node JVM.  Run it only when either the bridge queue or any a4 queue
 * has stopped being processed because of a 'frozen' node.  The script terminates the AMQ proxy consumers coming from the unresponsive n4
 * cluster node so that messages are re-routed to other live nodes.
 *
 * WARNING: You need to understand this script before you run it!
 *
 * LIMITATION: This script works in the case where the 'frozen' node has JVM or VM problems.  It will not solve the problem if the 'frozen'
 *           node is stuck because of a database deadlock.  In that case, messages re-routed to other nodes will get stuck in a DB transaction again.
 *
 *
 *  Usage:
 *    Set the following two variables before use.
 *     1) QUEUE_NAME - Set the queue name having the queueing problem, such as "bridge.LBCT/LBCT/LBCT/LBCT", "a4.agv_commands"
 *     2) RUN_TO_KILL - Set false to see and check which n4 node whose consumer threads are in "stuck" state.  This gives you a chance to investigate
 *                      the bad node.  Set it to true to actually kill the consumers.
 *
 *  Algorithm:
 *    Assume the first message in the queue being held "hostage" by the bad node.
 *    1) Get the group ID from the first message in the queue specified by QUEUE_NAME
 *    2) Get the hostname that has been assigned with the group ID
 *    3) Find all consumers (both good and bad, and both n4 and a4) coming from that hostname.
 *    4) When RUN_TO_KILL=false, print out those consumers.
 *       When RUN_TO_KILL=true, print out those consumers and kill them.  Message group assignment also get refreshed.
 *
 *  Impact:
 *    The consumers on the bad n4 node may or may not continue to process the messages they already have.  If they continue and commit the JMS
 *    transaction, they will encounter and log an error about "post failover recovery. 1 previously delivered message(s) not replayed to consumer".
 *    Those same messages are re-delivered to other live nodes.  Therefore, you can end up with the same message being processed twice.  The killed
 *    consumers will be recreated by the n4 node and, assuming the node becomes healthy again, will compete for new messages again.
 */
class FindAndKillConsumersMatchingTheFirstGroupID {

    final String QUEUE_NAME = "";  // Example: "bridge.LBCT/LBCT/LBCT/LBCT" or "a4.agv_commands";
    final boolean RUN_TO_KILL = false;   // when you're ready, set this to true and run this script again to actually kill the consumers.

    // Client-id prefixes of the connections this script targets; the derived host name is appended when matching.
    private static final List<String> CLIENT_ID_PREFIXES =
            ["JmsPollingTask_a4_", "SyncPollingJob_n4_", "ForwardingJmsPollingTask_a4_"];

    // Number of trailing "-"-separated segments removed from the group owner id to obtain the host name.
    private static final int TRAILING_SEGMENTS = 3;

    /**
     * Looks up the consumer that owns the message group of the first message in QUEUE_NAME, derives a host name
     * from that owner id, and reports (RUN_TO_KILL=false) or stops (RUN_TO_KILL=true) every openwire client
     * connection whose client id starts with one of CLIENT_ID_PREFIXES followed by that host name.
     *
     * @return a human-readable report: the group id, the owner id, one "Found:"/"Kill:" line per matching
     *         connection and one "Skipped:" line per connection that could not be inspected or stopped; or a
     *         one-line explanation when nothing could be done.  Exceptions raised while talking to the broker
     *         MBeans are returned as text (suffixed with "Check the scope.") rather than thrown.
     */
    public String execute() {
        // Fail fast with a readable message instead of a JMX lookup error on an empty destination name.
        if (QUEUE_NAME == null || QUEUE_NAME.trim().isEmpty()) {
            return "QUEUE_NAME is not set.  Set it to the name of the stuck queue and run the script again.";
        }

        BrokerService broker = (BrokerService) Roastery.getBean(ESBBrokerFactoryBean.BEAN_ID);
        try {
            String brokerName = ESBServerHelper.getESBBrokerName();

            // Proxy for the queue's management bean.
            ObjectName queueName = new ObjectName
                    ("org.apache.activemq:type=Broker,brokerName=" + brokerName +
                            ",destinationType=Queue,destinationName=" + QUEUE_NAME);
            QueueViewMBean queueMbean = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueName, QueueViewMBean.class, true);

            if (queueMbean.getInFlightCount() <= 0) {
                return "The inFlightCount is 0.  Nothing appears to be stuck.";
            }

            CompositeData[] queueData = queueMbean.browse();
            if (queueData == null || queueData.length == 0) {
                return "This bridge queue contains no messages";
            }

            StringBuilder result = new StringBuilder();

            // The script assumes the first browsed message is the one being held by the bad node.
            String groupId = queueData[0].get("JMSXGroupID");
            result.append("First group ID : " + groupId + "\n");
            if (groupId == null || groupId.isEmpty()) {
                result.append("The first message carries no JMSXGroupID, so no owning consumer can be identified.\n");
                return result.toString();
            }

            Map messageGroups = queueMbean.getMessageGroups();
            String ownerId = messageGroups == null ? null : messageGroups.get(groupId);
            if (ownerId == null || ownerId.length() <= 3) {
                result.append("No usable consumer is assigned to this message group (owner: " + ownerId + ").\n");
                return result.toString();
            }

            // Drop the first three characters of the owner id.
            // NOTE(review): presumably the "ID:" prefix of an ActiveMQ-generated id -- confirm.
            String hostName = ownerId.substring(3);
            result.append("Found hostname : " + hostName + "\n\n");

            // Host names may themselves contain "-", so the trailing segments are stripped from the right.
            hostName = stripTrailingSegments(hostName, TRAILING_SEGMENTS);
            if (hostName == null || hostName.isEmpty()) {
                // An empty host name would make every prefixed connection match, i.e. kill consumers on ALL nodes.
                result.append("Could not derive a host name from the consumer id; refusing to match connections.\n");
                return result.toString();
            }

            // All openwire client connections currently registered with the broker.
            ObjectName connectionNamesON = new ObjectName("org.apache.activemq:type=Broker,brokerName=" + brokerName +
                    ",connector=clientConnectors,connectorName=openwire,connectionViewType=clientId,connectionName=*");
            Set<ObjectName> connectionNames = broker.getManagementContext().queryNames(connectionNamesON, null);

            for (ObjectName name : connectionNames) {
                // Handle each connection on its own: one that vanished or fails to stop must not
                // abort the run and discard the report of what has already been killed.
                try {
                    ConnectionViewMBean connectionViewMbean = (ConnectionViewMBean) broker.getManagementContext().newProxyInstance(name, ConnectionViewMBean.class, true);
                    String clientId = connectionViewMbean.getClientId();
                    if (!isClientOnHost(clientId, hostName)) {
                        continue;
                    }
                    if (RUN_TO_KILL) {
                        result.append("Kill:  " + clientId + "\n");
                        connectionViewMbean.stop();
                    } else {
                        result.append("Found:  " + clientId + "\n");
                    }
                } catch (Exception e) {
                    result.append("Skipped:  " + name + " (" + e.toString() + ")\n");
                }
            }

            // NOTE(review): the original left queueMbean.removeAllMessageGroups() disabled here; it stays
            // disabled.  Confirm whether group assignments need an explicit reset after the kill.
            return result.toString();
        } catch (Exception e) {
            return e.toString() + "  Check the scope.";
        }
    }

    /** Returns true when clientId is non-null and starts with one of CLIENT_ID_PREFIXES followed by hostName. */
    private static boolean isClientOnHost(String clientId, String hostName) {
        if (clientId == null) {
            return false;
        }
        for (String prefix : CLIENT_ID_PREFIXES) {
            if (clientId.startsWith(prefix + hostName)) {
                return true;
            }
        }
        return false;
    }

    /** Removes the last count "-"-separated segments from value; returns null when value has fewer than count dashes. */
    private static String stripTrailingSegments(String value, int count) {
        String remaining = value;
        for (int i = 0; i < count; i++) {
            int dash = remaining.lastIndexOf("-");
            if (dash < 0) {
                return null;
            }
            remaining = remaining.substring(0, dash);
        }
        return remaining;
    }
}