KAFKA-18767: Add client side config check for shareConsumer (#18850)

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
TaiJuWu 2025-02-18 23:57:56 +08:00 committed by GitHub
parent ed366e6b89
commit 4c8d96c0f0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 151 additions and 11 deletions

View File

@ -403,11 +403,11 @@ public class KafkaShareConsumer<K, V> implements ShareConsumer<K, V> {
public KafkaShareConsumer(Map<String, Object> configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
this(new ShareConsumerConfig(ShareConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
keyDeserializer, valueDeserializer);
}
KafkaShareConsumer(ConsumerConfig config,
KafkaShareConsumer(ShareConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
delegate = CREATOR.create(config, keyDeserializer, valueDeserializer);
@ -416,7 +416,7 @@ public class KafkaShareConsumer<K, V> implements ShareConsumer<K, V> {
KafkaShareConsumer(final LogContext logContext,
final String clientId,
final String groupId,
final ConsumerConfig config,
final ShareConsumerConfig config,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final Time time,

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.config.ConfigException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* The share consumer configuration keys
*/
public class ShareConsumerConfig extends ConsumerConfig {
/**
* A list of configuration keys not supported for SHARE consumer.
*/
private static final List<String> SHARE_GROUP_UNSUPPORTED_CONFIGS = List.of(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
ConsumerConfig.GROUP_PROTOCOL_CONFIG,
ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG
);
public ShareConsumerConfig(Properties props) {
super(props);
}
public ShareConsumerConfig(Map<String, Object> props) {
super(props);
}
protected ShareConsumerConfig(Map<?, ?> props, boolean doLog) {
super(props, doLog);
}
@Override
protected Map<String, Object> preProcessParsedConfig(final Map<String, Object> parsedValues) {
checkUnsupportedConfigs(parsedValues);
return parsedValues;
}
private void checkUnsupportedConfigs(Map<String, Object> parsedValues) {
List<String> invalidConfigs = new ArrayList<>();
SHARE_GROUP_UNSUPPORTED_CONFIGS.forEach(configName -> {
if (parsedValues.containsKey(configName)) {
invalidConfigs.add(configName);
}
});
if (!invalidConfigs.isEmpty()) {
throw new ConfigException(String.join(", ", invalidConfigs) +
" cannot be set when using a share group.");
}
}
}

View File

@ -110,7 +110,7 @@ public class AbstractConfig {
@SuppressWarnings({"this-escape"})
public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
Map<String, Object> originalMap = Utils.castToStringObjectMap(originals);
preProcessParsedConfig(originalMap);
this.originals = resolveConfigVariables(configProviderProps, originalMap);
this.values = definition.parse(this.originals);
Map<String, Object> configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values));
@ -147,6 +147,17 @@ public class AbstractConfig {
}
/**
* Called directly after user configs got parsed (and thus default values is not set).
* This allows to check user's config.
*
* @param parsedValues unmodifiable map of current configuration
* @return a map of updates that should be applied to the configuration (will be validated to prevent bad updates)
*/
protected Map<String, Object> preProcessParsedConfig(Map<String, Object> parsedValues) {
return Map.of();
}
/**
* Called directly after user configs got parsed (and thus default values got set).
* This allows to change default values for "secondary defaults" if required.

View File

@ -320,7 +320,7 @@ public class KafkaShareConsumerMetricsTest {
Deserializer<String> keyDeserializer = new StringDeserializer();
Deserializer<String> valueDeserializer = valueDeserializerOpt.orElse(new StringDeserializer());
LogContext logContext = new LogContext();
ConsumerConfig config = newConsumerConfig(groupId, valueDeserializer);
ShareConsumerConfig config = newConsumerConfig(groupId, valueDeserializer);
return new KafkaShareConsumer<>(
logContext,
clientId,
@ -335,7 +335,7 @@ public class KafkaShareConsumerMetricsTest {
);
}
private ConsumerConfig newConsumerConfig(String groupId,
private ShareConsumerConfig newConsumerConfig(String groupId,
Deserializer<String> valueDeserializer) {
String clientId = "mock-consumer";
long retryBackoffMs = 100;
@ -368,7 +368,7 @@ public class KafkaShareConsumerMetricsTest {
configs.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
return new ConsumerConfig(configs);
return new ShareConsumerConfig(configs);
}
private void initMetadata(MockClient mockClient, Map<String, Integer> partitionCounts) {
Map<String, Uuid> metadataIds = new HashMap<>();

View File

@ -279,7 +279,7 @@ public class KafkaShareConsumerTest {
LogContext logContext = new LogContext();
Deserializer<String> keyDeserializer = new StringDeserializer();
Deserializer<String> valueDeserializer = new StringDeserializer();
ConsumerConfig config = newConsumerConfig(clientId);
ShareConsumerConfig config = newConsumerConfig(clientId);
return new KafkaShareConsumer<>(
logContext,
@ -295,14 +295,14 @@ public class KafkaShareConsumerTest {
);
}
private ConsumerConfig newConsumerConfig(String clientId) {
private ShareConsumerConfig newConsumerConfig(String clientId) {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize);
return new ConsumerConfig(configs);
return new ShareConsumerConfig(configs);
}
private void initMetadata(MockClient client, Map<String, Integer> partitions) {

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ShareConsumerConfigTest {
@Test
public void testUnsupportedShareConsumerConfigs() {
verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));
verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "1"));
verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"));
verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor"));
verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.clients.consumer.ConsumerInterceptor"));
verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3000"));
verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"));
verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic"));
verifyUnsupportedShareConsumerConfig(Map.of(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "null"));
}
private void verifyUnsupportedShareConsumerConfig(Map<String, Object> extraConfig) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.putAll(extraConfig);
assertThrows(ConfigException.class, () -> new ShareConsumerConfig(props));
}
}

View File

@ -199,7 +199,6 @@ public interface ClusterInstance {
Map<String, Object> props = new HashMap<>(configs);
props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + TestUtils.randomString(5));
props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
return new KafkaShareConsumer<>(setClientSaslConfig(props));