Intro to Kafka
Set a topic config at creation time:
kafka-topics --create --topic test_cleanup --zookeeper 127.0.0.1:2181 --config cleanup.policy=compact --partitions 3 --replication-factor 1
Update the config on an existing topic:
kafka-topics --alter --topic test_cleanup --zookeeper 127.0.0.1:2181 --config cleanup.policy=delete
Consumer group offsets are stored in the internal __consumer_offsets topic.
More partitions means better parallelism.
Partition guidelines: at least 2, max of 3.
Replication factor guidelines: the higher the replication factor, the more replicas and the more disk use.
In production, deleting a topic is not a common approach; create a topic with a different name instead.
You can write to a topic that does not exist: Kafka creates it for you, but with only 1 partition. Create topics manually when you need parallelism.
At most once: offsets are committed as soon as the message is received. If processing goes wrong, the message is lost (it won't be read again). Ex.: a consumer gets 1, 2, 3 and the offsets are committed immediately; if processing fails, the next read starts from 4, not 1.
At least once: offsets are committed after the message is processed. If processing goes wrong, the message will be read again, which can result in duplicate processing. Make sure your processing is idempotent (i.e. processing the same message again won't impact the system). Ex.: a consumer gets 1, 2, 3; if it fails before committing, 1, 2 and 3 are read and processed again (duplicate processing).
Exactly once: Very difficult to achieve / needs strong engineering.
Bottom line: most often you should use at-least-once processing and ensure your transformations are idempotent.
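A minimal sketch of the idempotent, at-least-once pattern above (plain bash, no Kafka involved): after a simulated failure before the offset commit, messages 1, 2, 3 are redelivered, and tracking already-processed ids makes the redelivery a no-op.

```shell
#!/usr/bin/env bash
# Sketch (bash 4+): idempotent handling under at-least-once delivery.
declare -A processed

handle() {
  local id=$1
  if [[ -n ${processed[$id]} ]]; then
    echo "skip duplicate $id"   # already handled: safe to ignore
  else
    processed[$id]=1
    echo "process $id"
  fi
}

# First delivery, crash before commit, redelivery, then a new message.
# Prints: process 1..3, skip duplicate 1..3, process 4
for id in 1 2 3 1 2 3 4; do
  handle "$id"
done
```

The dedup store here is an in-memory array; in a real consumer it would be whatever state the downstream system exposes (e.g. a unique key in a database).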
With docker:
Visit: https://github.com/Landoop/fast-data-dev
Run docker container:
docker run --rm --net=host landoop/fast-data-dev
Visit localhost:3030
Enter the container to use the Kafka CLI tools.
If the container is not running yet:
docker run --rm -it --net=host landoop/fast-data-dev bash
If the container is already running:
docker exec -it containerId /bin/bash
(On Git Bash for Windows, use //bin//bash to avoid path conversion.)
Create a topic called first_topic:
kafka-topics --zookeeper 127.0.0.1:2181 --create --topic first_topic --partitions 3 --replication-factor 1
Warning: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
The replication factor cannot be larger than the number of brokers.
Check the created topic in the UI, or list topics from the CLI:
kafka-topics --zookeeper 127.0.0.1:2181 --list
Delete a topic (not recommended; requires delete.topic.enable=true on the brokers):
kafka-topics --zookeeper 127.0.0.1:2181 --topic second_topic --delete
Topic info:
kafka-topics --zookeeper 127.0.0.1:2181 --topic second_topic --describe
Count messages in a topic:
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9094..... --topic product-raw-data
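GetOffsetShell prints one topic:partition:offset line per partition; summing the offset column gives the total message count (an upper bound if retention or compaction has already removed messages). A sketch with sample data for a hypothetical 3-partition topic — in real use, pipe the command above into the same awk one-liner:

```shell
# Each line has the form topic:partition:latest-offset.
printf 'product-raw-data:0:120\nproduct-raw-data:1:95\nproduct-raw-data:2:85\n' |
  awk -F: '{sum += $3} END {print sum}'   # prints 300
```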
Send stdin to topic:
kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic
If the topic does not exist, a warning is printed and the topic is created automatically.
Read new messages only (from the tail):
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic
Get from beginning:
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning
Get from specific partition:
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --partition 0
Assign a group id while consuming (so that Kafka saves the offsets for later use):
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --consumer-property group.id=mygroup1
Show consumer groups:
kafka-consumer-groups --list --bootstrap-server 127.0.0.1:9092
Read messages starting from a specific offset:
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic products-processed --offset 21541 --partition 0