Producers

  • Kafka producers write data to Kafka topics (more precisely, to topic partitions).

  • To write data to a Kafka topic, and eventually to a partition, the producer must know in advance which partition to write to, along with the details of the broker hosting it.

  • In case of broker failures, producers know how to recover automatically.

  • Messages sent to partitions are load balanced across brokers because of the number of partitions, and hence Kafka scales easily.

  • Producers can optionally send a key with a message; the key can be a string, number, object, etc.

    • If the key is null, messages are assigned to partitions in a round-robin fashion (load balancing).

    • If the key is not null, the key is hashed, and the same key always lands in the same partition.

    • Specifying a key therefore ensures message ordering within a particular partition.

Kafka Message Serializer

  • Kafka accepts messages from producers and delivers messages to consumers as bytes.

  • The message serializer performs this transformation to bytes.

  • Serialization applies to both the key and the value.

  • Common serializers include String (including JSON), Int, Float, Avro, Protobuf, etc.

  • Note that the serialization type must not change during the lifecycle of a topic, or consumers will break.

    • If it needs to change, create a new topic.

    • Reprogram your consumers.
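As a sketch, serializers are configured on the producer. The two serializer classes below ship with the Apache Kafka Java client; the pairing of a String key with an Integer value is purely illustrative:

```properties
# Illustrative producer configuration: String key, Integer value.
# Both classes come from the Apache Kafka Java client.
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
```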

Typical Serializer

Kafka Message Key Hashing

  • When a message is sent, the partitioner logic on the producer side takes the record and determines which partition to send it to.

  • Key hashing is the process of determining the mapping of a key to a partition.

  • The default key partitioner in Kafka uses the murmur2 algorithm, with the formula below:

targetPartition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
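The routing rule can be sketched in Python. MD5 stands in for murmur2 here purely to keep the example dependency-free, so the partition numbers will not match Kafka's, but the key property holds: the same key always maps to the same partition.

```python
import hashlib

def partition_for(key_bytes: bytes, num_partitions: int) -> int:
    # Hash the key bytes to a non-negative integer, then take it modulo
    # the partition count. Kafka uses murmur2; MD5 is only a stand-in.
    digest = int.from_bytes(hashlib.md5(key_bytes).digest()[:4], "big")
    return digest % num_partitions

# The same key always routes to the same partition.
assert partition_for(b"user-42", 6) == partition_for(b"user-42", 6)
```

Note that the mapping depends on the partition count, which is why adding partitions to a topic breaks the key-to-partition guarantee.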

Producer Acknowledgements and Topic Availability

  • The producer can choose to receive acknowledgement of data writes.

    • acks = 0 : the producer won't wait for acknowledgement (possible data loss).

    • acks = 1 : the producer waits for acknowledgement from the leader partition. Default setting from v1.0 to v2.8. If the ack is not received, the producer retries.

    • acks = all (-1) : leader + in-sync replicas acknowledge (no data loss). Default setting in Kafka v3.0+.

      • min.insync.replicas

        • The value denotes how many replicas have to successfully acknowledge a write for it to be considered successful.

        • min.insync.replicas=1 means only the leader has to ack.

        • min.insync.replicas=2 means the leader and one replica need to ack, and so on. This is the recommended and popular option when acks=all.

        • If enough in-sync replicas are not present, the producer receives an error (NotEnoughReplicas).

        • This is a broker-side setting.
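As a sketch, the two settings live on different sides: acks on the producer, min.insync.replicas on the broker or topic. The values below mirror the recommended combination described above:

```properties
# Producer configuration (client side)
acks=all

# Broker or topic configuration (server side)
min.insync.replicas=2
```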

Broker Failure Calculation

  • If a topic has a replication factor of N and min.insync.replicas is M, then up to N-M broker failures can be tolerated for writes.
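The rule above is simple arithmetic; a small sketch (function name is illustrative):

```python
def tolerated_broker_failures(replication_factor: int, min_insync: int) -> int:
    # With acks=all, writes succeed as long as at least `min_insync`
    # replicas are alive, so up to N - M brokers may fail.
    return replication_factor - min_insync

# replication.factor=3 with min.insync.replicas=2 tolerates 1 broker loss.
assert tolerated_broker_failures(3, 2) == 1
```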

Retries

  • By default, when not enough brokers are available for writes, the producer application is expected to handle retries.

  • The retries value is 0 for versions <= 2.0, and 2147483647 (effectively infinite) for versions >= 2.1.

  • Since Kafka 2.1, delivery.timeout.ms defaults to 120000 ms.

    • If retries > 0, retries are bounded by the timeout configured by this property.

  • A record fails if it is NOT acknowledged within the delivery timeout.

  • The retry.backoff.ms setting defaults to 100 ms.

  • Because of retries, messages may be duplicated or sent out of order.

  • Key-based ordering in older versions of Kafka may therefore break.
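As a sketch, these are the producer-side defaults described above for Kafka >= 2.1, written out explicitly:

```properties
# Defaults in Kafka >= 2.1 (producer side), shown explicitly
retries=2147483647
delivery.timeout.ms=120000
retry.backoff.ms=100
```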

Parallel Request

  • At a time, a producer can send multiple requests, based on the max.in.flight.requests.per.connection setting.

  • The default value is 5.

  • On older versions of Kafka, preserving key-based ordering requires limiting in-flight requests, which impacts throughput.

  • To guarantee ordering there, set the property value to 1.

  • From Kafka v1.0.0 onward, idempotent producers solve this.

Idempotent Producer

  • Due to network jitter and errors, a producer may introduce duplicate messages into Kafka.

  • Such errors may lead to the producer never seeing the acknowledgement sent by the broker after committing the message/record, so it retries and resends.

  • In Kafka >= v0.11 one can define an idempotent producer.

  • This ensures duplicate messages are not introduced into the broker.

  • A must to ensure a safe and stable pipeline.

  • Idempotent producers are the default since Kafka v3.0.

  • When the producer is idempotent, retries is set to Integer.MAX_VALUE.

  • max.in.flight.requests.per.connection=5 for >= v1.0, while still keeping ordering.

  • Acknowledgement is acks=all.

  • A producer is made idempotent by setting enable.idempotence=true.
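A minimal sketch of the settings an idempotent producer implies; on Kafka >= 3.0 the single enable.idempotence flag is already the default and pins the other three values:

```properties
enable.idempotence=true
# Implied by idempotence:
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
```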

Sticky Partitioner - Performance Improvement

  • It is an optimization done by Kafka behind the scenes by which all records of one batch land on the same partition, optimizing batch writes.

  • When the key is null, the default partitioner decides how messages are spread over partitions, and rebalances when a new partition is added.

  • It is possible to change the default partitioner by setting partitioner.class. Not recommended.

  • Until Kafka v2.3 the default partitioner (for null keys) was the round-robin partitioner; from v2.4 it is the sticky partitioner.

  • With this partitioner, whole batches (rather than individual records) are distributed across partitions in a round-robin manner.

Message Compression

  • When data is sent from the producer to Kafka, compression helps reduce bandwidth usage, so data can be sent faster.

  • It improves throughput, reduces latency, yields smaller message sizes, and gives better disk utilization on the Kafka side.

  • The producer and consumer may have to spend some CPU cycles to compress and decompress data, respectively.

  • It is recommended to use compression in production.

  • There are two places compression can be applied.

    • Producer Level

      • By default compression.type is none, but it can be gzip, snappy, lz4, or zstd (since v2.1).

      • The bigger the batch of messages, the more effective the compression.

    • Kafka Level, i.e. Broker or Topic Level

      • Broker Level

        • Applies to all topics.

        • compression.type=producer, which is the default setting.

        • The broker takes the compressed batch from the producer and writes it to the topic log file without recompression. Decompression happens on the consumer side.

        • compression.type=none: all batches are decompressed by the broker.

        • compression.type=lz4|gzip|snappy: if a compression type is specified, it has to match the producer side, or the data will be recompressed with the given compression.type.

        • Enabling compression on the broker side consumes extra CPU cycles on the broker.

      • Topic Level

        • Applies only to that topic.
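A sketch of the usual production setup: compress once on the producer and let the broker pass batches through unchanged (snappy is an illustrative choice):

```properties
# Producer side: compress batches before sending (snappy chosen as an example)
compression.type=snappy

# Broker side (server.properties), the default: store producer batches as-is
compression.type=producer
```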

Batching

  • By default, Kafka producers try to send records as soon as possible.

  • It will have up to max.in.flight.requests.per.connection=5, meaning up to 5 message batches can be in flight between the producer and Kafka at once.

  • If more messages are to be sent while requests are in flight, Kafka does smart batching and the messages are batched before the next send.

  • This strategy increases throughput and reduces latency.

  • Batching gives better compression, if enabled.

  • linger.ms tells the Kafka client to wait up to the given period before sending a batch.

  • batch.size is the maximum number of bytes included in a batch.

    • If a batch fills before linger.ms is reached, it is sent immediately. The batch size can be increased; by default it is 16 KB.

    • Batch size is allocated per partition.
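A sketch of the two batching knobs; linger.ms=20 and the doubled batch.size are illustrative values, not defaults (the defaults are 0 ms and 16384 bytes):

```properties
# Wait up to 20 ms to fill a batch (default 0: send as soon as possible)
linger.ms=20
# Allow batches of up to 32 KB per partition (default 16384 bytes)
batch.size=32768
```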
