Consumers

  • Consumers read data from a topic (identified by its name).

  • Consumers pull data from Kafka topics; Kafka follows a pull model, unlike RabbitMQ, which follows a push model.

  • Consumers know which broker to read from.

  • In case of broker failure, consumers know how to recover.

  • Data is read in order, from low to high offset, within each partition.

Consumer Message Deserializer

  • A deserializer converts the bytes sent by Kafka into the objects the consumer needs in order to process them.

  • Deserializers are applied to both the key and the value of a message.

  • The consumer must know in advance the type of data being sent so it can deserialize it correctly.

  • Note that the deserialization type must not change during the lifecycle of a topic. If it needs to change, create a new topic instead.

Message Deserializer
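A minimal sketch of what the built-in deserializers do. The real classes live in org.apache.kafka.common.serialization; this version has no Kafka dependency and just mirrors their wire formats:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// The consumer receives raw bytes for the key and the value of each message
// and must turn them back into objects. These two helpers mirror what
// Kafka's StringDeserializer and IntegerDeserializer do.
public class DeserializerSketch {
    // Mirrors StringDeserializer: UTF-8 bytes -> String
    static String deserializeString(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    // Mirrors IntegerDeserializer: 4 big-endian bytes -> Integer
    static Integer deserializeInteger(byte[] data) {
        if (data == null) return null;
        if (data.length != 4) throw new IllegalArgumentException("expected 4 bytes");
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        byte[] valueBytes = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] keyBytes = ByteBuffer.allocate(4).putInt(123).array();
        System.out.println(deserializeString(valueBytes)); // hello
        System.out.println(deserializeInteger(keyBytes));  // 123
    }
}
```

This is why the consumer must know the types in advance: handing the integer key above to the string deserializer would silently produce garbage, not an error.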

Consumer Groups

  • Consumers in an application read data as a consumer group.

  • Each consumer within a group reads from exclusive partitions.

  • If there are more consumers in a group than partitions in the topic, some consumers will stay inactive.

Consumer Group
  • Multiple consumer groups can also read from the same topic.

Multiple Consumer Group Same Topic
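A toy illustration of the exclusivity rule above (not the real assignor code): partitions are spread over one group's consumers, each partition goes to exactly one consumer, and with more consumers than partitions the extras stay idle. Consumer names and counts are made up:

```java
import java.util.*;

// Spread partition numbers 0..partitions-1 over the consumers of one group,
// round-robin style. Each partition is owned by exactly one consumer.
public class GroupAssignmentSketch {
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (String c : consumers) out.put(c, new ArrayList<>());
        for (int p = 0; p < partitions; p++) {
            out.get(consumers.get(p % consumers.size())).add(p);
        }
        return out;
    }

    public static void main(String[] args) {
        // 3 partitions but 4 consumers -> c4 gets no partition and stays inactive
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 3));
    }
}
```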

Consumer Offsets → Delivery Semantics

  • When a consumer belonging to a consumer group reads data from a topic, Kafka stores the group's committed offsets in an internal Kafka topic named __consumer_offsets.

  • Consumers should periodically commit their offsets once the data has been processed. The commit goes to the broker, which writes to __consumer_offsets (the group does not write to it directly).

  • The commit lets Kafka track how far a group has successfully read into each Kafka topic partition.

    • If a consumer dies, it will be able to resume reading from where it left off, based on the committed consumer offsets.

  • There are 3 delivery semantics to choose from when committing offsets:

    1. At least once

      • The default for the Java consumer: offsets are committed after the messages are processed.

      • If processing fails, the message can be read again.

      • This can lead to duplicate processing of messages, so make sure message processing is idempotent.

    2. At most once

      • Offsets are committed as soon as messages are received.

      • If processing goes wrong, some messages will be lost; they won't be read again.

    3. Exactly once

      • Each message is processed only once; useful for Kafka -> Kafka workflows.

      • Can be achieved using the transactional API (easiest via the Kafka Streams API).

      • For Kafka -> external system workflows, use an idempotent consumer.
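A sketch of the consumer configuration behind at-least-once processing. The property names are real Kafka consumer configs; the broker address and group id are placeholders, and the poll/commit loop is only outlined in comments because it needs a live broker:

```java
import java.util.Properties;

// At-least-once: disable auto-commit and commit offsets manually only AFTER
// the polled messages have been processed successfully.
public class AtLeastOnceConfig {
    static Properties config() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        p.put("group.id", "demo-group");              // hypothetical group name
        p.put("enable.auto.commit", "false");         // we commit manually
        return p;
    }

    public static void main(String[] args) {
        System.out.println(config().getProperty("enable.auto.commit")); // false
        // With a real KafkaConsumer the loop would then be, per batch:
        //   records = consumer.poll(timeout);
        //   process(records);        // idempotent processing
        //   consumer.commitSync();   // commit only after processing succeeds
        // Committing BEFORE process(records) instead would give at-most-once.
    }
}
```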

Resetting Offsets

  • Offsets cannot be reset while consumers are running; the consumers must be stopped first.

Offset Commit

  • In the Java API, offsets are committed regularly.

  • This enables the at-least-once scenario by default, under certain conditions:

    • Offsets are committed asynchronously when the consumer calls poll() and auto.commit.interval.ms has elapsed, provided enable.auto.commit = true.

    • Because of this, make sure all messages from the previous poll() have been processed before calling poll() again.

  • It is possible to disable auto-commit by setting enable.auto.commit = false.

    • Typically you then process messages from a separate thread and, from time to time, call commitSync() or commitAsync() with the correct offsets manually.

  • So, basically, there are 3 ways to commit:

    • Auto-commit, based on the enable.auto.commit=true parameter.

    • Explicit commit: enable.auto.commit=false plus manual commits of offsets.

    • Store offsets externally, e.g. in a database; use the assign() API to assign partitions and seek() to jump to the stored offset.

      • If a rebalance happens, use the ConsumerRebalanceListener interface to handle it.
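The external-offset pattern can be simulated without Kafka. In this toy version a map stands in for the database and a list of strings stands in for a partition's log; in real code the stored offset is what you would pass to seek() after assign(), and the offset would be written in the same database transaction as the processing result:

```java
import java.util.*;

// Toy simulation of storing offsets externally: on each run, resume from the
// offset recorded in the "database" rather than from the beginning.
public class ExternalOffsetSketch {
    // fake database: partition number -> next offset to read
    static final Map<Integer, Long> offsetStore = new HashMap<>();

    static long process(List<String> partitionLog, int partition) {
        long start = offsetStore.getOrDefault(partition, 0L); // what seek() would use
        for (long off = start; off < partitionLog.size(); off++) {
            // ... process partitionLog.get((int) off) here ...
            offsetStore.put(partition, off + 1); // in a real DB: same transaction as the result
        }
        return offsetStore.get(partition);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>(List.of("m0", "m1", "m2"));
        System.out.println(process(log, 0)); // 3: consumed everything
        log.add("m3");
        System.out.println(process(log, 0)); // 4: resumed at offset 3, not from 0
    }
}
```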

Consumer Groups - ⚖️ Partition Rebalance

  • When a consumer joins or leaves a consumer group, Kafka rebalances the partitions based on the configured strategy.

  • It moves partitions amongst the available consumers.

  • A rebalance also happens when an administrator adds partitions to a topic.

  • The behavior is controlled by the consumer property partition.assignment.strategy.

  • There are 2 strategy types:

    • Eager Rebalance (default)

      • All consumers stop and give up their partition membership.

      • They rejoin the consumer group and get a new partition assignment.

      • This causes a "stop the world" event.

      • There is no guarantee that consumers get their previous partitions back; assignments can change entirely.

      • Strategies are as follows:

        • RangeAssignor: assigns partitions on a per-topic basis.

        • RoundRobinAssignor: assigns partitions across all topics in round-robin fashion.

        • StickyAssignor: balances like round-robin, but then minimises partition movement when consumers join/leave.

    • Incremental (Cooperative) Rebalance

      • Only a small subset of partitions is reassigned from one consumer to another.

      • Consumers whose partitions are not reassigned can keep processing.

      • Can go through several iterations to find a stable assignment.

      • Avoids the "stop the world" problem, in which all consumers stop processing.

      • CooperativeStickyAssignor: rebalances like StickyAssignor but supports cooperative rebalances.

  • In Kafka 3.x the default is [RangeAssignor, CooperativeStickyAssignor]: RangeAssignor is used by default, but removing it from the list lets you switch to CooperativeStickyAssignor with a single rolling bounce.

  • Kafka Connect and Kafka Streams use cooperative rebalancing by default.
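Opting in to cooperative rebalancing on a plain Java consumer is a one-line config change. The config key and assignor class name are real; the broker address and group id below are placeholders:

```java
import java.util.Properties;

// Select the cooperative (incremental) rebalance strategy explicitly via
// partition.assignment.strategy, overriding the eager default.
public class RebalanceStrategyConfig {
    static Properties config() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092"); // placeholder
        p.put("group.id", "demo-group");              // placeholder
        p.put("partition.assignment.strategy",
              "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(config().getProperty("partition.assignment.strategy"));
    }
}
```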

Static Group Membership

  • By default, when a consumer leaves a group, its partitions are revoked and reassigned.

  • If it joins back, it gets a new member ID and a new partition assignment.

  • Specifying group.instance.id makes the consumer a static member.

    • This only works if the consumer joins back within the session timeout configured by session.timeout.ms.

    • Only then will the consumer get its partitions back without triggering a rebalance.

Offset Reset Behavior

  • If a consumer is down for longer than the offset retention period (7 days by default), its committed offsets are deleted.

  • The offsets then become invalid, and auto.offset.reset decides where to resume:

    • auto.offset.reset=latest will read from the end of the log.

    • auto.offset.reset=earliest will read from the beginning of the log.

    • auto.offset.reset=none will throw an exception if no offset is found.

  • In Kafka < v2.0, committed offsets are lost if the consumer hasn't read in 1 day (default).

  • In Kafka >= v2.0, committed offsets are lost if the consumer hasn't read in 7 days (default).

  • This can be controlled by the broker setting offsets.retention.minutes.

  • To replay data for a consumer group:

    • Take all the consumers of that consumer group down.

    • Use the kafka-consumer-groups command to set the offsets to what you want.

    • Restart the consumers.
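As a sketch of the middle step, with placeholder broker, group, and topic names (stop the consumers first; the command is refused while members are active):

```shell
# Preview the reset without applying it (dry run is the default):
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --topic my-topic \
  --reset-offsets --to-earliest --dry-run

# Apply the reset, then restart the consumers:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --topic my-topic \
  --reset-offsets --to-earliest --execute
```

Other reset options exist besides --to-earliest, e.g. --to-latest, --to-offset, and --shift-by.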

Consumer Working

  • Consumers in a consumer group talk to Kafka via a Consumer Group Coordinator (one of the brokers acts as coordinator for the group).

  • To detect that a consumer is alive, there are two mechanisms:

    • Heartbeat mechanism

      • Consumers in a consumer group periodically send heartbeats to the Consumer Group Coordinator.

      • A dedicated consumer heartbeat thread sends them based on the setting heartbeat.interval.ms, by default 3 seconds.

      • Usually this value is set to 1/3rd of the session.timeout.ms value.

      • In Kafka v3.0+, session.timeout.ms is 45 s by default, so heartbeats must reach the broker well within that window; the default 3 s interval does so comfortably.

      • If no heartbeat is received within the session timeout, the consumer is considered dead.

      • Set the values lower for quicker consumer rebalances.

    • Poll mechanism

      • The consumer periodically polls data from the brokers.

      • By default max.poll.interval.ms is 5 minutes.

        • This setting is the maximum allowed time between two poll() calls before the consumer is declared dead.

        • Relevant for big data applications, where processing a batch can take long.

      • This mechanism is used to detect a consumer that is stuck while processing data.

      • To avoid issues, consumers should poll frequently and process data quickly.

      • max.poll.records denotes how many records to poll at a time, by default 500.

        • Increase if messages are small and have appropriate RAM allocated.

      • fetch.min.bytes: the minimum amount of data the broker should return per fetch request. Default is 1.

        • Increasing it improves throughput (fewer, larger requests) at the cost of latency.

      • fetch.max.wait.ms denotes the maximum amount of time the Kafka broker will block before replying to the fetch request.

        • If there isn't sufficient data to immediately satisfy fetch.min.bytes, the consumer will wait up to this long. By default it is 500 ms.

      • max.partition.fetch.bytes, by default 1 MB: the maximum amount of data per partition the server will return.

      • fetch.max.bytes: the maximum amount of data returned for each fetch request. By default 50 MB.

      Consumer Liveliness
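The liveness and fetch knobs above, spelled out as consumer configs. The values shown are the documented defaults; in a real consumer you would only override the ones you need (the broker address is a placeholder):

```java
import java.util.Properties;

// Consumer poll/fetch tuning parameters with their default values.
public class FetchTuningDefaults {
    static Properties config() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");  // placeholder
        p.put("heartbeat.interval.ms", "3000");        // heartbeat every 3 s
        p.put("session.timeout.ms", "45000");          // 45 s (Kafka 3.0+ default)
        p.put("max.poll.interval.ms", "300000");       // 5 min between poll() calls
        p.put("max.poll.records", "500");              // records returned per poll()
        p.put("fetch.min.bytes", "1");                 // min data per fetch request
        p.put("fetch.max.wait.ms", "500");             // broker wait for fetch.min.bytes
        p.put("max.partition.fetch.bytes", "1048576"); // 1 MB per partition
        p.put("fetch.max.bytes", "52428800");          // ~50 MB per fetch request
        return p;
    }

    public static void main(String[] args) {
        System.out.println(config().getProperty("max.poll.records")); // 500
    }
}
```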

Consumer Behavior → Partition Leaders

  • By default, consumers read from the partition leader.

  • This can mean higher latency and network charges when there are multiple data centers, e.g. the consumer sits in one data center and the leader broker in another.

Replica Fetching

  • Since Kafka v2.4+, it is possible to read from replicas, i.e. the closest replica.

  • This can improve latency and also decrease network costs when running in the cloud.

Consumer Replica Fetching
  • For this we need to configure consumer rack awareness.

    • Broker settings

      • Brokers must be at least v2.4.

      • broker.rack must be set to the data center / availability zone of the broker. Example: broker.rack=usw2-az1

      • replica.selector.class must be set to org.apache.kafka.common.replica.RackAwareReplicaSelector.

    • Consumer client settings

      • Set client.rack to the data center ID the consumer is launched on.
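Both sides of the rack-awareness setup as config sketches. The keys and the selector class are real Kafka settings; the rack/AZ identifier is a placeholder:

```java
import java.util.Properties;

// Broker-side and consumer-side settings for rack-aware replica fetching.
public class RackAwareConfig {
    static Properties brokerConfig() {
        Properties p = new Properties();
        p.put("broker.rack", "usw2-az1"); // AZ / data center of this broker (placeholder id)
        p.put("replica.selector.class",
              "org.apache.kafka.common.replica.RackAwareReplicaSelector");
        return p;
    }

    static Properties consumerConfig() {
        Properties p = new Properties();
        p.put("client.rack", "usw2-az1"); // AZ / data center the consumer runs in
        return p;
    }

    public static void main(String[] args) {
        // Fetching from the closest replica works when these two ids match.
        System.out.println(brokerConfig().getProperty("broker.rack")
                .equals(consumerConfig().getProperty("client.rack"))); // true
    }
}
```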
