From b6c9fc3de5600209b049eedd190f8343fd1a2c2f Mon Sep 17 00:00:00 2001
From: PaulRMellor <47596553+PaulRMellor@users.noreply.github.com>
Date: Thu, 4 Jul 2024 17:42:55 +0100
Subject: [PATCH] docs(tuning): consumer config: updates content on rebalances (#10292)

Signed-off-by: prmellor
---
 .../con-consumer-config-properties.adoc | 102 +++++++++++++-----
 1 file changed, 77 insertions(+), 25 deletions(-)

diff --git a/documentation/modules/managing/con-consumer-config-properties.adoc b/documentation/modules/managing/con-consumer-config-properties.adoc
index b3e675c320f..9221181ab0e 100644
--- a/documentation/modules/managing/con-consumer-config-properties.adoc
+++ b/documentation/modules/managing/con-consumer-config-properties.adoc
@@ -250,6 +250,19 @@ A shorter interval between consecutive heartbeats allows for quicker detection o
 The heartbeat interval must be lower, usually by a third, than the session timeout.
 Decreasing the heartbeat interval reduces the chance of accidental rebalancing, but more frequent heartbeats increases the overhead on broker resources.

+[source,env]
+----
+# ...
+max.poll.records=100 # <1>
+session.timeout.ms=30000 # <2>
+heartbeat.interval.ms=5000 # <3>
+# ...
+----
+<1> Set the number of records returned to the consumer when calling the `poll()` method.
+<2> Set the timeout for detecting client failure.
+If the broker configuration has a `group.min.session.timeout.ms` and `group.max.session.timeout.ms`, the session timeout value must be within that range.
+<3> Adjust the heartbeat interval according to anticipated rebalances.
+
 == Managing offset policy

 Use the `auto.offset.reset` property to control how a consumer behaves when no offsets have been committed,
@@ -267,22 +280,18 @@ If a consumer group or standalone consumer is inactive and commits no offsets du
 [source,env]
 ----
 # ...
-heartbeat.interval.ms=3000 <1>
-session.timeout.ms=45000 <2>
-auto.offset.reset=earliest <3>
+auto.offset.reset=earliest # <1>
 # ...
 ----
-<1> Adjust the heartbeat interval lower according to anticipated rebalances.
-<2> If no heartbeats are received by the Kafka broker before the timeout duration expires, the consumer is removed from the consumer group and a rebalance is initiated.
-If the broker configuration has a `group.min.session.timeout.ms` and `group.max.session.timeout.ms`, the session timeout value must be within that range.
-<3> Set to `earliest` to return to the start of a partition and avoid data loss if offsets were not committed.
+<1> Set to `earliest` to return to the start of a partition and avoid data loss if offsets were not committed.

 If the amount of data returned in a single fetch request is large, a timeout might occur before the consumer has processed it.
 In this case, you can lower `max.partition.fetch.bytes` or increase `session.timeout.ms`.

-== Minimizing the impact of rebalances
+== Minimizing the impact of rebalances

+Rebalances in Kafka consumer groups can introduce latency and reduce throughput, impacting overall service performance.
 The rebalancing of a partition between active consumers in a group is the time it takes for the following to take place:

 * Consumers to commit their offsets
@@ -290,26 +299,69 @@ The rebalancing of a partition between active consumers in a group is the time i
 * The group leader to assign partitions to group members
 * The consumers in the group to receive their assignments and start fetching

-The rebalancing process can increase the downtime of a service, particularly if it happens repeatedly during a rolling restart of a consumer group cluster.
+Rebalances are triggered by changes in consumer health, network issues, configuration updates, and scaling events.
+This process can increase service downtime, especially if it occurs frequently, such as during rolling restarts of consumers in a group.

-In this situation, you can introduce _static membership_ by assigning a unique identifier (`group.instance.id`) to each consumer instance within the group.
-Static membership uses persistence so that a consumer instance is recognized during a restart after a session timeout.
-Consequently, the consumer maintains its assignment of topic partitions, reducing unnecessary rebalancing when it rejoins the group after a failure or restart.
-
-Additionally, adjusting the `max.poll.interval.ms` configuration can prevent rebalances caused by prolonged processing tasks, allowing you to specify the maximum interval between polls for new messages.
-Use the `max.poll.records` property to cap the number of records returned from the consumer buffer during each poll.
-Reducing the number of records allows the consumer to process fewer messages more efficiently.
-In cases where lengthy message processing is unavoidable, consider offloading such tasks to a pool of worker threads.
-This parallel processing approach prevents delays and potential rebalances caused by overwhelming the consumer with a large volume of records.
+To minimize the impact of rebalances, consider the following strategies and configurations:

-[source,shell,subs="+quotes"]
+Assess throughput and parallelism:: Assess the expected throughput (bytes and records per second) and parallelism (number of partitions) of the input topics against the number of consumers.
++
+If adjustments are needed, start by setting up static membership, adopting a partition assignment strategy, and setting a limit on the number of records returned using the `max.poll.records` property.
+Add further configurations for timeouts and intervals, if required and with care, as these can introduce issues related to the handling of failures.
+
+Use static membership:: Assign a unique identifier (`group.instance.id`) to each consumer instance.
+Static membership introduces persistence so static consumers retain partition assignments across restarts, reducing unnecessary rebalances.
+
+Adopt partition assignment strategies::
+* Use appropriate partition assignment strategies to reduce the number of partitions that need to be reassigned during a rebalance, minimizing the impact on active consumers.
+* The `org.apache.kafka.clients.consumer.CooperativeStickyAssignor` strategy is particularly effective, as it ensures minimal partition movement and better stability during rebalances.
+
+Adjust record limits and poll intervals::
+* Use the `max.poll.records` property to limit the number of records returned during each poll.
+Processing fewer messages more efficiently can prevent delays.
+* Use the `max.poll.interval.ms` property to prevent rebalances caused by prolonged processing tasks by setting the maximum interval between calls to the `poll()` method.
+* Alternatively, consider pausing partitions to retrieve fewer records at a time.
+
+Adjust session timeout and heartbeat intervals::
+* Use the `session.timeout.ms` property to set a longer timeout to reduce rebalances caused by temporary network glitches or minor processing delays.
+* Adjust the `heartbeat.interval.ms` property to balance failure detection checks with minimizing unnecessary rebalances.
+
+Monitor consumer health:: Instability in consumer applications, such as frequent crashes, can trigger rebalances.
+Use Kafka consumer metrics to monitor such things as rebalance rates, session timeouts, and failed fetch requests.
+
+.Example configuration to minimize the impact of rebalances
+[source,shell]
 ----
 # ...
-group.instance.id=_UNIQUE-ID_ <1>
-max.poll.interval.ms=300000 <2>
-max.poll.records=500 <3>
+group.instance.id=
+max.poll.interval.ms=300000
+max.poll.records=500
+session.timeout.ms=30000
+heartbeat.interval.ms=5000
+partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
 # ...
 ----
-<1> The unique instance id ensures that a new consumer instance receives the same assignment of topic partitions.
-<2> Set the interval to check the consumer is continuing to process messages.
-<3> Sets the number of processed records returned from the consumer.
+
+.Scaling strategies
+To minimize the impact of rebalances during scaling of consumer groups, consider the following approaches:
+
+Set a rebalance delay:: Use the `group.initial.rebalance.delay.ms` property in the Kafka configuration to delay the time it takes for consumers to join a new consumer group before performing a rebalance.
+Introducing a delay helps avoid triggering several rebalances when starting multiple consumers near the same time.
+The appropriate delay depends on the orchestration used and might not be suitable in some circumstances.
+
+Avoid frequent scaling::
+* Keep the number of consumers stable, scaling only when necessary and in controlled increments.
+* Monitor system performance and adjust your scaling strategy as needed.
+** Lag per partition should be constant and low.
+** Records processed per second by consumers should match the records per second in the input topics.
+* Use the Kafka Exporter to check for consumer lag and determine if scaling is required.
+
+Implement dynamic scaling policies::
+* If using dynamic or event-driven tools for scaling of consumer applications, set lag thresholds based on the backlog of messages.
+* Define maximum and minimum replica counts for consumer groups.
+* Set periods between scaling events to prevent rapid scaling.
+
+NOTE: In cases where lengthy message processing is unavoidable, consider pausing and resuming partitions as needed.
+If you pause all partitions, `poll()` returns no records, allowing you to keep calling it without overwhelming the consumers.
+Alternatively, you can offload the processing tasks to a pool of worker threads.
+This helps prevent delays and potential rebalances.
\ No newline at end of file
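
The timeout rules stated in this change (the heartbeat interval must be lower than the session timeout, usually by a third, and the session timeout must fall within the broker's `group.min.session.timeout.ms` and `group.max.session.timeout.ms`) can be sketched as a small check. This is only an illustration, not part of the patch: `parse_properties` and `check_timeouts` are hypothetical helper names, "usually by a third" is read here as "at most one third of the session timeout", stripping everything after `#` is a simplification that suits the callout-style examples above, and the broker limits passed in the usage lines are assumed values.

```python
def parse_properties(text):
    """Parse simple key=value lines, ignoring blank lines and '#' comments."""
    config = {}
    for raw in text.splitlines():
        # Simplification: treat everything after '#' as a comment or callout.
        line = raw.split("#", 1)[0].strip()
        if not line or "=" not in line:
            continue
        key, value = line.split("=", 1)
        config[key.strip()] = value.strip()
    return config


def check_timeouts(config, broker_min_ms=None, broker_max_ms=None):
    """Return a list of warnings about session timeout and heartbeat settings."""
    session = int(config["session.timeout.ms"])
    heartbeat = int(config["heartbeat.interval.ms"])
    warnings = []
    if heartbeat >= session:
        warnings.append("heartbeat.interval.ms must be lower than session.timeout.ms")
    elif heartbeat * 3 > session:
        # Assumed reading of "usually by a third".
        warnings.append("heartbeat.interval.ms is usually at most a third of session.timeout.ms")
    if broker_min_ms is not None and session < broker_min_ms:
        warnings.append("session.timeout.ms is below group.min.session.timeout.ms")
    if broker_max_ms is not None and session > broker_max_ms:
        warnings.append("session.timeout.ms is above group.max.session.timeout.ms")
    return warnings


# Values copied from the example added in this patch.
EXAMPLE = """\
# ...
max.poll.records=100 # <1>
session.timeout.ms=30000 # <2>
heartbeat.interval.ms=5000 # <3>
# ...
"""

if __name__ == "__main__":
    # 6000 and 1800000 are assumed broker limits, used only for illustration.
    for warning in check_timeouts(parse_properties(EXAMPLE), 6000, 1800000):
        print(warning)
```

A check like this could run before deploying a consumer, so that a session timeout outside the broker's range is caught early rather than when the consumer tries to join the group.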