From 4c8d96c0f01ac34d9c8370c47f4ce44f680ce4a5 Mon Sep 17 00:00:00 2001 From: TaiJuWu Date: Tue, 18 Feb 2025 23:57:56 +0800 Subject: [PATCH] KAFKA-18767: Add client side config check for shareConsumer (#18850) Reviewers: Andrew Schofield --- .../clients/consumer/KafkaShareConsumer.java | 6 +- .../clients/consumer/ShareConsumerConfig.java | 77 +++++++++++++++++++ .../kafka/common/config/AbstractConfig.java | 13 +++- .../KafkaShareConsumerMetricsTest.java | 6 +- .../consumer/KafkaShareConsumerTest.java | 6 +- .../consumer/ShareConsumerConfigTest.java | 53 +++++++++++++ .../kafka/common/test/ClusterInstance.java | 1 - 7 files changed, 151 insertions(+), 11 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumerConfig.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerConfigTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java index 19036875d11..a5c92394353 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java @@ -403,11 +403,11 @@ public class KafkaShareConsumer implements ShareConsumer { public KafkaShareConsumer(Map configs, Deserializer keyDeserializer, Deserializer valueDeserializer) { - this(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)), + this(new ShareConsumerConfig(ShareConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer); } - KafkaShareConsumer(ConsumerConfig config, + KafkaShareConsumer(ShareConsumerConfig config, Deserializer keyDeserializer, Deserializer valueDeserializer) { delegate = CREATOR.create(config, keyDeserializer, valueDeserializer); @@ -416,7 +416,7 @@ public class KafkaShareConsumer implements ShareConsumer { KafkaShareConsumer(final LogContext logContext, final String clientId, final String groupId, - final ConsumerConfig config, + final ShareConsumerConfig config, final Deserializer keyDeserializer, final Deserializer valueDeserializer, final Time time, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumerConfig.java new file mode 100644 index 00000000000..ce22d7b0a2e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumerConfig.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.config.ConfigException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * The share consumer configuration keys + */ +public class ShareConsumerConfig extends ConsumerConfig { + /** + * A list of configuration keys not supported for SHARE consumer. + */ + private static final List SHARE_GROUP_UNSUPPORTED_CONFIGS = List.of( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, + ConsumerConfig.ISOLATION_LEVEL_CONFIG, + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, + ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, + ConsumerConfig.GROUP_PROTOCOL_CONFIG, + ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG + ); + + public ShareConsumerConfig(Properties props) { + super(props); + } + + public ShareConsumerConfig(Map props) { + super(props); + } + + protected ShareConsumerConfig(Map props, boolean doLog) { + super(props, doLog); + } + + @Override + protected Map preProcessParsedConfig(final Map parsedValues) { + checkUnsupportedConfigs(parsedValues); + return parsedValues; + } + + private void checkUnsupportedConfigs(Map parsedValues) { + List invalidConfigs = new ArrayList<>(); + SHARE_GROUP_UNSUPPORTED_CONFIGS.forEach(configName -> { + if (parsedValues.containsKey(configName)) { + invalidConfigs.add(configName); + } + }); + if (!invalidConfigs.isEmpty()) { + throw new ConfigException(String.join(", ", invalidConfigs) + + " cannot be set when using a share group."); + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 3710ab2811c..8270f22b65c 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -110,7 +110,7 @@ public class AbstractConfig { @SuppressWarnings({"this-escape"}) public AbstractConfig(ConfigDef definition, Map originals, Map configProviderProps, boolean doLog) { Map originalMap = Utils.castToStringObjectMap(originals); - + preProcessParsedConfig(originalMap); this.originals = resolveConfigVariables(configProviderProps, originalMap); this.values = definition.parse(this.originals); Map configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values)); @@ -147,6 +147,17 @@ public class AbstractConfig { } + /** + * Called directly after user configs got parsed (and thus default values is not set). + * This allows to check user's config. + * + * @param parsedValues unmodifiable map of current configuration + * @return a map of updates that should be applied to the configuration (will be validated to prevent bad updates) + */ + protected Map preProcessParsedConfig(Map parsedValues) { + return Map.of(); + } + /** * Called directly after user configs got parsed (and thus default values got set). * This allows to change default values for "secondary defaults" if required. diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java index ae7f1774bfc..ad46bf0887e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java @@ -320,7 +320,7 @@ public class KafkaShareConsumerMetricsTest { Deserializer keyDeserializer = new StringDeserializer(); Deserializer valueDeserializer = valueDeserializerOpt.orElse(new StringDeserializer()); LogContext logContext = new LogContext(); - ConsumerConfig config = newConsumerConfig(groupId, valueDeserializer); + ShareConsumerConfig config = newConsumerConfig(groupId, valueDeserializer); return new KafkaShareConsumer<>( logContext, clientId, @@ -335,7 +335,7 @@ public class KafkaShareConsumerMetricsTest { ); } - private ConsumerConfig newConsumerConfig(String groupId, + private ShareConsumerConfig newConsumerConfig(String groupId, Deserializer valueDeserializer) { String clientId = "mock-consumer"; long retryBackoffMs = 100; @@ -368,7 +368,7 @@ public class KafkaShareConsumerMetricsTest { configs.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass()); - return new ConsumerConfig(configs); + return new ShareConsumerConfig(configs); } private void initMetadata(MockClient mockClient, Map partitionCounts) { Map metadataIds = new HashMap<>(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java index db869594eb5..c7c05acf9b5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java @@ -279,7 +279,7 @@ public class KafkaShareConsumerTest { LogContext logContext = new LogContext(); Deserializer keyDeserializer = new StringDeserializer(); Deserializer valueDeserializer = new StringDeserializer(); - ConsumerConfig config = newConsumerConfig(clientId); + ShareConsumerConfig config = newConsumerConfig(clientId); return new KafkaShareConsumer<>( logContext, @@ -295,14 +295,14 @@ public class KafkaShareConsumerTest { ); } - private ConsumerConfig newConsumerConfig(String clientId) { + private ShareConsumerConfig newConsumerConfig(String clientId) { Map configs = new HashMap<>(); configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize); - return new ConsumerConfig(configs); + return new ShareConsumerConfig(configs); } private void initMetadata(MockClient client, Map partitions) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerConfigTest.java new file mode 100644 index 00000000000..8f4c4b54556 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerConfigTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.serialization.StringDeserializer; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ShareConsumerConfigTest { + + @Test + public void testUnsupportedShareConsumerConfigs() { + verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")); + verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")); + verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "1")); + verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")); + verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor")); + verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.clients.consumer.ConsumerInterceptor")); + verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3000")); + verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000")); + verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic")); + verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "null")); + } + + private void verifyUnsupportedShareConsumerConfig(Map extraConfig) { + Map props = new HashMap<>(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "1"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.putAll(extraConfig); + assertThrows(ConfigException.class, () -> new ShareConsumerConfig(props)); + } +} \ No newline at end of file diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index 50cb9d7e3d8..26b4e1e471a 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -199,7 +199,6 @@ public interface ClusterInstance { Map props = new HashMap<>(configs); props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + TestUtils.randomString(5)); props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); return new KafkaShareConsumer<>(setClientSaslConfig(props));