docs(tuning): consumer config: updates content on rebalances (#10292)
Signed-off-by: prmellor <[email protected]>
PaulRMellor committed Jul 4, 2024
1 parent 0a59d5d commit b6c9fc3
Showing 1 changed file with 77 additions and 25 deletions.
102 changes: 77 additions & 25 deletions documentation/modules/managing/con-consumer-config-properties.adoc
A shorter interval between consecutive heartbeats allows for quicker detection of consumer failure.
The heartbeat interval must be lower, usually by a third, than the session timeout.
Decreasing the heartbeat interval reduces the chance of accidental rebalancing, but more frequent heartbeats increases the overhead on broker resources.

[source,env]
----
# ...
max.poll.records=100 # <1>
session.timeout.ms=30000 # <2>
heartbeat.interval.ms=5000 # <3>
# ...
----
<1> Set the number of records returned to the consumer when calling the `poll()` method.
<2> Set the timeout for detecting client failure.
If the broker configuration specifies `group.min.session.timeout.ms` and `group.max.session.timeout.ms` values, the session timeout value must be within that range.
<3> Adjust the heartbeat interval according to anticipated rebalances.
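
The rules of thumb above can be checked mechanically. The following Python sketch validates a proposed pair of values; the function name, warning strings, and the default broker bounds (`6000` and `1800000` ms) are assumptions for the example, not part of any Kafka client API.

```python
# Illustrative helper (not part of the Kafka client API): sanity-check
# proposed consumer session/heartbeat settings before applying them.

def check_timeouts(session_timeout_ms, heartbeat_interval_ms,
                   group_min_ms=6000, group_max_ms=1800000):
    """Return a list of warnings for the proposed consumer timeouts."""
    warnings = []
    # Broker-side bounds: session.timeout.ms must fall within
    # group.min.session.timeout.ms..group.max.session.timeout.ms.
    if not (group_min_ms <= session_timeout_ms <= group_max_ms):
        warnings.append("session.timeout.ms outside broker-allowed range")
    # Rule of thumb from the text: the heartbeat interval should be
    # lower than the session timeout, usually by a third.
    if heartbeat_interval_ms >= session_timeout_ms / 3:
        warnings.append("heartbeat.interval.ms should be ~1/3 of session.timeout.ms")
    return warnings
```

For example, the configuration shown above (`session.timeout.ms=30000`, `heartbeat.interval.ms=5000`) passes both checks.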

== Managing offset policy

Use the `auto.offset.reset` property to control how a consumer behaves when no offsets have been committed, or when a committed offset is no longer valid.
If a consumer group or standalone consumer is inactive and commits no offsets during the offsets retention period, previously committed offsets are deleted.
[source,env]
----
# ...
auto.offset.reset=earliest # <1>
# ...
----
<1> Set to `earliest` to return to the start of a partition and avoid data loss if offsets were not committed.
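
The reset behavior can be sketched as a pure function. This is a simplified illustration of the decision the client makes on startup, not the actual client code; `starting_offset` and its parameters are hypothetical names.

```python
# Sketch of the reset decision a consumer makes for a partition
# (simplified; the real logic lives inside the Kafka client).

def starting_offset(committed, earliest, latest, auto_offset_reset):
    """Pick the first offset to fetch for a partition.

    committed -- last committed offset for the group, or None
    earliest/latest -- current bounds of the partition's log
    """
    # A committed offset is only usable while it still lies in the log.
    if committed is not None and earliest <= committed <= latest:
        return committed
    if auto_offset_reset == "earliest":
        return earliest   # replay from the start of the partition: no data loss
    if auto_offset_reset == "latest":
        return latest     # skip the backlog: records before this point are lost
    raise ValueError("no valid offset and auto.offset.reset=none")
```

With `earliest`, a consumer whose committed offset has expired falls back to the start of the log rather than silently skipping records.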

If the amount of data returned in a single fetch request is large,
a timeout might occur before the consumer has processed it.
In this case, you can lower `max.partition.fetch.bytes` or increase `session.timeout.ms`.
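
A back-of-the-envelope check can show when this trade-off bites, assuming a sustained processing rate; the function and the numbers are illustrative only, not part of any API.

```python
# Rough check (assumed numbers, not an official formula): will a full
# fetch be processed before the timeout expires?

def fetch_fits_timeout(fetch_bytes, bytes_per_sec, timeout_ms):
    """True if processing one full fetch stays within the timeout."""
    processing_ms = fetch_bytes / bytes_per_sec * 1000
    return processing_ms < timeout_ms
```

For instance, a 50 MiB fetch processed at 1 MiB/s takes about 50 seconds, which overruns a 30-second timeout; lowering the fetch size or raising the timeout resolves it.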

== Minimizing the impact of rebalances

Rebalances in Kafka consumer groups can introduce latency and reduce throughput, impacting overall service performance.
The rebalancing of a partition between active consumers in a group takes the time required for the following to take place:

* Consumers to commit their offsets
* The new consumer group to be formed
* The group leader to assign partitions to group members
* The consumers in the group to receive their assignments and start fetching

Rebalances are triggered by changes in consumer health, network issues, configuration updates, and scaling events.
This process can increase service downtime, especially if it occurs frequently, such as during rolling restarts of consumers in a group.

To minimize the impact of rebalances, consider the following strategies and configurations:

Assess throughput and parallelism:: Assess the expected throughput (bytes and records per second) and parallelism (number of partitions) of the input topics against the number of consumers.
+
If adjustments are needed, start by setting up static membership, adopting a partition assignment strategy, and setting a limit on the number of records returned using the `max.poll.records` property.
Add further configurations for timeouts and intervals, if required and with care, as these can introduce issues related to the handling of failures.

Use static membership:: Assign a unique identifier (`group.instance.id`) to each consumer instance.
Static membership introduces persistence so static consumers retain partition assignments across restarts, reducing unnecessary rebalances.

Adopt partition assignment strategies::
* Use appropriate partition assignment strategies to reduce the number of partitions that need to be reassigned during a rebalance, minimizing the impact on active consumers.
* The `org.apache.kafka.clients.consumer.CooperativeStickyAssignor` strategy is particularly effective, as it ensures minimal partition movement and better stability during rebalances.

Adjust record limits and poll intervals::
* Use the `max.poll.records` property to limit the number of records returned during each poll.
Processing fewer messages more efficiently can prevent delays.
* Use the `max.poll.interval.ms` property to prevent rebalances caused by prolonged processing tasks by setting the maximum interval between calls to the `poll()` method.
* Alternatively, consider pausing partitions to retrieve fewer records at a time.

Adjust session timeout and heartbeat intervals::
* Use the `session.timeout.ms` property to set a longer timeout to reduce rebalances caused by temporary network glitches or minor processing delays.
* Adjust the `heartbeat.interval.ms` property to balance failure detection checks with minimizing unnecessary rebalances.

Monitor consumer health:: Instability in consumer applications, such as frequent crashes, can trigger rebalances.
Use Kafka consumer metrics to monitor such things as rebalance rates, session timeouts, and failed fetch requests.

.Example configuration to minimize the impact of rebalances
[source,shell]
----
# ...
group.instance.id=<unique_id>
max.poll.interval.ms=300000
max.poll.records=500
session.timeout.ms=30000
heartbeat.interval.ms=5000
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
# ...
----
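
As a rough sanity check on values like `max.poll.records` and `max.poll.interval.ms`, the time to process one poll's worth of records should stay comfortably below the poll interval, or the consumer is evicted from the group and a rebalance starts. The following sketch assumes an average per-record processing cost; the function name, cost figure, and safety factor are assumptions for illustration.

```python
# Rough sizing aid (assumed per-record cost, not an official formula).

def max_safe_poll_records(max_poll_interval_ms, per_record_ms, safety_factor=0.5):
    """Largest max.poll.records that keeps one batch within the poll interval.

    safety_factor leaves headroom for variance in processing time.
    """
    budget_ms = max_poll_interval_ms * safety_factor
    return int(budget_ms // per_record_ms)
```

With the example values above (`max.poll.interval.ms=300000`) and an assumed 100 ms per record, the budget allows up to 1500 records per poll, so `max.poll.records=500` leaves ample headroom.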

.Scaling strategies
To minimize the impact of rebalances during scaling of consumer groups, consider the following approaches:

Set a rebalance delay:: Use the `group.initial.rebalance.delay.ms` property in the Kafka broker configuration to delay the initial rebalance, giving consumers time to join a new consumer group before the first rebalance is performed.
Introducing a delay helps avoid triggering several rebalances when starting multiple consumers near the same time.
The appropriate delay depends on the orchestration used and might not be suitable in some circumstances.

Avoid frequent scaling::
* Keep the number of consumers stable, scaling only when necessary and in controlled increments.
* Monitor system performance and adjust your scaling strategy as needed.
** Lag per partition should be constant and low.
** Records processed per second by consumers should match the records per second in the input topics.
* Use the Kafka Exporter to check for consumer lag and determine if scaling is required.

Implement dynamic scaling policies::
* If using dynamic or event-driven tools for scaling of consumer applications, set lag thresholds based on the backlog of messages.
* Define maximum and minimum replica counts for consumer groups.
* Set periods between scaling events to prevent rapid scaling.
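
A minimal sketch of such a policy, assuming a lag-based controller; all names, thresholds, and defaults here are hypothetical, not from any specific autoscaling tool.

```python
# Hypothetical scaling policy combining the points above: a lag target
# per replica, replica bounds, and a cooldown between scaling events.
import time

def desired_replicas(current, total_lag, lag_per_replica,
                     min_replicas=1, max_replicas=10,
                     last_scaled_at=0.0, cooldown_s=300, now=None):
    """Return the replica count a scaling controller should request."""
    now = time.time() if now is None else now
    # Respect a cooldown between scaling events to prevent rapid scaling
    # and the repeated rebalances it would trigger.
    if now - last_scaled_at < cooldown_s:
        return current
    # Size the group to the backlog, then clamp to the configured bounds.
    needed = -(-total_lag // lag_per_replica)  # ceiling division
    return max(min_replicas, min(max_replicas, needed))
```

A controller polling consumer lag (for example, from the Kafka Exporter metrics mentioned above) would call this on each evaluation cycle and only apply the result when it differs from the current count.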

NOTE: In cases where lengthy message processing is unavoidable, consider pausing and resuming partitions as needed.
If you pause all partitions, `poll()` returns no records, allowing you to keep calling it without overwhelming the consumers.
Alternatively, you can offload the processing tasks to a pool of worker threads.
This helps prevent delays and potential rebalances.
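
The pause-and-offload pattern can be sketched as follows. The consumer object here is a stub standing in for a real Kafka consumer with `poll()`, `pause()`, and `resume()` methods (the real API also takes partition arguments), and the in-flight threshold is an assumption for the example.

```python
# Sketch of offloading slow processing to a worker pool while pausing
# the consumer when the pool is saturated, so poll() can keep being
# called without piling up unprocessed records.
from concurrent.futures import ThreadPoolExecutor

MAX_IN_FLIGHT = 8  # assumed backpressure threshold

def run_poll_loop(consumer, process, cycles):
    """consumer needs poll()/pause()/resume(); process() does the slow work."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        in_flight = []
        for _ in range(cycles):
            # Drop completed work from the in-flight list.
            in_flight = [f for f in in_flight if not f.done()]
            if len(in_flight) >= MAX_IN_FLIGHT:
                consumer.pause()    # poll() now returns no records
            else:
                consumer.resume()   # no-op if not currently paused
            for record in consumer.poll():
                # Offload the record; the poll loop stays responsive.
                in_flight.append(pool.submit(process, record))
```

Because a paused consumer keeps polling, it continues to heartbeat and rejoin checks succeed, avoiding the eviction that a stalled `poll()` loop would cause.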
