KAFKA-17338 ConsumerConfig should prevent using partition assignors with CONSUMER group protocol (#16899)

Reviewers: Kirk True <ktrue@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Lianet Magrans <lmagrans@confluent.io>
This commit is contained in:
Ken Huang 2024-11-29 22:36:29 +08:00 committed by GitHub
parent e7bbcdb251
commit 9d23f89e05
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 113 additions and 43 deletions

View File

@ -35,7 +35,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -61,8 +61,12 @@ public class ConsumerConfig extends AbstractConfig {
// a list contains all the assignor names that only assign subscribed topics to consumer. Should be updated when new assignor added.
// This is to help optimize ConsumerCoordinator#performAssignment method
public static final List<String> ASSIGN_FROM_SUBSCRIBED_ASSIGNORS =
List.of(RANGE_ASSIGNOR_NAME, ROUNDROBIN_ASSIGNOR_NAME, STICKY_ASSIGNOR_NAME, COOPERATIVE_STICKY_ASSIGNOR_NAME);
public static final List<String> ASSIGN_FROM_SUBSCRIBED_ASSIGNORS = List.of(
RANGE_ASSIGNOR_NAME,
ROUNDROBIN_ASSIGNOR_NAME,
STICKY_ASSIGNOR_NAME,
COOPERATIVE_STICKY_ASSIGNOR_NAME
);
/*
* NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
@ -376,6 +380,22 @@ public class ConsumerConfig extends AbstractConfig {
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
/**
* A list of configuration keys not supported for CLASSIC protocol.
*/
private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = Collections.singletonList(
GROUP_REMOTE_ASSIGNOR_CONFIG
);
/**
* A list of configuration keys not supported for CONSUMER protocol.
*/
private static final List<String> CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS = List.of(
PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
HEARTBEAT_INTERVAL_MS_CONFIG,
SESSION_TIMEOUT_MS_CONFIG
);
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
@ -409,7 +429,7 @@ public class ConsumerConfig extends AbstractConfig {
HEARTBEAT_INTERVAL_MS_DOC)
.define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
Type.LIST,
Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class),
List.of(RangeAssignor.class, CooperativeStickyAssignor.class),
new ConfigDef.NonNullValidator(),
Importance.MEDIUM,
PARTITION_ASSIGNMENT_STRATEGY_DOC)
@ -667,7 +687,7 @@ public class ConsumerConfig extends AbstractConfig {
Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
maybeOverrideClientId(refinedConfigs);
maybeOverrideEnableAutoCommit(refinedConfigs);
checkGroupRemoteAssignor();
checkUnsupportedConfigs();
return refinedConfigs;
}
@ -714,9 +734,28 @@ public class ConsumerConfig extends AbstractConfig {
}
}
private void checkGroupRemoteAssignor() {
if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CLASSIC.name()) && getString(GROUP_REMOTE_ASSIGNOR_CONFIG) != null && !getString(GROUP_REMOTE_ASSIGNOR_CONFIG).isEmpty()) {
throw new ConfigException(GROUP_REMOTE_ASSIGNOR_CONFIG + " cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CLASSIC.name());
private void checkUnsupportedConfigs() {
String groupProtocol = getString(GROUP_PROTOCOL_CONFIG);
if (GroupProtocol.CLASSIC.name().equalsIgnoreCase(groupProtocol)) {
checkUnsupportedConfigs(GroupProtocol.CLASSIC, CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS);
} else if (GroupProtocol.CONSUMER.name().equalsIgnoreCase(groupProtocol)) {
checkUnsupportedConfigs(GroupProtocol.CONSUMER, CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS);
}
}
private void checkUnsupportedConfigs(GroupProtocol groupProtocol, List<String> unsupportedConfigs) {
if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(groupProtocol.name())) {
List<String> invalidConfigs = new ArrayList<>();
unsupportedConfigs.forEach(configName -> {
Object config = originals().get(configName);
if (config != null && !Utils.isBlank(config.toString())) {
invalidConfigs.add(configName);
}
});
if (!invalidConfigs.isEmpty()) {
throw new ConfigException(String.join(", ", invalidConfigs) +
" cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + groupProtocol.name());
}
}
}

View File

@ -237,4 +237,23 @@ public class ConsumerConfigTest {
assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
}
}
@Test
public void testUnsupportedConfigsWithConsumerGroupProtocol() {
testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "RoundRobinAssignor");
testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
}
private void testUnsupportedConfigsWithConsumerGroupProtocol(String configName, Object value) {
final Map<String, Object> configs = Map.of(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass,
ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name(),
configName, value
);
ConfigException exception = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
assertEquals(configName + " cannot be set when " +
ConsumerConfig.GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CONSUMER.name(), exception.getMessage());
}
}

View File

@ -3065,7 +3065,10 @@ public class KafkaConsumerTest {
configs.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, minBytes);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMs);
if (groupProtocol == GroupProtocol.CLASSIC) {
configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMs);
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
}
configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString());
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, fetchSize);
@ -3074,7 +3077,6 @@ public class KafkaConsumerTest {
configs.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
configs.put(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, retryBackoffMaxMs);
configs.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
configs.put(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, throwOnStableOffsetNotSupported);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
groupInstanceId.ifPresent(gi -> configs.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, gi));

View File

@ -1535,18 +1535,6 @@ public class AsyncKafkaConsumerTest {
assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
}
@Test
public void testPartitionAssignmentStrategyUnusedInAsyncConsumer() {
final Properties props = requiredConsumerConfig();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup1");
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "CooperativeStickyAssignor");
final ConsumerConfig config = new ConsumerConfig(props);
consumer = newConsumer(config);
assertTrue(config.unused().contains(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
}
@Test
public void testGroupIdNull() {
final Properties props = requiredConsumerConfig();

View File

@ -17,7 +17,7 @@
package kafka.api
import kafka.utils.TestInfoUtils
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, GroupProtocol}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, PartitionInfo}
@ -85,8 +85,10 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCoordinatorFailover(quorum: String, groupProtocol: String): Unit = {
val listener = new TestConsumerReassignmentListener()
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5001")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5001")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
}
// Use higher poll timeout to avoid consumer leaving the group due to timeout
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "15000")
val consumer = createConsumer()

View File

@ -2386,7 +2386,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
// Increase timeouts to avoid having a rebalance during the test
newConsumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE.toString)
newConsumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT.toString)
if (GroupProtocol.CLASSIC.name.equalsIgnoreCase(groupProtocol)) {
newConsumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT.toString)
}
Using.resource(createConsumer(configOverrides = newConsumerConfig)) { consumer =>
consumer.subscribe(Collections.singletonList(testTopicName))

View File

@ -61,8 +61,10 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest {
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testMaxPollIntervalMs(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString)
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 2000.toString)
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 2000.toString)
}
val consumer = createConsumer()
@ -87,8 +89,10 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest {
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testMaxPollIntervalMsDelayInRevocation(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000.toString)
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000.toString)
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000.toString)
}
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString)
val consumer = createConsumer()
@ -129,8 +133,10 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest {
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testMaxPollIntervalMsDelayInAssignment(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000.toString)
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000.toString)
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000.toString)
}
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString)
val consumer = createConsumer()
@ -154,7 +160,9 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest {
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString)
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
}
val consumer = createConsumer()
val listener = new TestConsumerReassignmentListener

View File

@ -211,8 +211,10 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testUnsubscribeTopic(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
}
val consumer = createConsumer()
val listener = new TestConsumerReassignmentListener()

View File

@ -388,8 +388,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testPauseStateNotPreservedByRebalance(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
}
val consumer = createConsumer()
val producer = createProducer()

View File

@ -337,8 +337,9 @@ public class DeleteConsumerGroupsTest {
configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
if (GroupProtocol.CLASSIC.name.equalsIgnoreCase(groupProtocol)) {
configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
}
configs.putAll(customConfigs);
return configs;
}

View File

@ -251,7 +251,9 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
consumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
// Increase timeouts to avoid having a rebalance during the test
consumerConfig.putIfAbsent(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
consumerConfig.putIfAbsent(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT));
if (groupProtocol == GroupProtocol.CLASSIC) {
consumerConfig.putIfAbsent(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT));
}
return new KafkaConsumer<>(consumerConfig);
}

View File

@ -992,7 +992,6 @@ public class DescribeConsumerGroupTest {
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
configs.putAll(customConfigs);
return configs;

View File

@ -511,7 +511,9 @@ public class ListConsumerGroupTest {
configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
if (GroupProtocol.CLASSIC.name.equalsIgnoreCase(groupProtocol)) {
configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
}
configs.putAll(customConfigs);
return configs;

View File

@ -800,9 +800,11 @@ public class ResetConsumerGroupOffsetTest {
configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol.name);
configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
configs.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
configs.put(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 1000);
if (GroupProtocol.CLASSIC == groupProtocol) {
configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
}
return configs;
}