skynyrd
7/25/2017 - 6:54 AM

Intro to Kafka

Creating or Updating the Cleanup Policy

While creating:

kafka-topics --create --topic test_cleanup --zookeeper 127.0.0.1:2181 --config cleanup.policy=compact --partitions 3 --replication-factor 1

Updating:

kafka-topics --alter --topic test_cleanup --zookeeper 127.0.0.1:2181 --config cleanup.policy=delete
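
Note: newer Kafka releases deprecate altering topic configs through kafka-topics and point to kafka-configs instead; the equivalent command (a sketch against the same topic) is:

kafka-configs --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name test_cleanup --alter --add-config cleanup.policy=delete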

log.cleanup.policy=delete (Kafka default for all user topics)
  • Delete based on age of data (default is a week)
  • Delete based on max size of log (default -1 == infinite); both knobs are shown in the sketch after this list
log.cleanup.policy=compact (Kafka default for topic __consumer_offsets)
  • Delete based on keys of your messages
  • Will delete old duplicate keys after the active segment is committed
  • Infinite time and space retention.
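
Both delete-policy knobs above map to topic-level configs: age is retention.ms and max size is retention.bytes. A sketch setting them explicitly on the earlier topic (the values shown are the defaults, 7 days and infinite):

kafka-topics --alter --topic test_cleanup --zookeeper 127.0.0.1:2181 --config retention.ms=604800000 --config retention.bytes=-1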

More partitions:

  • Better parallelism, better throughput
  • BUT more files opened on your system
  • BUT if a broker fails, lots of concurrent leader elections
  • BUT added latency to replicate (on the order of milliseconds)

Partition Guidelines:

  • Keep the # of partitions per broker between 2000 and 4000
  • Keep the # of partitions in the cluster under 20000
  • Partitions per topic = (1 to 2) * (# of brokers), capped at 10 partitions
  • Ex. with a 3-broker setup, 3 or 6 partitions is a good number to start with (see the example below). Roughly, each partition can sustain a throughput of 10MB/sec
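
Applying the guideline to a hypothetical 3-broker cluster (the topic name orders is made up), 6 partitions with replication factor 3:

kafka-topics --create --topic orders --zookeeper 127.0.0.1:2181 --partitions 6 --replication-factor 3
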
Replication Factor

Should be at least 2, max of 3.

The higher replication factor:

  • Better resilience of the system (N-1 brokers can fail)
  • BUT longer replication (higher latency if acks=all)
  • BUT more disk space

Guidelines for replication factor:

  • Set it to 3 (you must have at least 3 brokers for that); if you have 6 brokers, still set it to 3.
  • If replication performance is an issue, get better brokers instead of lowering the RF

In production, deleting a topic is not a common approach. Create a topic with a different name instead.

You can write to a topic that does not exist; Kafka creates the topic for you, but only with 1 partition. You need to create topics manually to get parallelism.
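
The broker setting behind this auto-creation default is num.partitions (1 out of the box). To check what an auto-created topic ended up with (hypothetical topic name):

kafka-topics --zookeeper 127.0.0.1:2181 --topic auto_created_topic --describe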

Delivery Semantics for Consumers

At most once: Offsets are committed as soon as the message is received. If the processing goes wrong, the message is lost (it won't be read again). Ex. a consumer gets 1, 2, 3 and the offsets are committed immediately; if processing fails, the next consumer starts from 4, not 1.

At least once: Offsets are committed after the message is processed. If the processing goes wrong, the message will be read again. This can result in duplicate processing of messages, so make sure your processing is idempotent (i.e. processing the same message again won't impact your systems). Ex. a consumer gets 1, 2, 3 and fails while processing them, so another consumer gets 1, 2 and 3 again (duplicate processing).

Exactly once: Very difficult to achieve / needs strong engineering.

Bottom line: most often you should use at-least-once processing and ensure your transformations are idempotent.
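
Since committed offsets decide where a group resumes, at-least-once style reprocessing can also be forced by resetting them. A sketch using the group and topic names from the examples below (the group must have no active members):

kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --group mygroup1 --reset-offsets --to-earliest --topic first_topic --execute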

With docker:

Visit: https://github.com/Landoop/fast-data-dev

Run docker container: docker run --rm --net=host landoop/fast-data-dev

Visit localhost:3030
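
Note that --net=host only behaves as expected on Linux; on Docker for Mac/Windows the fast-data-dev README suggests publishing the ports explicitly instead (a sketch, check the repo for the current port list):

docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=127.0.0.1 landoop/fast-data-dev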

Enter the container for the Kafka CLI tools. If no container is running yet:

docker run --rm -it --net=host landoop/fast-data-dev bash

If the container already exists:

docker exec -it containerId /bin/bash

kafka-topics

Create topic called first_topic:

kafka-topics --zookeeper 127.0.0.1:2181 --create --topic first_topic --partitions 3 --replication-factor 1

Warning: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.

Replication factor cannot be larger than the number of brokers.

Check created topic by UI, or list topics from cli:

kafka-topics --zookeeper 127.0.0.1:2181 --list

Delete a topic (not recommended; the broker config delete.topic.enable must be set to true):

kafka-topics --zookeeper 127.0.0.1:2181 --topic second_topic --delete

Topic info:

kafka-topics --zookeeper 127.0.0.1:2181 --topic second_topic --describe

Count messages in a topic:

kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9094,... --topic product-raw-data
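
GetOffsetShell prints the latest offset per partition by default (--time -1); if the log has already been cleaned, subtract the earliest offsets (--time -2) to get the real count:

kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic product-raw-data --time -2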

kafka-console-producer

Send stdin to topic:

kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic

If there is no such topic, a warning is printed and the topic is created automatically.
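
Producer settings can be passed with --producer-property; for example, waiting for all in-sync replicas to acknowledge (the acks=all latency trade-off mentioned above):

kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=all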

kafka-console-consumer

Reading from tail:

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic

Get from beginning:

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning

Get from specific partition:

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --partition 0

Assign a group id while fetching (so that Kafka saves the offsets for later use):

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --consumer-property group.id=mygroup1

Show consumer groups:

kafka-consumer-groups --list --bootstrap-server 127.0.0.1:9092
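
To see committed offsets and lag per partition for one group (lag = log-end-offset minus committed offset):

kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group mygroup1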

Read messages starting from a specific offset (a partition must be given):

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic products-processed --offset 21541 --partition 0
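
To stop after a single message instead of tailing from that offset onward, add --max-messages:

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic products-processed --offset 21541 --partition 0 --max-messages 1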