badrebelabbess
6/17/2016 - 3:34 PM

Kafka

Kafka

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

import org.waves_rsp.fwk.Configuration;
import org.waves_rsp.kafka.rdf.protobuf.ProtobufKafkaCodec;

import static org.waves_rsp.fwk.util.StringUtils.*;

public class KafkaConfig
{
    // ------------------------------------------------------------------------
    // Constants
    // ------------------------------------------------------------------------

    /** The default local Kafka host: <code>localhost:9092</code> */
    public final static String KAFKA_DEFAULT_HOST = "localhost:9092";
    /**
     * Configuration property to disable Protocol Buffer serialization
     * of Waves RDF events in Kafka topics. When Protobuf is disabled,
     * standard Java serialization applies.
     */
    public final static String KAFKA_DISABLE_PROTOBUF_PROP =
                                                    "kafka.disable.protobuf";
    /** Kafka bolt configuration property for Kafka broker configuration. */
    public final static String KAFKA_BROKER_PROPERTIES =
                                                    "kafka.broker.properties";
    public final static String BOOTSTRAP_SERVERS = BOOTSTRAP_SERVERS_CONFIG;

    // ------------------------------------------------------------------------
    // Constructors
    // ------------------------------------------------------------------------

    /** Default constructor, private on purpose. */
    private KafkaConfig() {
        throw new UnsupportedOperationException();
    }

    // ------------------------------------------------------------------------
    // Static utility methods
    // ------------------------------------------------------------------------

    /**
     * Returns the
     * {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG Kafka host}
     * from the provided properties or, if not set, the
     * {@link #KAFKA_DEFAULT_HOST default host}.
     * @param  props   the runtime configuration.
     * @return the Kafka host.
     */
    public static String getKafkaHosts(Properties props) {
        String hosts = props.getProperty(BOOTSTRAP_SERVERS_CONFIG);
        return (isSet(hosts))? hosts: KAFKA_DEFAULT_HOST;
    }

    /**
     * Creates a Kafka client configuration from the specified
     * properties. The properties are not copied but used as default
     * values.
     * @param  conf   the runtime configuration.
     * @return A properties object suitable for configuring a Kafka
     *         light producer or consumer.
     */
    public static Properties getDefaultConfig(Properties conf) {
        // Message Kafka Codec.
        String codec = getMessageKafkaCodecClass().getName();

        Properties props = new Properties(conf);
        // Kafka heavy (Scala-based) client configuration properties.
        props.setProperty("metadata.broker.list", getKafkaHosts(conf));
        props.setProperty("key.serializer.class",
                          "kafka.serializer.StringEncoder");
        props.setProperty("key.deserializer.class",
                          "kafka.serializer.StringDecoder");
        props.setProperty("serializer.class", codec);
        props.setProperty("deserializer.class", codec);
        // Kafka light (Java native) client configuration properties.
        props.setProperty(BOOTSTRAP_SERVERS_CONFIG, getKafkaHosts(conf));
        props.setProperty(KEY_SERIALIZER_CLASS_CONFIG,
                          StringSerializer.class.getName());
        props.setProperty(KEY_DESERIALIZER_CLASS_CONFIG,
                          StringDeserializer.class.getName());
        props.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, codec);
        props.setProperty(VALUE_DESERIALIZER_CLASS_CONFIG, codec);
        return props;
    }
 
    /**
     * Returns the serialization/deserialization class to use for
     * exchanging Waves RDF events.
     * @return the serializer class for Kafka message value.
     */
    public static Class<?> getMessageKafkaCodecClass() {
        return (Configuration.getBoolean(KAFKA_DISABLE_PROTOBUF_PROP))?
                                          JavaSerializationKafkaCodec.class:
                                          ProtobufKafkaCodec.class;
    }
}
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.log4j.Logger;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.GlobalPartitionInformation;

import com.google.common.collect.Lists;

import kafka.admin.AdminOperationException;
import kafka.admin.AdminUtils;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.common.TopicExistsException;
import kafka.utils.ZKStringSerializer;
import kafka.utils.ZkUtils;
import net.atos.fr.michelin.utilities.PropertyLoader;
import scala.collection.JavaConverters;
import scala.collection.Seq; 
 
public class KafkaSetUp  { 
 

	private static Logger logger = Logger.getLogger(KafkaSetUp.class);
	private  Properties configLoad = PropertyLoader.loadProperties("config", null);
	private String topicName; 
	private ZkClient zkClient; 
	private ZkUtils zkUtil;
	private int retryTimes; 
	private int retryInterval; 
 
  public KafkaSetUp(Properties configLoad, ZkHosts hosts, String topicName) { 
    this.configLoad = configLoad; 
    this.topicName = topicName; 
    zkClient = new ZkClient(hosts.brokerZkStr, Integer.parseInt(configLoad.getProperty("KAFKA_ZOOKEEPER_SESSION_TIMEOUT")), 
    		Integer.parseInt(configLoad.getProperty("KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT")), 
        new ZkSerializer() { 
          @Override 
          public byte[] serialize(Object data) { 
            return ZKStringSerializer.serialize(data); 
          } 
 
          @Override 
          public Object deserialize(byte[] bytes) { 
            return ZKStringSerializer.deserialize(bytes); 
          } 
        }); 
 
    retryTimes = Integer.parseInt(configLoad.getProperty("READ_BROKERS_RETRY_TIMES")); 
    retryInterval = Integer.parseInt(configLoad.getProperty("READ_BROKERS_RETRY_INTERVAL")); 
  } 
 
  private int getPartitionsNb() { 
    Map<TopicAndPartition, Seq<Object>> existingPartitionsReplicaList = 
        JavaConverters.mapAsJavaMapConverter( 
            ZkUtils.getReplicaAssignmentForTopics(zkClient, 
                JavaConverters.asScalaBufferConverter(Lists.newArrayList(topicName)).asScala() 
                .toSeq())).asJava();
    return existingPartitionsReplicaList.size();
  } 
 
  public void createTopic(int numPartitions) { 
    if (!AdminUtils.topicExists(zkUtil, topicName)) {
      try { 
        Integer replFactor = Integer.parseInt(configLoad.getProperty("REPLICATION_FACTOR")); 
        AdminUtils.createTopic(zkUtil, topicName, numPartitions, replFactor, new Properties(), null); 
        logger.info("Topic created. name: " +topicName+ " partitions: " + numPartitions+ "replFactor: " +replFactor); 
      } catch (TopicExistsException ignore) { 
        logger.info("Topic yexists. name: " + topicName); 
      } 
    } else { 
      logger.info("Topic exists. name: " + topicName); 
      if (numPartitions > getPartitionsNb()) { 
        try { 
          AdminUtils.addPartitions(zkUtil, topicName, numPartitions, "", true, null);
          logger.info("Topic altered. name: " + topicName + " partitions: " + numPartitions); 
        } catch (AdminOperationException e) { 
          logger.error("Failed to add partitions", e); 
        } 
      } 
    } 
  } 
 
  private int getLeader(int partition) { 
    try { 
      for (int i = 0; i < retryTimes; i++) { 
        scala.Option<Object> option = zkUtil.getLeaderForPartition(topicName, partition);
        if (!option.isEmpty()) { 
          return ((Number) option.get()).intValue(); 
        } 
        TimeUnit.MILLISECONDS.sleep(retryInterval); 
      } 
    } catch (InterruptedException ignore) { 
      ignore = null; 
    } 
    logger.error("Failed to get leader for partition. topic: " + topicName + " partition: "+ partition); 
    return -1; 
  } 
 
  private org.apache.storm.kafka.Broker getBrokerHost(int brokerId) { 
    try { 
      for (int i = 0; i < retryTimes; i++) { 
        scala.Option<Broker> option = zkUtil.getBrokerInfo(brokerId); 
        if (!option.isEmpty()) { 
          kafka.cluster.Broker broker = option.get(); 
          return new org.apache.storm.kafka.Broker(broker.host,broker.port); 
        } 
        TimeUnit.MILLISECONDS.sleep(retryInterval); 
      } 
    } catch (InterruptedException ignore) { 
      ignore = null; 
    } 
    logger.error("Failed to get broker info. brokerId: " + brokerId); 
    return null; 
  } 
 
  public GlobalPartitionInformation getCurrentBrokers() { 
    GlobalPartitionInformation brokersInfo = new GlobalPartitionInformation(topicName); 
    try { 
      int numPartitions = getPartitionsNb(); 
      for (int partition = 0; partition < numPartitions; partition++) { 
        int leader = getLeader(partition); 
        if (leader >= 0) { 
        	Broker broker = getBrokerHost(leader);
          //if (broker != null) { 
            //brokersInfo.addPartition(partition, broker);; 
          //} 
        } 
      } 
    } catch (Exception e) { 
      logger.error("Failed to get current brokers", e); 
    } 
 
    return brokersInfo; 
  } 
 
  public void close() { 
    if (zkClient != null) { 
      zkClient.close(); 
    } 
  } 
}