From 6acc220e03a3d38d1ceb26926f072db56809ef1b Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Thu, 18 Jul 2024 18:31:36 +0800 Subject: [PATCH] KAFKA-15773 Group protocol configuration should be validated (#16543) Reviewers: Chia-Ping Tsai --- .../apache/kafka/clients/consumer/ConsumerConfig.java | 7 +++++++ .../kafka/clients/consumer/ConsumerConfigTest.java | 11 +++++++++++ .../consumer/internals/AsyncKafkaConsumerTest.java | 9 ++++----- .../consumer/group/DescribeConsumerGroupTest.java | 2 +- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index c4c10c404b4..c3a0bf57900 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -674,6 +674,7 @@ public class ConsumerConfig extends AbstractConfig { Map refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues); maybeOverrideClientId(refinedConfigs); maybeOverrideEnableAutoCommit(refinedConfigs); + checkGroupRemoteAssignor(); return refinedConfigs; } @@ -720,6 +721,12 @@ public class ConsumerConfig extends AbstractConfig { } } + private void checkGroupRemoteAssignor() { + if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CLASSIC.name()) && getString(GROUP_REMOTE_ASSIGNOR_CONFIG) != null && !getString(GROUP_REMOTE_ASSIGNOR_CONFIG).isEmpty()) { + throw new ConfigException(GROUP_REMOTE_ASSIGNOR_CONFIG + " cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CLASSIC.name()); + } + } + public ConsumerConfig(Properties props) { super(CONFIG, props); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java index e4cc699484e..99c45f05c15 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java @@ -193,6 +193,17 @@ public class ConsumerConfigTest { assertEquals(remoteAssignorName, consumerConfig.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); } + @Test + public void testRemoteAssignorWithClassicGroupProtocol() { + String remoteAssignorName = "SomeAssignor"; + final Map configs = new HashMap<>(); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); + configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); + configs.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, remoteAssignorName); + ConfigException exception = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs)); + assertTrue(exception.getMessage().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG + " cannot be set when " + ConsumerConfig.GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CLASSIC.name())); + } + @Test public void testDefaultMetadataRecoveryStrategy() { Map configs = new HashMap<>(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 7ef0b28108e..f1d227fae4e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -55,6 +55,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InterruptException; @@ -1758,6 +1759,7 @@ public class AsyncKafkaConsumerTest { public void testGroupRemoteAssignorUnusedIfGroupIdUndefined() { final Properties props = requiredConsumerConfig(); props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor"); + props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)); final ConsumerConfig config = new ConsumerConfig(props); consumer = newConsumer(config); @@ -1765,15 +1767,12 @@ public class AsyncKafkaConsumerTest { } @Test - public void testGroupRemoteAssignorUnusedInGenericProtocol() { + public void testGroupRemoteAssignorInClassicProtocol() { final Properties props = requiredConsumerConfig(); props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA"); props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)); props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor"); - final ConsumerConfig config = new ConsumerConfig(props); - consumer = newConsumer(config); - - assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); + assertThrows(ConfigException.class, () -> new ConsumerConfig(props)); } @Test diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java index 852a6434a1c..9883dc471eb 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java @@ -338,7 +338,7 @@ public class DescribeConsumerGroupTest { createTopic(topic); // run one consumer in the group consuming from a single-partition topic - try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.singletonMap(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "range")); + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.singletonMap(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, groupProtocol == GroupProtocol.CONSUMER ? "range" : "")); ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) ) { TestUtils.waitForCondition(() -> {