Kafka configuration and topic administration utilities
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import org.waves_rsp.fwk.Configuration;
import org.waves_rsp.kafka.rdf.protobuf.ProtobufKafkaCodec;
import static org.waves_rsp.fwk.util.StringUtils.*;
public class KafkaConfig
{
// ------------------------------------------------------------------------
// Constants
// ------------------------------------------------------------------------
/** The default local Kafka host: <code>localhost:9092</code> */
public final static String KAFKA_DEFAULT_HOST = "localhost:9092";
/**
* Configuration property to disable Protocol Buffer serialization
* of Waves RDF events in Kafka topics. When Protobuf is disabled,
* standard Java serialization applies.
*/
public final static String KAFKA_DISABLE_PROTOBUF_PROP =
"kafka.disable.protobuf";
    /** Configuration property under which the Kafka bolt expects the Kafka broker configuration. */
public final static String KAFKA_BROKER_PROPERTIES =
"kafka.broker.properties";
    /** Shortcut for the Kafka <code>bootstrap.servers</code> configuration key. */
    public final static String BOOTSTRAP_SERVERS = BOOTSTRAP_SERVERS_CONFIG;
// ------------------------------------------------------------------------
// Constructors
// ------------------------------------------------------------------------
/** Default constructor, private on purpose. */
private KafkaConfig() {
throw new UnsupportedOperationException();
}
// ------------------------------------------------------------------------
// Static utility methods
// ------------------------------------------------------------------------
/**
* Returns the
* {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG Kafka host}
* from the provided properties or, if not set, the
* {@link #KAFKA_DEFAULT_HOST default host}.
* @param props the runtime configuration.
* @return the Kafka host.
*/
public static String getKafkaHosts(Properties props) {
String hosts = props.getProperty(BOOTSTRAP_SERVERS_CONFIG);
return (isSet(hosts))? hosts: KAFKA_DEFAULT_HOST;
}
/**
* Creates a Kafka client configuration from the specified
* properties. The properties are not copied but used as default
* values.
* @param conf the runtime configuration.
* @return A properties object suitable for configuring a Kafka
* light producer or consumer.
*/
public static Properties getDefaultConfig(Properties conf) {
// Message Kafka Codec.
String codec = getMessageKafkaCodecClass().getName();
Properties props = new Properties(conf);
// Kafka heavy (Scala-based) client configuration properties.
props.setProperty("metadata.broker.list", getKafkaHosts(conf));
props.setProperty("key.serializer.class",
"kafka.serializer.StringEncoder");
props.setProperty("key.deserializer.class",
"kafka.serializer.StringDecoder");
props.setProperty("serializer.class", codec);
props.setProperty("deserializer.class", codec);
// Kafka light (Java native) client configuration properties.
props.setProperty(BOOTSTRAP_SERVERS_CONFIG, getKafkaHosts(conf));
props.setProperty(KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.setProperty(KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, codec);
props.setProperty(VALUE_DESERIALIZER_CLASS_CONFIG, codec);
return props;
}
/**
* Returns the serialization/deserialization class to use for
* exchanging Waves RDF events.
* @return the serializer class for Kafka message value.
*/
public static Class<?> getMessageKafkaCodecClass() {
return (Configuration.getBoolean(KAFKA_DISABLE_PROTOBUF_PROP))?
JavaSerializationKafkaCodec.class:
ProtobufKafkaCodec.class;
}
}
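For illustration, here is a minimal usage sketch (not part of the original source) that feeds getDefaultConfig() into a plain Java Kafka producer. The broker address, topic name, key, and payload are hypothetical placeholders; the record value must be of a type the configured codec (see getMessageKafkaCodecClass()) can serialize.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaConfigUsageExample {
    public static void main(String[] args) {
        Properties conf = new Properties();
        // Hypothetical broker list; getKafkaHosts() falls back to
        // localhost:9092 when this property is left unset.
        conf.setProperty(KafkaConfig.BOOTSTRAP_SERVERS, "broker1:9092");
        Properties producerProps = KafkaConfig.getDefaultConfig(conf);
        // The value type depends on the configured codec; a plain String
        // stands in here and only works if the codec accepts it.
        try (KafkaProducer<String, Object> producer =
                new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("waves-events", "event-key",
                    "hypothetical event payload"));
        }
    }
}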
KafkaSetUp
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.log4j.Logger;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.GlobalPartitionInformation;
import com.google.common.collect.Lists;
import kafka.admin.AdminOperationException;
import kafka.admin.AdminUtils;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.common.TopicExistsException;
import kafka.utils.ZKStringSerializer;
import kafka.utils.ZkUtils;
import scala.collection.JavaConverters;
import scala.collection.Seq;
public class KafkaSetUp {
    private static final Logger logger = Logger.getLogger(KafkaSetUp.class);
    private Properties configLoad;
private String topicName;
private ZkClient zkClient;
private ZkUtils zkUtil;
private int retryTimes;
private int retryInterval;
    /**
     * Creates a topic administration helper bound to the ZooKeeper
     * ensemble referenced by the Storm spout configuration.
     */
    public KafkaSetUp(Properties configLoad, ZkHosts hosts, String topicName) {
        this.configLoad = configLoad;
        this.topicName = topicName;
        zkClient = new ZkClient(hosts.brokerZkStr,
                Integer.parseInt(configLoad.getProperty("KAFKA_ZOOKEEPER_SESSION_TIMEOUT")),
                Integer.parseInt(configLoad.getProperty("KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT")),
                new ZkSerializer() {
                    @Override
                    public byte[] serialize(Object data) {
                        return ZKStringSerializer.serialize(data);
                    }
                    @Override
                    public Object deserialize(byte[] bytes) {
                        return ZKStringSerializer.deserialize(bytes);
                    }
                });
        // ZkUtils instance required by the AdminUtils calls below
        // (Kafka 0.9+ API; ZooKeeper security assumed disabled).
        zkUtil = new ZkUtils(zkClient, new ZkConnection(hosts.brokerZkStr), false);
        retryTimes = Integer.parseInt(configLoad.getProperty("READ_BROKERS_RETRY_TIMES"));
        retryInterval = Integer.parseInt(configLoad.getProperty("READ_BROKERS_RETRY_INTERVAL"));
    }
    /** Returns the number of partitions currently assigned to the topic. */
    private int getPartitionsNb() {
        // One replica-assignment entry exists per partition, so the map
        // size is the current partition count (Kafka 0.9+ ZkUtils API).
        Map<TopicAndPartition, Seq<Object>> existingPartitionsReplicaList =
                JavaConverters.mapAsJavaMapConverter(
                    zkUtil.getReplicaAssignmentForTopics(
                        JavaConverters.asScalaBufferConverter(
                            Lists.newArrayList(topicName)).asScala().toSeq()))
                    .asJava();
        return existingPartitionsReplicaList.size();
    }
    /**
     * Creates the topic if it does not exist yet. If the topic already
     * exists with fewer partitions than requested, the missing
     * partitions are added.
     * @param numPartitions the desired number of partitions.
     */
    public void createTopic(int numPartitions) {
if (!AdminUtils.topicExists(zkUtil, topicName)) {
try {
Integer replFactor = Integer.parseInt(configLoad.getProperty("REPLICATION_FACTOR"));
AdminUtils.createTopic(zkUtil, topicName, numPartitions, replFactor, new Properties(), null);
logger.info("Topic created. name: " +topicName+ " partitions: " + numPartitions+ "replFactor: " +replFactor);
} catch (TopicExistsException ignore) {
logger.info("Topic yexists. name: " + topicName);
}
} else {
logger.info("Topic exists. name: " + topicName);
if (numPartitions > getPartitionsNb()) {
try {
AdminUtils.addPartitions(zkUtil, topicName, numPartitions, "", true, null);
logger.info("Topic altered. name: " + topicName + " partitions: " + numPartitions);
} catch (AdminOperationException e) {
logger.error("Failed to add partitions", e);
}
}
}
}
    /** Returns the leader broker id for the given partition, or -1 if it could not be read within the configured retries. */
    private int getLeader(int partition) {
try {
for (int i = 0; i < retryTimes; i++) {
scala.Option<Object> option = zkUtil.getLeaderForPartition(topicName, partition);
if (!option.isEmpty()) {
return ((Number) option.get()).intValue();
}
TimeUnit.MILLISECONDS.sleep(retryInterval);
}
        } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing the exception.
            Thread.currentThread().interrupt();
        }
logger.error("Failed to get leader for partition. topic: " + topicName + " partition: "+ partition);
return -1;
}
    /** Resolves the host and port of the given broker id, or null if the broker info could not be read within the configured retries. */
    private org.apache.storm.kafka.Broker getBrokerHost(int brokerId) {
try {
for (int i = 0; i < retryTimes; i++) {
scala.Option<Broker> option = zkUtil.getBrokerInfo(brokerId);
if (!option.isEmpty()) {
                    kafka.cluster.Broker broker = option.get();
                    // Scala case-class fields are exposed to Java as accessor
                    // methods (assumes a Kafka version whose Broker still
                    // carries host/port directly).
                    return new org.apache.storm.kafka.Broker(broker.host(), broker.port());
}
TimeUnit.MILLISECONDS.sleep(retryInterval);
}
        } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing the exception.
            Thread.currentThread().interrupt();
        }
logger.error("Failed to get broker info. brokerId: " + brokerId);
return null;
}
    /**
     * Builds the current partition-to-leader-broker mapping of the topic,
     * in the form expected by the Storm Kafka spout.
     */
    public GlobalPartitionInformation getCurrentBrokers() {
GlobalPartitionInformation brokersInfo = new GlobalPartitionInformation(topicName);
try {
int numPartitions = getPartitionsNb();
for (int partition = 0; partition < numPartitions; partition++) {
int leader = getLeader(partition);
if (leader >= 0) {
                    org.apache.storm.kafka.Broker broker = getBrokerHost(leader);
                    if (broker != null) {
                        brokersInfo.addPartition(partition, broker);
                    }
}
}
} catch (Exception e) {
logger.error("Failed to get current brokers", e);
}
return brokersInfo;
}
    /** Releases the underlying ZooKeeper client connection. */
    public void close() {
if (zkClient != null) {
zkClient.close();
}
}
}
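A minimal usage sketch for KafkaSetUp (not part of the original source). The ZooKeeper connect string, topic name, partition count, and property values are hypothetical placeholders; the property keys are exactly those read by the constructor and createTopic() above.

import java.util.Properties;
import org.apache.storm.kafka.ZkHosts;

public class KafkaSetUpUsageExample {
    public static void main(String[] args) {
        // Hypothetical values; the keys are the ones KafkaSetUp reads.
        Properties config = new Properties();
        config.setProperty("KAFKA_ZOOKEEPER_SESSION_TIMEOUT", "30000");
        config.setProperty("KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT", "30000");
        config.setProperty("READ_BROKERS_RETRY_TIMES", "5");
        config.setProperty("READ_BROKERS_RETRY_INTERVAL", "1000");
        config.setProperty("REPLICATION_FACTOR", "2");

        ZkHosts hosts = new ZkHosts("zk1:2181,zk2:2181");   // hypothetical ensemble
        KafkaSetUp setUp = new KafkaSetUp(config, hosts, "waves-events");
        try {
            setUp.createTopic(8);   // ensure at least 8 partitions exist
        } finally {
            setUp.close();
        }
    }
}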