7/25/2017 - 6:54 AM

Intro to Kafka

Creating or Updating Policy

While creating:

kafka-topics --create --topic test_cleanup --zookeeper localhost:2181 --config cleanup.policy=compact --partitions 3 --replication-factor 1


While updating:

kafka-topics --alter --topic test_cleanup --zookeeper localhost:2181 --config cleanup.policy=delete

log.cleanup.policy=delete (Kafka default for all user topics)
  • Delete based on age of data (default is a week)
  • Delete based on max size of log (default -1 == infinite)
log.cleanup.policy=compact (Kafka default for topic __consumer_offsets)
  • Delete based on keys of your messages
  • Will delete old duplicate keys after the active segment is committed
  • Infinite time and space retention.
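
A rough Python sketch of what compaction means (a toy simulation, not Kafka's implementation): for each key, only the most recent value survives.

```python
# Toy simulation of log compaction: keep only the latest record
# per key, discarding older duplicates (segment boundaries and
# tombstones are ignored in this sketch).
def compact(log):
    latest = {}
    for key, value in log:   # later records overwrite earlier ones
        latest[key] = value
    return list(latest.items())

records = [("user1", "a"), ("user2", "b"), ("user1", "c")]
print(compact(records))  # user1 retains only its latest value, "c"
```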

More partitions:

  • Better parallelism, better throughput
  • BUT more open files on your system
  • BUT if a broker fails, lots of concurrent leader elections
  • BUT added replication latency (on the order of milliseconds)

Partition Guidelines:

  • Keep the number of partitions per broker between 2000 and 4000
  • Keep the number of partitions in the cluster under 20000
  • Partitions per topic: (1 to 2) * (# of brokers), max 10 partitions
  • Ex. with a 3-broker setup, 3 or 6 partitions is a good number to start with. Roughly, each partition can handle a throughput of 10 MB/sec
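
The rule of thumb above as a quick calculation (the 10 MB/sec per-partition figure is the rough estimate from these notes, not a measured number):

```python
# Partition sizing per the guideline: (1 to 2) x (# of brokers), max 10.
def suggested_partitions(brokers, factor=2, cap=10):
    return min(factor * brokers, cap)

# ~10 MB/sec per partition is only a ballpark estimate.
def estimated_throughput_mb_per_sec(partitions, per_partition=10):
    return partitions * per_partition

p = suggested_partitions(3)                    # 6 for a 3-broker cluster
print(p, estimated_throughput_mb_per_sec(p))   # roughly 60 MB/sec total
```
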
Replication Factor

Should be at least 2, max of 3.

The higher replication factor:

  • Better resilience of the system (N-1 brokers can fail)
  • BUT longer replication (higher latency if acks=all)
  • BUT more disk space

Guidelines for replication factor:

  • Set it to 3 (you must have at least 3 brokers for that). Even with 6 brokers, 3 is still a good choice.
  • If replication performance is an issue, get a better broker instead of lowering the replication factor

In production, deleting a topic is not a common approach. Create a topic with a different name instead.

You can write to a topic that does not exist; Kafka creates the topic for you, but with only 1 partition. Create topics manually if you need parallelism.

Delivery Semantics for Consumers

At most once: Offsets are committed as soon as the message is received. If the processing goes wrong, the message is lost (it won't be read again). Ex. a consumer gets messages 1, 2, 3 and the offset is committed; if processing then fails, the next consumer starts from 4, not 1.
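
The loss scenario above can be simulated in a few lines of Python (hypothetical helper, not a Kafka API):

```python
# At-most-once: the offset is committed BEFORE processing, so a
# crash during processing means the message is never read again.
def consume_at_most_once(messages, fail_on=None):
    committed = 0
    processed = []
    for offset, msg in enumerate(messages):
        committed = offset + 1   # commit first...
        if msg == fail_on:
            break                # ...then crash while processing
        processed.append(msg)
    # A restarted consumer resumes at `committed`, so `fail_on` is lost.
    return processed, committed

print(consume_at_most_once(["m1", "m2", "m3"], fail_on="m2"))
# (['m1'], 2) -> m2 was committed but never processed
```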

At least once: Offsets are committed after the message is processed. If the processing goes wrong, the message will be read again. This can result in duplicate processing, so make sure your processing is idempotent (i.e. processing a message again won't impact the system). Ex. a consumer gets 1, 2, 3, and while processing them, another consumer can get 1, 2 and 3 again (duplicate processing).
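
The at-least-once side can be sketched the same way: redelivered messages show up twice, and an idempotent handler (here deduplicating by a hypothetical message id) makes the duplicates harmless:

```python
# At-least-once: a crash after processing but before the commit
# causes the same messages to be delivered again on restart.
def process_idempotently(deliveries):
    seen = set()
    state = []
    for msg_id, payload in deliveries:
        if msg_id in seen:       # duplicate redelivery: skip it
            continue
        seen.add(msg_id)
        state.append(payload)    # the side effect happens only once
    return state

# Messages 1-3 arrive, then 1-3 again after a simulated crash/restart.
deliveries = [(1, "a"), (2, "b"), (3, "c"), (1, "a"), (2, "b"), (3, "c")]
print(process_idempotently(deliveries))  # ['a', 'b', 'c'] despite duplicates
```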

Exactly once: Very difficult to achieve / needs strong engineering.

Bottom line: Most often you should use at least once processing and ensure your transformations are idempotent

With Docker:


Run docker container: docker run --rm --net=host landoop/fast-data-dev

Visit localhost:3030

Enter the container for the Kafka CLI tools. If no container is running yet:

docker run --rm -it --net=host landoop/fast-data-dev bash

If the container is already running:

docker exec -it containerId /bin/bash


Create topic called first_topic:

kafka-topics --zookeeper localhost:2181 --create --topic first_topic --partitions 3 --replication-factor 1

Warning: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.

Replication factor cannot be larger than the number of brokers.

Check created topic by UI, or list topics from cli:

kafka-topics --zookeeper localhost:2181 --list

Delete a topic (not recommended; requires delete.topic.enable=true on the broker):

kafka-topics --zookeeper localhost:2181 --topic second_topic --delete

Topic info:

kafka-topics --zookeeper localhost:2181 --topic second_topic --describe

Count messages in a topic:

kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9094,... --topic product-raw-data
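
GetOffsetShell prints one `topic:partition:offset` line per partition (end offsets by default); summing those offsets approximates the message count, assuming retention has not yet deleted anything. A small parser sketch:

```python
# Sum the per-partition end offsets printed by GetOffsetShell.
# This equals the true message count only if retention/compaction
# has not removed any records yet.
def total_messages(offset_shell_output):
    total = 0
    for line in offset_shell_output.strip().splitlines():
        topic, partition, offset = line.rsplit(":", 2)
        total += int(offset)
    return total

sample = "product-raw-data:0:21541\nproduct-raw-data:1:20970"
print(total_messages(sample))  # 42511
```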


Send stdin to topic:

kafka-console-producer --broker-list localhost:9092 --topic first_topic

If the topic does not exist, a warning is printed and the topic is created automatically.


Reading from tail:

kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic

Get from beginning:

kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic --from-beginning

Get from specific partition:

kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic --from-beginning --partition 0

Assign group id while fetching (So that kafka saves offset for later use):

kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic --from-beginning --consumer-property group.id=my_first_group

Show consumer groups:

kafka-consumer-groups --list --bootstrap-server localhost:9092

Get message by offset:

kafka-console-consumer --bootstrap-server localhost:9092 --topic products-processed --offset 21541 --partition 0