Consumers
Consumers read data from a topic (identified by name).
A consumer pulls data from the Kafka topic, so Kafka follows a pull model, unlike RabbitMQ, which follows a push model.
Consumers know which broker to read from.
In case of broker failure, consumers know how to recover.
Data is read in order, from low to high offset, within each partition.
Consumer Message Deserializer
The deserializer converts the bytes sent by Kafka into the objects the consumer needs to process.
Deserializers are used on both the key and the value of a message.
The consumer must know in advance the data types being sent in order to deserialize them correctly.
Note that the deserialization type must not change during the lifecycle of a topic. If it needs to change, create a new topic instead.
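As a sketch of what deserialization involves, the following mirrors the behavior of Kafka's built-in `StringDeserializer` and `IntegerDeserializer` (integers travel as 4 big-endian bytes). The function names are illustrative, not part of any client API:

```python
# Sketch of what Kafka's built-in deserializers do with raw bytes.
# Function names are illustrative; the real Java client uses classes such as
# org.apache.kafka.common.serialization.StringDeserializer.

def deserialize_string(data: bytes) -> str:
    """Mirror StringDeserializer: bytes are UTF-8 text."""
    return data.decode("utf-8")

def deserialize_int(data: bytes) -> int:
    """Mirror IntegerDeserializer: a 4-byte big-endian integer."""
    if len(data) != 4:
        raise ValueError("expected exactly 4 bytes for an int")
    return int.from_bytes(data, byteorder="big", signed=True)

# A message carries a key and a value, each deserialized independently.
key = deserialize_int((123).to_bytes(4, "big", signed=True))   # -> 123
value = deserialize_string("hello kafka".encode("utf-8"))      # -> "hello kafka"
```

This is also why the data type must stay fixed for the topic's lifetime: a consumer expecting 4-byte integers cannot make sense of UTF-8 strings.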

Consumer Groups
Consumers in an application read data as a consumer group.
Each consumer within a group reads from exclusive partitions.
If there are more consumers in a consumer group than partitions in the topic, some consumers will stay inactive.

Multiple consumer groups can also read from the same topic.
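A toy illustration (not Kafka code; real assignment is done by the group coordinator) of how exclusive partition ownership leaves extra consumers idle when a group has more members than the topic has partitions:

```python
# Toy illustration of exclusive partition assignment in a consumer group.
# Consumer names and the helper are made up for this sketch.

def assign_partitions(partitions: list, consumers: list) -> dict:
    """Spread partitions over consumers; each partition goes to exactly one."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# 3 partitions, 4 consumers: one consumer ends up with nothing to do.
result = assign_partitions([0, 1, 2], ["c1", "c2", "c3", "c4"])
idle = [c for c, parts in result.items() if not parts]
# result -> {'c1': [0], 'c2': [1], 'c3': [2], 'c4': []}, idle -> ['c4']
```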

Consumer Offsets → Delivery Semantics
When a consumer belonging to a consumer group reads data from a topic, Kafka stores its committed offsets in an internal topic named `__consumer_offsets`.
Consumers should periodically commit the offsets once data has been processed; the broker, rather than the group, writes to `__consumer_offsets`.
Committing helps Kafka understand how far into the Kafka topic partition the group has read successfully.
If a consumer dies, it will be able to resume reading from where it left off, based on the committed consumer offsets.
There are 3 delivery semantics if you choose to commit the offset manually:
At least once
Offsets are committed after the message is processed (this is the default behavior of the Java consumer).
If processing fails, the message will be read again.
This can lead to duplicate processing of messages, so make sure message processing is idempotent.
At most once
Offsets are committed as soon as messages are received.
If processing goes wrong, some messages are lost; they won't be read again.
Exactly once
Needed when messages must be processed only once; useful for Kafka → Kafka workflows.
It can be achieved using the transactional API (easy with the Kafka Streams API).
For Kafka → external-system workflows, use an idempotent consumer.
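The difference between at-least-once and at-most-once comes down to when the offset is committed relative to processing. A toy simulation (no real broker; a partition is a list and the committed offset a plain integer) where processing crashes once:

```python
# Toy simulation of delivery semantics; not real Kafka client code.

def consume(partition, commit_before_processing, crash_at):
    """Read messages from `partition`, crashing once at offset `crash_at`."""
    committed = 0          # last committed offset (next offset to read)
    processed = []
    crashed = False
    while committed < len(partition):
        offset = committed
        msg = partition[offset]
        if commit_before_processing:
            # at-most-once: commit as soon as the message is received
            committed = offset + 1
            if offset == crash_at and not crashed:
                crashed = True   # crash before processing: message is lost
                continue
            processed.append(msg)
        else:
            # at-least-once: process first, commit afterwards
            processed.append(msg)
            if offset == crash_at and not crashed:
                crashed = True   # crash before committing: will be re-read
                continue
            committed = offset + 1
    return processed

msgs = ["m0", "m1", "m2"]
at_least_once = consume(msgs, commit_before_processing=False, crash_at=1)
at_most_once = consume(msgs, commit_before_processing=True, crash_at=1)
# at_least_once -> ['m0', 'm1', 'm1', 'm2']  (m1 processed twice: duplicate)
# at_most_once  -> ['m0', 'm2']              (m1 lost: committed before crash)
```

The duplicate in the at-least-once run is exactly why processing must be idempotent.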
Resetting Offsets
A reset cannot be done while a consumer is running; the consumer must be stopped first.
Offset Commit
In the Java API, offsets are committed regularly.
This enables the at-least-once reading scenario by default, under certain conditions.
Offsets are committed asynchronously when the consumer calls `poll()` and `auto.commit.interval.ms` (in milliseconds) has elapsed, with `enable.auto.commit = true`. Because of this, make sure to call `poll()` only once the previously returned messages have been processed.
It is possible to disable autocommit by setting `enable.auto.commit = false`. In that case you are most likely processing on a separate thread and, from time to time, calling `commitSync()` or `commitAsync()` with the correct offsets manually.
So, basically there are 3 ways to commit:
Autocommit, based on the `enable.auto.commit=true` parameter.
Explicit commit, based on `enable.auto.commit=false` and manual commit of offsets.
Store offsets externally (e.g. in a database), use the `assign()` API to assign partitions and `seek()` to reach the stored offset. If a rebalance happens, use the `ConsumerRebalanceListener` interface to handle processing.
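A sketch of the consumer configuration behind the first two options (property names are the real ones; values are illustrative):

```properties
# Option 1: autocommit (at-least-once, provided you process messages
# before the next poll() call)
enable.auto.commit=true
auto.commit.interval.ms=5000

# Option 2: disable autocommit and call commitSync()/commitAsync() yourself
# enable.auto.commit=false
```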
Consumer Groups - ⚖️ Partition Rebalance
When a consumer joins or leaves a consumer group, Kafka rebalances the partitions based on the configured strategy.
It moves partitions amongst the available consumers.
A rebalance also happens when an administrator adds partitions to a topic.
The behavior is driven by the consumer property `partition.assignment.strategy`. There are 2 strategy types:
Eager Rebalance (default)
All consumers stop and give up their partition membership.
They rejoin the consumer group and get a new partition assignment.
This leads to a stop-the-world event.
Partitions are reassigned essentially at random: a consumer is not guaranteed to get back the partitions it previously had.
The eager strategies are as follows:

| Strategy Name | Description |
| --- | --- |
| RangeAssignor | Assigns partitions on a per-topic basis |
| RoundRobin | Assigns partitions across all topics in round-robin fashion |
| StickyAssignor | Balances like round-robin, but then minimises partition movement when consumers join/leave |
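A toy comparison (not the real assignor code, which lives in `org.apache.kafka.clients.consumer`) of why per-topic range assignment can overload the first consumers across many topics while round-robin spreads partitions evenly:

```python
# Toy versions of two eager assignment strategies.
# A topic-partition is modelled as a (topic, partition) tuple.

def range_assign(topics: dict, consumers: list) -> dict:
    """Per-topic: divide each topic's partitions among consumers in order."""
    assignment = {c: [] for c in consumers}
    for topic, num_partitions in sorted(topics.items()):
        per, extra = divmod(num_partitions, len(consumers))
        p = 0
        for i, c in enumerate(consumers):
            take = per + (1 if i < extra else 0)
            assignment[c] += [(topic, q) for q in range(p, p + take)]
            p += take
    return assignment

def round_robin_assign(topics: dict, consumers: list) -> dict:
    """All topic-partitions pooled into one list, dealt out round-robin."""
    assignment = {c: [] for c in consumers}
    all_tps = [(t, p) for t, n in sorted(topics.items()) for p in range(n)]
    for i, tp in enumerate(all_tps):
        assignment[consumers[i % len(consumers)]].append(tp)
    return assignment

topics = {"orders": 1, "payments": 1, "shipments": 1}   # 1 partition each
consumers = ["c1", "c2"]
# range_assign: c1 gets partition 0 of every topic, c2 gets nothing.
# round_robin_assign: the 3 partitions are spread across both consumers.
```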
Incremental Rebalance
Only a small subset of partitions is reassigned from one consumer to another.
Consumers whose partitions are not reassigned can keep processing.
The rebalance can go through several iterations to find a stable assignment.
This avoids the stop-the-world event in which all consumers stop processing.
| Strategy Name | Description |
| --- | --- |
| CooperativeStickyAssignor | Rebalances like StickyAssignor but supports cooperative rebalances |
In Kafka 3.x the default is `[RangeAssignor, CooperativeStickyAssignor]`: `RangeAssignor` is used by default, but removing it from the list (with a single rolling bounce) makes the group rebalance using `CooperativeStickyAssignor`.
Kafka Connect and Kafka Streams use cooperative rebalancing by default.
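To opt a consumer fully into cooperative rebalancing, the strategy can be set explicitly (the class name is the real one shipped with the Java client):

```properties
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
```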
Static Group Membership
By default, when a consumer leaves a group, its partitions are revoked and reassigned.
If it joins back, it will have a new member ID and a new partition assignment.
If you specify `group.instance.id`, the consumer becomes a static member.
Provided the consumer joins back within the session timeout configured by `session.timeout.ms`, it will get its partitions back without triggering a rebalance.
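A sketch of static-membership configuration (the instance ID value is illustrative; each consumer instance needs a distinct one):

```properties
group.instance.id=payment-consumer-1
session.timeout.ms=45000
```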
Offset Reset Behavior
If a consumer remains down for longer than the broker's offset retention period (7 days by default), its committed offsets become invalid.
When that happens, `auto.offset.reset` decides the behavior:
`auto.offset.reset=latest` reads from the end of the log.
`auto.offset.reset=earliest` reads from the beginning of the log.
`auto.offset.reset=none` throws an exception if no offset is found.
In Kafka < v2.0, offsets are lost if the consumer hasn't read in 1 day (by default).
In Kafka >= v2.0, offsets are lost if the consumer hasn't read in 7 days (by default).
This can be controlled by the broker setting `offsets.retention.minutes`.
To replay data for a consumer group:
Take all the consumers of the specific consumer group down.
Use the `kafka-consumer-groups` command to set the offsets to what you want.
Restart the consumers.
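A sketch of the reset step with the `kafka-consumer-groups` CLI (the group and topic names are placeholders; swap `--execute` for `--dry-run` to preview the result first):

```
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --topic my-topic \
  --reset-offsets --to-earliest --execute
```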
Consumer Working
Consumers in a consumer group talk to Kafka via the Consumer Group Coordinator (a broker acting in that role).
To detect that a consumer is alive, there are two mechanisms:
Heartbeat mechanism
Each consumer in the group periodically sends heartbeats to the Consumer Group Coordinator.
A dedicated consumer heartbeat thread sends them based on the setting `heartbeat.interval.ms`, 3 seconds by default. Usually this value is at most one third of the `session.timeout.ms` value.
In Kafka v3.0+, `session.timeout.ms` is 45s by default, so heartbeats arrive at the broker comfortably within that window.
If no heartbeat is received during the session timeout, the consumer is considered dead.
Set these values lower for quicker consumer rebalances.
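The two settings together, with their commonly documented Kafka v3.0+ defaults:

```properties
session.timeout.ms=45000
heartbeat.interval.ms=3000
```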
Poll mechanism
The consumer polls the brokers for data periodically.
`max.poll.interval.ms` is 5 minutes by default. This setting is the maximum amount of time allowed between two `poll()` calls before the consumer is declared dead. It is especially relevant for big-data applications where processing takes time.
This mechanism is used to detect whether a consumer is stuck while processing data.
To avoid issues, consumers should poll often and process data quickly.
`max.poll.records` denotes how many records to poll at a time, 500 by default. Increase it if your messages are small and you have enough RAM allocated.
`fetch.min.bytes` is the minimum amount of data to pull on each request, 1 by default. Raising it can improve throughput, at the cost of latency.
`fetch.max.wait.ms` denotes the maximum amount of time the Kafka broker will block before replying to a fetch request when there isn't sufficient data to satisfy `fetch.min.bytes`; 500 ms by default.
`max.partition.fetch.bytes` is the maximum amount of data per partition the server will return, 1 MB by default.
`fetch.max.bytes` is the maximum amount of data returned for each fetch request, 50 MB by default.
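The poll and fetch settings above, collected in one place with their commonly documented defaults:

```properties
max.poll.interval.ms=300000
max.poll.records=500
fetch.min.bytes=1
fetch.max.wait.ms=500
max.partition.fetch.bytes=1048576
fetch.max.bytes=52428800
```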

Consumer Behavior → Partition Leaders
By default, a consumer reads from the partition leader.
This can mean higher latency and network charges when there are multiple data centers, with the consumer in one data center and the Kafka broker in another.
Replica Fetching
Since Kafka v2.4+, it is possible to read from replicas, i.e. the closest replica.
This can improve latency and also decrease network costs when running in the cloud.

For this we need to configure consumer rack awareness.
Broker settings
The broker must be at least v2.4+.
`broker.rack` must be set to the data center (availability zone) ID, for example `broker.rack=usw2-az1`.
`replica.selector.class` must be set to `org.apache.kafka.common.replica.RackAwareReplicaSelector`.
Consumer client settings
Set `client.rack` to the ID of the data center the consumer is launched in.
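A sketch of the two sides of the configuration (the availability-zone ID is illustrative):

```properties
# Broker side (server.properties)
broker.rack=usw2-az1
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

# Consumer side
client.rack=usw2-az1
```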