diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index aa3b5c9d628..e2022e0f4d0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -192,7 +192,8 @@ public class CommonClientConfigs { + "is considered failed and the group will rebalance in order to reassign the partitions to another member. " + "For consumers using a non-null group.instance.id which reach this timeout, partitions will not be immediately reassigned. " + "Instead, the consumer will stop sending heartbeats and partitions will be reassigned " - + "after expiration of session.timeout.ms. This mirrors the behavior of a static consumer which has shutdown."; + + "after expiration of the session timeout (defined by the client config session.timeout.ms if using the Classic rebalance protocol, or by the broker config group.consumer.session.timeout.ms if using the Consumer protocol). " + + "This mirrors the behavior of a static consumer which has shutdown."; public static final String REBALANCE_TIMEOUT_MS_CONFIG = "rebalance.timeout.ms"; public static final String REBALANCE_TIMEOUT_MS_DOC = "The maximum allowed time for each worker to join the group " @@ -206,15 +207,18 @@ public class CommonClientConfigs { + "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, " + "then the broker will remove this client from the group and initiate a rebalance. Note that the value " + "must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms " - + "and group.max.session.timeout.ms. Note that this configuration is not supported when group.protocol " - + "is set to \"consumer\"."; + + "and group.max.session.timeout.ms. Note that this client configuration is not supported when group.protocol " + + "is set to \"consumer\". In that case, session timeout is controlled by the broker config group.consumer.session.timeout.ms."; public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms"; public static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer " + "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the " + "consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. " - + "The value must be set lower than session.timeout.ms, but typically should be set no higher " - + "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances."; + + "This config is only supported if group.protocol is set to \"classic\". In that case, " + + "the value must be set lower than session.timeout.ms, but typically should be set no higher " + + "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances." + + "If group.protocol is set to \"consumer\", this config is not supported, as " + + "the heartbeat interval is controlled by the broker with group.consumer.heartbeat.interval.ms."; public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms"; public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs. " + diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java index 9d219907926..3998d672006 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java @@ -63,8 +63,9 @@ public abstract class AbstractHeartbeatRequestManager