/*
 * LiamBao
 * 6/11/2018 - 7:06 AM
 *
 * main demo
 *
 * main demo
 */

package com.d.d;


import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.InputStream;
import java.net.URL;
import java.util.Properties;


/**
 * Command-line entry point for DoctorKafka, the central service for managing
 * kafka operation.
 *
 * <p>{@link #main(String[])} parses the command line, loads the configuration
 * file named by the {@code config} option, starts the {@code DoctorKafka}
 * service, its web server and the ostrich service, and registers a JVM
 * shutdown hook that stops the service again.
 *
 * <p>Usage: {@code mainDemo -config config_file_path}
 */
public class mainDemo {

    private static final Logger LOG = LogManager.getLogger(mainDemo.class);

    // Names of the supported command-line options.
    private static final String CONFIG_PATH = "config";
    private static final String METRICS_TOPIC = "topic";
    private static final String OSTRICH_PORT = "ostrichport";
    private static final String TSD_HOSTPORT = "tsdhostport";
    private static final String UPTIME_IN_SECONDS = "uptimeinseconds";
    private static final String ZOOKEEPER = "zookeeper";

    /** Option definitions, shared by the parser and the usage printer. */
    private static final Options options = buildOptions();

    /** The running service instance; {@code null} until {@link #main(String[])} creates it. */
    public static DoctorKafka doctorKafka = null;

    // Nothing in this class assigns the watcher; the cleanup thread only stops it if set.
    private static DoctorKafkaWatcher operatorWatcher = null;

    /**
     * Builds the set of supported command-line options.
     *
     * <p>Only {@code config} is required, because {@link #main(String[])}
     * cannot start without a configuration file. Every option takes one argument.
     *
     * @return the populated option set
     */
    private static Options buildOptions() {
        Option configPath = new Option(CONFIG_PATH, true, "config file path");
        // Let the parser reject a missing config instead of handing null to the config loader.
        configPath.setRequired(true);

        Options result = new Options();
        result.addOption(configPath);
        // Options added through this overload are optional by default.
        result.addOption(ZOOKEEPER, true, "zk url for metrics topic");
        result.addOption(METRICS_TOPIC, true, "kafka topic for metric messages");
        result.addOption(TSD_HOSTPORT, true, "tsd host and port, e.g. localhost:18621");
        result.addOption(OSTRICH_PORT, true, "ostrich port");
        result.addOption(UPTIME_IN_SECONDS, true, "uptime in seconds");
        return result;
    }

    /**
     * Parses the command-line arguments.
     *
     * <p>Prints the usage text and terminates the JVM with exit status 1 when
     * no arguments are given or when parsing fails (including when the
     * required {@code config} option is missing).
     *
     * @param args raw arguments passed to {@link #main(String[])}
     * @return the parsed command line
     */
    private static CommandLine parseCommandLine(String[] args) {
        if (args.length < 1) {
            printUsageAndExit();
        }
        CommandLineParser parser = new DefaultParser();
        try {
            return parser.parse(options, args);
        } catch (ParseException | NumberFormatException e) {
            // Report the cause so the user can see which argument was rejected.
            LOG.error("Failed to parse command line arguments: {}", e.getMessage());
            printUsageAndExit();
            // Only reached if System.exit returns; keeps the compiler satisfied.
            return null;
        }
    }

    /** Prints the usage text for all options and terminates the JVM with exit status 1. */
    private static void printUsageAndExit() {
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp("DoctorKafka", options);
        System.exit(1);
    }

    /**
     * Starts DoctorKafka.
     *
     * <p>Registers the cleanup shutdown hook, loads the configuration from the
     * path given by the {@code config} option into
     * {@code ReplicaStatsManager.config}, starts the service, then starts the
     * web server and the ostrich service using the ports read from that
     * configuration.
     *
     * @param args command-line arguments; see the class documentation for usage
     * @throws Exception if any of the startup steps fails
     */
    public static void main(String[] args) throws Exception {
        // Register this class's own cleanup thread so the instance started below is stopped.
        Runtime.getRuntime().addShutdownHook(new OperatorCleanupThread());
        CommandLine commandLine = parseCommandLine(args);

        String configPath = commandLine.getOptionValue(CONFIG_PATH);
        LOG.info("configuration path : {}", configPath);

        ReplicaStatsManager.config = new DoctorKafkaConfig(configPath);
        doctorKafka = new DoctorKafka(ReplicaStatsManager.config);
        doctorKafka.start();

        // start the web UI
        int webPort = ReplicaStatsManager.config.getWebserverPort();
        DoctorKafkaWebServer webServer = new DoctorKafkaWebServer(webPort);
        webServer.start();

        int ostrichPort = ReplicaStatsManager.config.getOstrichPort();
        String tsdHostPort = ReplicaStatsManager.config.getTsdHostPort();
        OperatorUtil.startOstrichService(tsdHostPort, ostrichPort);
        LOG.info("DoctorKafka started.");
    }

    /**
     * Shutdown hook that stops the service and the watcher, if they were set.
     *
     * <p>Each stop runs in its own try block so that a failure in one does
     * not prevent the other from being attempted.
     */
    static class OperatorCleanupThread extends Thread {

        @Override
        public void run() {
            try {
                if (doctorKafka != null) {
                    doctorKafka.stop();
                }
            } catch (Throwable t) {
                LOG.error("Failure in stopping operator", t);
            }

            try {
                if (operatorWatcher != null) {
                    operatorWatcher.stop();
                }
            } catch (Throwable t) {
                LOG.error("Shutdown failure in collectorMonitor : ", t);
            }
        }
    }
}