// Runs on Center node only
import com.navis.framework.esb.server.ESBServerHelper
import com.navis.framework.esb.server.configuration.ESBActiveMQConfigGenerator
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.ConnectionViewMBean;
import com.navis.framework.business.Roastery;
import com.navis.framework.esb.server.configuration.ESBBrokerFactoryBean;
/**
* This Groovy script must be executed on the center node JVM. Run it only when the bridge queue or an a4 queue has
* stopped being processed because of a 'frozen' node. The script terminates the AMQ proxy consumers coming from the
* unresponsive n4 cluster node so that their messages are re-routed to the other live nodes.
*
* WARNING: You need to understand this script before you run it!
*
* LIMITATION: This script helps when the 'frozen' node has JVM or VM problems. It will not solve the problem if the
* 'frozen' node is stuck because of a database deadlock. In that case, the re-routed messages will simply get stuck in
* a DB transaction again on the other nodes.
*
*
* Usage:
* Set the following two variables before use.
* 1) QUEUE_NAME - The name of the queue that is not being processed, such as "bridge.LBCT/LBCT/LBCT/LBCT" or "a4.agv_commands".
* 2) RUN_TO_KILL - Set it to false to only report which n4 node's consumers appear to be "stuck". This gives you a chance to
* investigate the bad node first. Set it to true to actually kill the consumers.
*
* Algorithm:
* Assume the first message in the queue is being held "hostage" by the bad node.
* 1) Get the group ID of the first message in the queue specified by QUEUE_NAME.
* 2) Get the hostname of the consumer that the group ID has been assigned to.
* 3) Find all consumers (both good and bad, and both n4 and a4) coming from that hostname.
* 4) When RUN_TO_KILL=false, print out those consumers.
* When RUN_TO_KILL=true, print out those consumers and kill them. Message group assignments are expected to be refreshed
* once the killed consumers close; the script itself does not call removeAllMessageGroups().
*
* Impact:
* The consumers on the bad n4 node may or may not continue to process the messages they already have. If they continue and
* commit the JMS transaction, they will encounter and log the error "post failover recovery. 1 previously delivered
* message(s) not replayed to consumer". The same messages are redelivered to other live nodes, so some messages may end up
* being processed twice. The killed consumers will be recreated by the n4 node and, assuming the node becomes healthy
* again, will compete for new messages again.
*/
class FindAndKillConsumersMatchingTheFirstGroupID {
    // Name of the queue that stopped being processed; must be set before running.
    final String QUEUE_NAME = ""; // Example: "bridge.LBCT/LBCT/LBCT/LBCT" or "a4.agv_commands";
    // false = only report the matching consumer connections; true = also stop them.
    final boolean RUN_TO_KILL = false; // when you're ready, set this to true and run this script again to actually kill the consumers.

    // Number of trailing "-"-separated segments removed from the group owner's ID to obtain the host name.
    private static final int TRAILING_ID_SEGMENTS = 3;

    /**
     * Finds the consumer that owns the message group of the first message on QUEUE_NAME and derives its host name.
     * It then reports every openwire client connection from that host whose client ID starts with one of the
     * polling-task prefixes. When RUN_TO_KILL is true, it also stops each of those connections.
     *
     * @return a human-readable report; if an unexpected exception occurs, its text followed by " Check the scope."
     */
    public String execute() {
        if (QUEUE_NAME == null || QUEUE_NAME.trim().isEmpty()) {
            return "QUEUE_NAME is not set. Set it to the name of the stuck queue and run the script again.";
        }
        BrokerService broker = (BrokerService) Roastery.getBean(ESBBrokerFactoryBean.BEAN_ID);
        try {
            String brokerName = ESBServerHelper.getESBBrokerName();
            ObjectName queueObjectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=" + brokerName +
                    ",destinationType=Queue,destinationName=" + QUEUE_NAME);
            QueueViewMBean queueMbean = (QueueViewMBean) broker.getManagementContext()
                    .newProxyInstance(queueObjectName, QueueViewMBean.class, true);

            if (queueMbean.getInFlightCount() <= 0) {
                return "The inFlightCount is 0. Nothing appears to be stuck.";
            }
            CompositeData[] queueData = queueMbean.browse();
            if (queueData == null || queueData.length == 0) {
                return "The queue " + QUEUE_NAME + " contains no messages";
            }

            StringBuilder result = new StringBuilder();
            // Assumption (see script header): the first message on the queue is the one held "hostage".
            Object groupIdValue = queueData[0].get("JMSXGroupID");
            if (groupIdValue == null) {
                result.append("The first message has no JMSXGroupID; cannot determine the owning consumer.\n");
                return result.toString();
            }
            String groupId = groupIdValue.toString();
            result.append("First group ID : " + groupId + "\n");

            Map<String, String> messageGroups = queueMbean.getMessageGroups();
            String owner = messageGroups == null ? null : messageGroups.get(groupId);
            if (owner == null) {
                result.append("Group " + groupId + " is not assigned to any consumer. Nothing to do.\n");
                return result.toString();
            }
            result.append("Group owner    : " + owner + "\n");

            String hostName = hostNameFromConsumerId(owner);
            if (hostName == null) {
                // Guard: an empty host would make the prefixes below match consumers on EVERY node.
                result.append("Cannot derive a host name from group owner '" + owner + "'. Nothing was stopped.\n");
                return result.toString();
            }
            result.append("Found hostname : " + hostName + "\n\n");

            List<String> clientIdPrefixes = Arrays.asList(
                    "JmsPollingTask_a4_" + hostName,
                    "SyncPollingJob_n4_" + hostName,
                    "ForwardingJmsPollingTask_a4_" + hostName);

            ObjectName connectionPattern = new ObjectName("org.apache.activemq:type=Broker,brokerName=" + brokerName +
                    ",connector=clientConnectors,connectorName=openwire,connectionViewType=clientId,connectionName=*");
            Set<ObjectName> connectionNames = broker.getManagementContext().queryNames(connectionPattern, null);

            int matchCount = 0;
            for (ObjectName connectionName : connectionNames) {
                // Handle each connection separately so one vanished/failed connection does not stop the rest.
                try {
                    ConnectionViewMBean connection = (ConnectionViewMBean) broker.getManagementContext()
                            .newProxyInstance(connectionName, ConnectionViewMBean.class, true);
                    String clientId = connection.getClientId();
                    if (clientId == null || !startsWithAny(clientId, clientIdPrefixes)) {
                        continue;
                    }
                    matchCount++;
                    if (RUN_TO_KILL) {
                        result.append("Kill: " + clientId + "\n");
                        connection.stop();
                    } else {
                        result.append("Found: " + clientId + "\n");
                    }
                } catch (Exception connectionError) {
                    result.append("Error on connection " + connectionName + ": " + connectionError + "\n");
                }
            }

            if (matchCount == 0) {
                result.append("No matching consumer connections found.\n");
            } else if (!RUN_TO_KILL) {
                result.append("\nDry run only. Set RUN_TO_KILL = true to stop these connections.\n");
            }
            // NOTE(review): the original script kept queueMbean.removeAllMessageGroups() disabled, and it stays
            // disabled here. Judging by its name, it would presumably reset EVERY group on the queue, not just
            // those owned by the stopped consumers. Confirm before enabling it.
            return result.toString();
        } catch (Exception e) {
            return e.toString() + " Check the scope.";
        }
    }

    /**
     * Derives a host name from a message-group owner ID. The owner is presumably an ActiveMQ consumer ID shaped like
     * "ID:host-port-timestamp-n:session:consumer" (TODO confirm against the broker version in use).
     * Removes an optional leading "ID:" and then the last TRAILING_ID_SEGMENTS "-"-separated segments.
     *
     * @param consumerId the owner value from QueueViewMBean.getMessageGroups(); must not be null
     * @return the non-empty host part, or null if the ID does not have enough "-" separators after a non-empty host
     */
    private static String hostNameFromConsumerId(String consumerId) {
        String host = consumerId.startsWith("ID:") ? consumerId.substring(3) : consumerId;
        for (int i = 0; i < TRAILING_ID_SEGMENTS; i++) {
            int dash = host.lastIndexOf("-");
            if (dash <= 0) {
                return null; // too few segments, or an empty host would remain
            }
            host = host.substring(0, dash);
        }
        return host;
    }

    // Returns true when value starts with at least one of the given prefixes.
    private static boolean startsWithAny(String value, List<String> prefixes) {
        for (String prefix : prefixes) {
            if (value.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}