mirror of https://github.com/apache/kafka.git
MINOR: Accept specifying consumer group assignors by their short names (#18832)
At the moment, we require specifying builtin server side assignors by their full class name. This is not convenient and also exposed their full class name as part of our public API. This patch changes it to accept specifying builtin server side assignor by their short name (uniform or range) while continuing to accept full class name for customer assignors. Reviewers: Jeff Kim <jeff.kim@confluent.io>
This commit is contained in:
parent
8b22f10083
commit
be89ce5f61
|
@ -16,6 +16,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.coordinator.group;
|
package org.apache.kafka.coordinator.group;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.Configurable;
|
||||||
|
import org.apache.kafka.common.KafkaException;
|
||||||
import org.apache.kafka.common.config.AbstractConfig;
|
import org.apache.kafka.common.config.AbstractConfig;
|
||||||
import org.apache.kafka.common.config.ConfigDef;
|
import org.apache.kafka.common.config.ConfigDef;
|
||||||
import org.apache.kafka.common.record.CompressionType;
|
import org.apache.kafka.common.record.CompressionType;
|
||||||
|
@ -25,11 +27,14 @@ import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
|
||||||
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
|
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
|
||||||
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
|
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
|
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
|
||||||
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
|
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
|
||||||
|
@ -41,6 +46,7 @@ import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
|
||||||
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
|
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
|
||||||
import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
|
import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
|
||||||
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
|
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
|
||||||
|
import static org.apache.kafka.common.utils.Utils.maybeCloseQuietly;
|
||||||
import static org.apache.kafka.common.utils.Utils.require;
|
import static org.apache.kafka.common.utils.Utils.require;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -120,12 +126,19 @@ public class GroupCoordinatorConfig {
|
||||||
"group protocol use " + GROUP_MAX_SIZE_CONFIG + " " + "instead.";
|
"group protocol use " + GROUP_MAX_SIZE_CONFIG + " " + "instead.";
|
||||||
public static final int CONSUMER_GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
|
public static final int CONSUMER_GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
|
||||||
|
|
||||||
public static final String CONSUMER_GROUP_ASSIGNORS_CONFIG = "group.consumer.assignors";
|
|
||||||
public static final String CONSUMER_GROUP_ASSIGNORS_DOC = "The server side assignors as a list of full class names. The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor.";
|
private static final List<ConsumerGroupPartitionAssignor> CONSUMER_GROUP_BUILTIN_ASSIGNORS = List.of(
|
||||||
public static final List<String> CONSUMER_GROUP_ASSIGNORS_DEFAULT = List.of(
|
new UniformAssignor(),
|
||||||
UniformAssignor.class.getName(),
|
new RangeAssignor()
|
||||||
RangeAssignor.class.getName()
|
|
||||||
);
|
);
|
||||||
|
public static final String CONSUMER_GROUP_ASSIGNORS_CONFIG = "group.consumer.assignors";
|
||||||
|
public static final String CONSUMER_GROUP_ASSIGNORS_DOC = "The server side assignors as a list of either names for builtin assignors or full class names for customer assignors. " +
|
||||||
|
"The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor. " +
|
||||||
|
"The supported builtin assignors are: " + CONSUMER_GROUP_BUILTIN_ASSIGNORS.stream().map(ConsumerGroupPartitionAssignor::name).collect(Collectors.joining(", ")) + ".";
|
||||||
|
public static final List<String> CONSUMER_GROUP_ASSIGNORS_DEFAULT = CONSUMER_GROUP_BUILTIN_ASSIGNORS
|
||||||
|
.stream()
|
||||||
|
.map(ConsumerGroupPartitionAssignor::name)
|
||||||
|
.toList();
|
||||||
|
|
||||||
public static final String CONSUMER_GROUP_MIGRATION_POLICY_CONFIG = "group.consumer.migration.policy";
|
public static final String CONSUMER_GROUP_MIGRATION_POLICY_CONFIG = "group.consumer.migration.policy";
|
||||||
public static final String CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT = ConsumerGroupMigrationPolicy.BIDIRECTIONAL.toString();
|
public static final String CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT = ConsumerGroupMigrationPolicy.BIDIRECTIONAL.toString();
|
||||||
|
@ -390,12 +403,51 @@ public class GroupCoordinatorConfig {
|
||||||
protected List<ConsumerGroupPartitionAssignor> consumerGroupAssignors(
|
protected List<ConsumerGroupPartitionAssignor> consumerGroupAssignors(
|
||||||
AbstractConfig config
|
AbstractConfig config
|
||||||
) {
|
) {
|
||||||
return Collections.unmodifiableList(
|
Map<String, ConsumerGroupPartitionAssignor> defaultAssignors = CONSUMER_GROUP_BUILTIN_ASSIGNORS
|
||||||
config.getConfiguredInstances(
|
.stream()
|
||||||
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
|
.collect(Collectors.toMap(ConsumerGroupPartitionAssignor::name, Function.identity()));
|
||||||
ConsumerGroupPartitionAssignor.class
|
|
||||||
)
|
List<ConsumerGroupPartitionAssignor> assignors = new ArrayList<>();
|
||||||
);
|
|
||||||
|
try {
|
||||||
|
for (Object object : config.getList(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG)) {
|
||||||
|
ConsumerGroupPartitionAssignor assignor;
|
||||||
|
|
||||||
|
if (object instanceof String klass) {
|
||||||
|
assignor = defaultAssignors.get(klass);
|
||||||
|
if (assignor == null) {
|
||||||
|
try {
|
||||||
|
assignor = Utils.newInstance(klass, ConsumerGroupPartitionAssignor.class);
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
throw new KafkaException("Class " + klass + " cannot be found", e);
|
||||||
|
} catch (ClassCastException e) {
|
||||||
|
throw new KafkaException(klass + " is not an instance of " + ConsumerGroupPartitionAssignor.class.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (object instanceof Class<?> klass) {
|
||||||
|
Object o = Utils.newInstance((Class<?>) klass);
|
||||||
|
if (!ConsumerGroupPartitionAssignor.class.isInstance(o)) {
|
||||||
|
throw new KafkaException(klass + " is not an instance of " + ConsumerGroupPartitionAssignor.class.getName());
|
||||||
|
}
|
||||||
|
assignor = (ConsumerGroupPartitionAssignor) o;
|
||||||
|
} else {
|
||||||
|
throw new KafkaException("Unexpected element of type " + object.getClass().getName() + ", expected String or Class");
|
||||||
|
}
|
||||||
|
|
||||||
|
assignors.add(assignor);
|
||||||
|
|
||||||
|
if (assignor instanceof Configurable configurable) {
|
||||||
|
configurable.configure(config.originals());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
for (ConsumerGroupPartitionAssignor assignor : assignors) {
|
||||||
|
maybeCloseQuietly(assignor, "AutoCloseable object constructed and configured during failed call to consumerGroupAssignors");
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
return assignors;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -84,11 +84,11 @@ import java.util.Set;
|
||||||
* <p>
|
* <p>
|
||||||
*/
|
*/
|
||||||
public class RangeAssignor implements ConsumerGroupPartitionAssignor {
|
public class RangeAssignor implements ConsumerGroupPartitionAssignor {
|
||||||
public static final String RANGE_ASSIGNOR_NAME = "range";
|
public static final String NAME = "range";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String name() {
|
public String name() {
|
||||||
return RANGE_ASSIGNOR_NAME;
|
return NAME;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -52,11 +52,11 @@ import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.H
|
||||||
*/
|
*/
|
||||||
public class UniformAssignor implements ConsumerGroupPartitionAssignor {
|
public class UniformAssignor implements ConsumerGroupPartitionAssignor {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(UniformAssignor.class);
|
private static final Logger LOG = LoggerFactory.getLogger(UniformAssignor.class);
|
||||||
public static final String UNIFORM_ASSIGNOR_NAME = "uniform";
|
public static final String NAME = "uniform";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String name() {
|
public String name() {
|
||||||
return UNIFORM_ASSIGNOR_NAME;
|
return NAME;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -16,45 +16,124 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.coordinator.group;
|
package org.apache.kafka.coordinator.group;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.Configurable;
|
||||||
import org.apache.kafka.common.KafkaException;
|
import org.apache.kafka.common.KafkaException;
|
||||||
import org.apache.kafka.common.config.AbstractConfig;
|
import org.apache.kafka.common.config.AbstractConfig;
|
||||||
import org.apache.kafka.common.config.ConfigDef;
|
import org.apache.kafka.common.config.ConfigDef;
|
||||||
import org.apache.kafka.common.config.ConfigException;
|
import org.apache.kafka.common.config.ConfigException;
|
||||||
import org.apache.kafka.common.record.CompressionType;
|
import org.apache.kafka.common.record.CompressionType;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
|
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
|
||||||
|
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
|
||||||
|
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
|
||||||
|
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
|
||||||
|
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
|
||||||
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
|
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
|
||||||
|
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class GroupCoordinatorConfigTest {
|
public class GroupCoordinatorConfigTest {
|
||||||
private static final List<ConfigDef> GROUP_COORDINATOR_CONFIG_DEFS = Arrays.asList(
|
private static final List<ConfigDef> GROUP_COORDINATOR_CONFIG_DEFS = List.of(
|
||||||
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
|
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
|
||||||
GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
|
GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
|
||||||
GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
|
GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
|
||||||
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
|
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
|
||||||
GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF);
|
GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF
|
||||||
|
);
|
||||||
|
|
||||||
|
public static class CustomAssignor implements ConsumerGroupPartitionAssignor, Configurable {
|
||||||
|
public Map<String, ?> configs;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(Map<String, ?> configs) {
|
||||||
|
this.configs = configs;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String name() {
|
||||||
|
return "CustomAssignor";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GroupAssignment assign(
|
||||||
|
GroupSpec groupSpec,
|
||||||
|
SubscribedTopicDescriber subscribedTopicDescriber
|
||||||
|
) throws PartitionAssignorException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConsumerGroupAssignorsDefault() {
|
public void testConsumerGroupAssignorFullClassNames() {
|
||||||
// The full class name of the assignors is part of our public api. Hence,
|
// The full class name of the assignors is part of our public api. Hence,
|
||||||
// we should ensure that they are not changed by mistake.
|
// we should ensure that they are not changed by mistake.
|
||||||
assertEquals(
|
assertEquals(
|
||||||
List.of(
|
"org.apache.kafka.coordinator.group.assignor.UniformAssignor",
|
||||||
"org.apache.kafka.coordinator.group.assignor.UniformAssignor",
|
UniformAssignor.class.getName()
|
||||||
"org.apache.kafka.coordinator.group.assignor.RangeAssignor"
|
|
||||||
),
|
|
||||||
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_DEFAULT
|
|
||||||
);
|
);
|
||||||
|
assertEquals(
|
||||||
|
"org.apache.kafka.coordinator.group.assignor.RangeAssignor",
|
||||||
|
RangeAssignor.class.getName()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConsumerGroupAssignors() {
|
||||||
|
Map<String, Object> configs = new HashMap<>();
|
||||||
|
GroupCoordinatorConfig config;
|
||||||
|
List<ConsumerGroupPartitionAssignor> assignors;
|
||||||
|
|
||||||
|
// Test short names.
|
||||||
|
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, "range, uniform");
|
||||||
|
config = createConfig(configs);
|
||||||
|
assignors = config.consumerGroupAssignors();
|
||||||
|
assertEquals(2, assignors.size());
|
||||||
|
assertTrue(assignors.get(0) instanceof RangeAssignor);
|
||||||
|
assertTrue(assignors.get(1) instanceof UniformAssignor);
|
||||||
|
|
||||||
|
// Test custom assignor.
|
||||||
|
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, CustomAssignor.class.getName());
|
||||||
|
config = createConfig(configs);
|
||||||
|
assignors = config.consumerGroupAssignors();
|
||||||
|
assertEquals(1, assignors.size());
|
||||||
|
assertTrue(assignors.get(0) instanceof CustomAssignor);
|
||||||
|
assertNotNull(((CustomAssignor) assignors.get(0)).configs);
|
||||||
|
|
||||||
|
// Test with classes.
|
||||||
|
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(RangeAssignor.class, CustomAssignor.class));
|
||||||
|
config = createConfig(configs);
|
||||||
|
assignors = config.consumerGroupAssignors();
|
||||||
|
assertEquals(2, assignors.size());
|
||||||
|
assertTrue(assignors.get(0) instanceof RangeAssignor);
|
||||||
|
assertTrue(assignors.get(1) instanceof CustomAssignor);
|
||||||
|
|
||||||
|
// Test combination of short name and class.
|
||||||
|
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, "uniform, " + CustomAssignor.class.getName());
|
||||||
|
config = createConfig(configs);
|
||||||
|
assignors = config.consumerGroupAssignors();
|
||||||
|
assertEquals(2, assignors.size());
|
||||||
|
assertTrue(assignors.get(0) instanceof UniformAssignor);
|
||||||
|
assertTrue(assignors.get(1) instanceof CustomAssignor);
|
||||||
|
|
||||||
|
// Test combination of short name and class.
|
||||||
|
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of("uniform", CustomAssignor.class.getName()));
|
||||||
|
config = createConfig(configs);
|
||||||
|
assignors = config.consumerGroupAssignors();
|
||||||
|
assertEquals(2, assignors.size());
|
||||||
|
assertTrue(assignors.get(0) instanceof UniformAssignor);
|
||||||
|
assertTrue(assignors.get(1) instanceof CustomAssignor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -92,7 +171,7 @@ public class GroupCoordinatorConfigTest {
|
||||||
assertEquals(200, config.consumerGroupHeartbeatIntervalMs());
|
assertEquals(200, config.consumerGroupHeartbeatIntervalMs());
|
||||||
assertEquals(55, config.consumerGroupMaxSize());
|
assertEquals(55, config.consumerGroupMaxSize());
|
||||||
assertEquals(1, config.consumerGroupAssignors().size());
|
assertEquals(1, config.consumerGroupAssignors().size());
|
||||||
assertEquals(RangeAssignor.RANGE_ASSIGNOR_NAME, config.consumerGroupAssignors().get(0).name());
|
assertEquals(RangeAssignor.NAME, config.consumerGroupAssignors().get(0).name());
|
||||||
assertEquals(2222, config.offsetsTopicSegmentBytes());
|
assertEquals(2222, config.offsetsTopicSegmentBytes());
|
||||||
assertEquals(3333, config.offsetMetadataMaxSize());
|
assertEquals(3333, config.offsetMetadataMaxSize());
|
||||||
assertEquals(60, config.classicGroupMaxSize());
|
assertEquals(60, config.classicGroupMaxSize());
|
||||||
|
@ -168,6 +247,16 @@ public class GroupCoordinatorConfigTest {
|
||||||
assertEquals("class java.lang.Object is not an instance of org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor",
|
assertEquals("class java.lang.Object is not an instance of org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor",
|
||||||
assertThrows(KafkaException.class, () -> createConfig(configs)).getMessage());
|
assertThrows(KafkaException.class, () -> createConfig(configs)).getMessage());
|
||||||
|
|
||||||
|
configs.clear();
|
||||||
|
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Object.class.getName());
|
||||||
|
assertEquals("java.lang.Object is not an instance of org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor",
|
||||||
|
assertThrows(KafkaException.class, () -> createConfig(configs)).getMessage());
|
||||||
|
|
||||||
|
configs.clear();
|
||||||
|
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, "foo");
|
||||||
|
assertEquals("Class foo cannot be found",
|
||||||
|
assertThrows(KafkaException.class, () -> createConfig(configs)).getMessage());
|
||||||
|
|
||||||
configs.clear();
|
configs.clear();
|
||||||
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, "foobar");
|
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, "foobar");
|
||||||
assertEquals("Invalid value foobar for configuration group.consumer.migration.policy: String must be one of (case insensitive): DISABLED, DOWNGRADE, UPGRADE, BIDIRECTIONAL",
|
assertEquals("Invalid value foobar for configuration group.consumer.migration.policy: String must be one of (case insensitive): DISABLED, DOWNGRADE, UPGRADE, BIDIRECTIONAL",
|
||||||
|
|
Loading…
Reference in New Issue