KAFKA-19369: Add group.share.assignors config and integration test (#19900)
CI / build (push) Waiting to run Details

* Add `group.share.assignors` config to `GroupCoordinatorConfig`.
* Send `rackId` in share group heartbeat request if it's not null.
* Add integration test `testShareConsumerWithRackAwareAssignor`.

Reviewers: Lan Ding <53332773+DL1231@users.noreply.github.com>, Andrew
 Schofield <aschofield@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
This commit is contained in:
PoAn Yang 2025-06-06 21:20:56 +08:00 committed by GitHub
parent e0adec5549
commit 844b0e651b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 295 additions and 3 deletions

View File

@ -22,6 +22,7 @@ import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
@ -36,7 +37,7 @@ import java.util.Set;
* information of the members when assigning partitions to them.
* It needs all brokers and members to have rack information available.
*/
public class RackAwareAssignor implements ConsumerGroupPartitionAssignor {
public class RackAwareAssignor implements ConsumerGroupPartitionAssignor, ShareGroupPartitionAssignor {
@Override
public String name() {
return "rack-aware-assignor";

View File

@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
public class ShareConsumerRackAwareTest {
@ClusterTest(
types = {Type.KRAFT},
brokers = 3,
serverProperties = {
@ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"),
@ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack1"),
@ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack2"),
@ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic, share"),
@ClusterConfigProperty(key = GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG, value = "org.apache.kafka.clients.consumer.RackAwareAssignor")
}
)
void testShareConsumerWithRackAwareAssignor(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
String groupId = "group0";
String topic = "test-topic";
try (Admin admin = clusterInstance.admin();
Producer<byte[], byte[]> producer = clusterInstance.producer();
ShareConsumer<byte[], byte[]> consumer0 = clusterInstance.shareConsumer(Map.of(
CommonClientConfigs.GROUP_ID_CONFIG, groupId,
CommonClientConfigs.CLIENT_ID_CONFIG, "client0",
CommonClientConfigs.CLIENT_RACK_CONFIG, "rack0"
));
ShareConsumer<byte[], byte[]> consumer1 = clusterInstance.shareConsumer(Map.of(
CommonClientConfigs.GROUP_ID_CONFIG, groupId,
CommonClientConfigs.CLIENT_ID_CONFIG, "client1",
CommonClientConfigs.CLIENT_RACK_CONFIG, "rack1"
));
ShareConsumer<byte[], byte[]> consumer2 = clusterInstance.shareConsumer(Map.of(
CommonClientConfigs.GROUP_ID_CONFIG, groupId,
CommonClientConfigs.CLIENT_ID_CONFIG, "client2",
CommonClientConfigs.CLIENT_RACK_CONFIG, "rack2"
))
) {
// Create a new topic with 1 partition on broker 0.
admin.createTopics(List.of(new NewTopic(topic, Map.of(0, List.of(0)))));
clusterInstance.waitForTopic(topic, 1);
producer.send(new ProducerRecord<>(topic, "key".getBytes(), "value".getBytes()));
producer.flush();
consumer0.subscribe(List.of(topic));
consumer1.subscribe(List.of(topic));
consumer2.subscribe(List.of(topic));
TestUtils.waitForCondition(() -> {
consumer0.poll(Duration.ofMillis(1000));
consumer1.poll(Duration.ofMillis(1000));
consumer2.poll(Duration.ofMillis(1000));
Map<String, ShareGroupDescription> groups = assertDoesNotThrow(() -> admin.describeShareGroups(Set.of("group0")).all().get());
ShareGroupDescription groupDescription = groups.get(groupId);
return isExpectedAssignment(groupDescription, 3, Map.of(
"client0", Set.of(new TopicPartition(topic, 0)),
"client1", Set.of(),
"client2", Set.of()
));
}, "Consumer 0 should be assigned to topic partition 0");
// Add a new partition 1 and 2 to broker 1.
admin.createPartitions(
Map.of(
topic,
NewPartitions.increaseTo(3, List.of(List.of(1), List.of(1)))
)
);
clusterInstance.waitForTopic(topic, 3);
TestUtils.waitForCondition(() -> {
consumer0.poll(Duration.ofMillis(1000));
consumer1.poll(Duration.ofMillis(1000));
consumer2.poll(Duration.ofMillis(1000));
Map<String, ShareGroupDescription> groups = assertDoesNotThrow(() -> admin.describeShareGroups(Set.of("group0")).all().get());
ShareGroupDescription groupDescription = groups.get(groupId);
return isExpectedAssignment(groupDescription, 3, Map.of(
"client0", Set.of(new TopicPartition(topic, 0)),
"client1", Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2)),
"client2", Set.of()
));
}, "Consumer 1 should be assigned to topic partition 1 and 2");
// Add a new partition 3, 4, and 5 to broker 2.
admin.createPartitions(
Map.of(
topic,
NewPartitions.increaseTo(6, List.of(List.of(2), List.of(2), List.of(2)))
)
);
TestUtils.waitForCondition(() -> {
consumer0.poll(Duration.ofMillis(1000));
consumer1.poll(Duration.ofMillis(1000));
consumer2.poll(Duration.ofMillis(1000));
Map<String, ShareGroupDescription> groups = assertDoesNotThrow(() -> admin.describeShareGroups(Set.of("group0")).all().get());
ShareGroupDescription groupDescription = groups.get(groupId);
return isExpectedAssignment(groupDescription, 3, Map.of(
"client0", Set.of(new TopicPartition(topic, 0)),
"client1", Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2)),
"client2", Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4), new TopicPartition(topic, 5))
));
}, "Consumer 2 should be assigned to topic partition 3, 4, and 5");
// Change partitions to different brokers.
// partition 0 -> broker 2
// partition 1 -> broker 2
// partition 2 -> broker 2
// partition 3 -> broker 1
// partition 4 -> broker 1
// partition 5 -> broker 0
admin.alterPartitionReassignments(Map.of(
new TopicPartition(topic, 0), Optional.of(new NewPartitionReassignment(List.of(2))),
new TopicPartition(topic, 1), Optional.of(new NewPartitionReassignment(List.of(2))),
new TopicPartition(topic, 2), Optional.of(new NewPartitionReassignment(List.of(2))),
new TopicPartition(topic, 3), Optional.of(new NewPartitionReassignment(List.of(1))),
new TopicPartition(topic, 4), Optional.of(new NewPartitionReassignment(List.of(1))),
new TopicPartition(topic, 5), Optional.of(new NewPartitionReassignment(List.of(0)))
)).all().get();
TestUtils.waitForCondition(() -> {
consumer0.poll(Duration.ofMillis(1000));
consumer1.poll(Duration.ofMillis(1000));
consumer2.poll(Duration.ofMillis(1000));
Map<String, ShareGroupDescription> groups = assertDoesNotThrow(() -> admin.describeShareGroups(Set.of("group0")).all().get());
ShareGroupDescription groupDescription = groups.get(groupId);
return isExpectedAssignment(groupDescription, 3, Map.of(
"client0", Set.of(new TopicPartition(topic, 5)),
"client1", Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4)),
"client2", Set.of(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2))
));
}, "Consumer with topic partition mapping should be 0 -> 5 | 1 -> 3, 4 | 2 -> 0, 1, 2");
}
}
boolean isExpectedAssignment(
ShareGroupDescription groupDescription,
int memberCount,
Map<String, Set<TopicPartition>> expectedAssignments
) {
return groupDescription != null &&
groupDescription.members().size() == memberCount &&
groupDescription.members().stream().allMatch(
member -> {
String clientId = member.clientId();
Set<TopicPartition> expectedPartitions = expectedAssignments.get(clientId);
return member.assignment().topicPartitions().equals(expectedPartitions);
}
);
}
}

View File

@ -342,7 +342,7 @@ public class RequestManagers implements Closeable {
ShareMembershipManager shareMembershipManager = new ShareMembershipManager(
logContext,
groupRebalanceConfig.groupId,
null,
groupRebalanceConfig.rackId.orElse(null),
subscriptions,
metadata,
time,

View File

@ -23,7 +23,9 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.assignor.SimpleAssignor;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
@ -32,6 +34,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -235,6 +238,13 @@ public class GroupCoordinatorConfig {
public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000;
public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members.";
private static final ShareGroupPartitionAssignor SHARE_GROUP_BUILTIN_ASSIGNOR = new SimpleAssignor();
public static final String SHARE_GROUP_ASSIGNORS_CONFIG = "group.share.assignors";
public static final String SHARE_GROUP_ASSIGNORS_DOC = "The server-side assignors as a list of either names for built-in assignors or full class names for custom assignors. " +
"The list must contain only a single entry which is used by all groups. The supported built-in assignors are: " +
SHARE_GROUP_BUILTIN_ASSIGNOR.name() + ".";
public static final String SHARE_GROUP_ASSIGNORS_DEFAULT = SHARE_GROUP_BUILTIN_ASSIGNOR.name();
///
/// Streams group configs
///
@ -317,6 +327,7 @@ public class GroupCoordinatorConfig {
.define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC)
.define(SHARE_GROUP_ASSIGNORS_CONFIG, LIST, SHARE_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, SHARE_GROUP_ASSIGNORS_DOC)
// Streams group configs
.define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC)
@ -367,6 +378,7 @@ public class GroupCoordinatorConfig {
private final int shareGroupHeartbeatIntervalMs;
private final int shareGroupMinHeartbeatIntervalMs;
private final int shareGroupMaxHeartbeatIntervalMs;
private final List<ShareGroupPartitionAssignor> shareGroupAssignors;
// Streams group configurations
private final int streamsGroupSessionTimeoutMs;
private final int streamsGroupMinSessionTimeoutMs;
@ -415,6 +427,7 @@ public class GroupCoordinatorConfig {
this.shareGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareGroupMaxSize = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG);
this.shareGroupAssignors = shareGroupAssignors(config);
// Streams group configurations
this.streamsGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG);
this.streamsGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
@ -466,6 +479,8 @@ public class GroupCoordinatorConfig {
require(shareGroupHeartbeatIntervalMs < shareGroupSessionTimeoutMs,
String.format("%s must be less than %s",
SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG));
require(shareGroupAssignors.size() == 1,
String.format("%s must contain exactly one assignor, but found %d", SHARE_GROUP_ASSIGNORS_CONFIG, shareGroupAssignors.size()));
// Streams group configs validation.
require(streamsGroupMaxHeartbeatIntervalMs >= streamsGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equal to %s",
@ -550,6 +565,41 @@ public class GroupCoordinatorConfig {
return assignors;
}
protected List<ShareGroupPartitionAssignor> shareGroupAssignors(
AbstractConfig config
) {
List<ShareGroupPartitionAssignor> assignors = new ArrayList<>();
try {
for (String kclass : config.getList(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG)) {
ShareGroupPartitionAssignor assignor = SHARE_GROUP_BUILTIN_ASSIGNOR;
if (!Objects.equals(kclass, SHARE_GROUP_ASSIGNORS_DEFAULT)) {
try {
assignor = Utils.newInstance(kclass, ShareGroupPartitionAssignor.class);
} catch (ClassNotFoundException e) {
throw new KafkaException("Class " + kclass + " cannot be found", e);
} catch (ClassCastException e) {
throw new KafkaException(kclass + " is not an instance of " + ShareGroupPartitionAssignor.class.getName());
}
}
assignors.add(assignor);
if (assignor instanceof Configurable configurable) {
configurable.configure(config.originals());
}
}
} catch (Exception e) {
for (ShareGroupPartitionAssignor assignor : assignors) {
maybeCloseQuietly(assignor, "AutoCloseable object constructed and configured during failed call to shareGroupAssignors");
}
throw e;
}
return assignors;
}
/**
* Copy the subset of properties that are relevant to consumer group and share group.
*/
@ -809,6 +859,13 @@ public class GroupCoordinatorConfig {
return shareGroupMaxHeartbeatIntervalMs;
}
/**
* The share group assignors.
*/
public List<ShareGroupPartitionAssignor> shareGroupAssignors() {
return shareGroupAssignors;
}
/**
* The streams group session timeout in milliseconds.
*/

View File

@ -268,6 +268,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
.withConfig(config)
.withGroupConfigManager(groupConfigManager)
.withGroupCoordinatorMetricsShard(metricsShard)
.withShareGroupAssignor(config.shareGroupAssignors().get(0))
.withAuthorizerPlugin(authorizerPlugin)
.build();

View File

@ -25,8 +25,10 @@ import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAss
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.assignor.SimpleAssignor;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
import org.junit.jupiter.api.Test;
@ -37,13 +39,14 @@ import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GroupCoordinatorConfigTest {
public static class CustomAssignor implements ConsumerGroupPartitionAssignor, Configurable {
public static class CustomAssignor implements ConsumerGroupPartitionAssignor, Configurable, ShareGroupPartitionAssignor {
public Map<String, ?> configs;
@Override
@ -126,6 +129,48 @@ public class GroupCoordinatorConfigTest {
assertTrue(assignors.get(1) instanceof CustomAssignor);
}
@Test
public void testShareGroupAssignorFullClassNames() {
// The full class name of the assignors is part of our public api. Hence,
// we should ensure that they are not changed by mistake.
assertEquals(
"org.apache.kafka.coordinator.group.assignor.SimpleAssignor",
SimpleAssignor.class.getName()
);
}
@Test
public void testShareGroupAssignors() {
Map<String, Object> configs = new HashMap<>();
GroupCoordinatorConfig config;
List<ShareGroupPartitionAssignor> assignors;
// Test default config.
config = createConfig(configs);
assignors = config.shareGroupAssignors();
assertEquals(1, assignors.size());
// Test short names.
configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG, "simple");
config = createConfig(configs);
assignors = config.shareGroupAssignors();
assertEquals(1, assignors.size());
assertInstanceOf(SimpleAssignor.class, assignors.get(0));
// Test custom assignor.
configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG, CustomAssignor.class.getName());
config = createConfig(configs);
assignors = config.shareGroupAssignors();
assertEquals(1, assignors.size());
assertInstanceOf(CustomAssignor.class, assignors.get(0));
assertNotNull(((CustomAssignor) assignors.get(0)).configs);
// Test must contain only one assignor.
configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG, "simple, " + CustomAssignor.class.getName());
assertEquals("group.share.assignors must contain exactly one assignor, but found 2",
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
}
@Test
public void testConfigs() {
Map<String, Object> configs = new HashMap<>();