KAFKA-15773 Group protocol configuration should be validated (#16543)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2024-07-18 18:31:36 +08:00 committed by GitHub
parent 94f5a4f63e
commit 6acc220e03
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 23 additions and 6 deletions

View File

@ -674,6 +674,7 @@ public class ConsumerConfig extends AbstractConfig {
Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues); Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
maybeOverrideClientId(refinedConfigs); maybeOverrideClientId(refinedConfigs);
maybeOverrideEnableAutoCommit(refinedConfigs); maybeOverrideEnableAutoCommit(refinedConfigs);
checkGroupRemoteAssignor();
return refinedConfigs; return refinedConfigs;
} }
@ -720,6 +721,12 @@ public class ConsumerConfig extends AbstractConfig {
} }
} }
private void checkGroupRemoteAssignor() {
if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CLASSIC.name()) && getString(GROUP_REMOTE_ASSIGNOR_CONFIG) != null && !getString(GROUP_REMOTE_ASSIGNOR_CONFIG).isEmpty()) {
throw new ConfigException(GROUP_REMOTE_ASSIGNOR_CONFIG + " cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CLASSIC.name());
}
}
public ConsumerConfig(Properties props) { public ConsumerConfig(Properties props) {
super(CONFIG, props); super(CONFIG, props);
} }

View File

@ -193,6 +193,17 @@ public class ConsumerConfigTest {
assertEquals(remoteAssignorName, consumerConfig.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); assertEquals(remoteAssignorName, consumerConfig.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
} }
@Test
public void testRemoteAssignorWithClassicGroupProtocol() {
String remoteAssignorName = "SomeAssignor";
final Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
configs.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, remoteAssignorName);
ConfigException exception = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
assertTrue(exception.getMessage().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG + " cannot be set when " + ConsumerConfig.GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CLASSIC.name()));
}
@Test @Test
public void testDefaultMetadataRecoveryStrategy() { public void testDefaultMetadataRecoveryStrategy() {
Map<String, Object> configs = new HashMap<>(); Map<String, Object> configs = new HashMap<>();

View File

@ -55,6 +55,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric; import org.apache.kafka.common.Metric;
import org.apache.kafka.common.Node; import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InterruptException;
@ -1758,6 +1759,7 @@ public class AsyncKafkaConsumerTest {
public void testGroupRemoteAssignorUnusedIfGroupIdUndefined() { public void testGroupRemoteAssignorUnusedIfGroupIdUndefined() {
final Properties props = requiredConsumerConfig(); final Properties props = requiredConsumerConfig();
props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor"); props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
final ConsumerConfig config = new ConsumerConfig(props); final ConsumerConfig config = new ConsumerConfig(props);
consumer = newConsumer(config); consumer = newConsumer(config);
@ -1765,15 +1767,12 @@ public class AsyncKafkaConsumerTest {
} }
@Test @Test
public void testGroupRemoteAssignorUnusedInGenericProtocol() { public void testGroupRemoteAssignorInClassicProtocol() {
final Properties props = requiredConsumerConfig(); final Properties props = requiredConsumerConfig();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)); props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT));
props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor"); props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
final ConsumerConfig config = new ConsumerConfig(props); assertThrows(ConfigException.class, () -> new ConsumerConfig(props));
consumer = newConsumer(config);
assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
} }
@Test @Test

View File

@ -338,7 +338,7 @@ public class DescribeConsumerGroupTest {
createTopic(topic); createTopic(topic);
// run one consumer in the group consuming from a single-partition topic // run one consumer in the group consuming from a single-partition topic
try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.singletonMap(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "range")); try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.singletonMap(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, groupProtocol == GroupProtocol.CONSUMER ? "range" : ""));
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})
) { ) {
TestUtils.waitForCondition(() -> { TestUtils.waitForCondition(() -> {