KAFKA-17747: [7/N] Add consumer group integration test for rack aware assignment (#19856)

* Add `RackAwareAssignor`. It uses `racksForPartition` to check the rack
id of a partition and assign it to a member which has the same rack id.
* Add `ConsumerIntegrationTest#testRackAwareAssignment` to check
`racksForPartition` works correctly.

Reviewers: David Jacot <djacot@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
This commit is contained in:
PoAn Yang 2025-06-05 01:32:17 +08:00 committed by GitHub
parent 70b672b808
commit 949617b0b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 368 additions and 21 deletions

View File

@ -1995,6 +1995,7 @@ project(':clients:clients-integration-tests') {
implementation project(':server-common')
testImplementation project(':metadata')
implementation project(':group-coordinator')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':transaction-coordinator')
testImplementation libs.junitJupiter

View File

@ -235,6 +235,7 @@
<subpackage name="clients">
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.clients" exact-match="true"/>
<allow pkg="org.apache.kafka.clients.consumer" exact-match="true"/>
<allow pkg="org.apache.kafka.test" />
<allow class="org.apache.logging.log4j.Level" />

View File

@ -16,7 +16,12 @@
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
@ -31,12 +36,16 @@ import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -216,6 +225,116 @@ public class ConsumerIntegrationTest {
}
}
@ClusterTest(
types = {Type.KRAFT},
brokers = 3,
serverProperties = {
@ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"),
@ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack1"),
@ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack2"),
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value = "org.apache.kafka.clients.consumer.RackAwareAssignor")
}
)
public void testRackAwareAssignment(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
String topic = "test-topic";
try (Admin admin = clusterInstance.admin();
Producer<byte[], byte[]> producer = clusterInstance.producer();
Consumer<byte[], byte[]> consumer0 = clusterInstance.consumer(Map.of(
ConsumerConfig.GROUP_ID_CONFIG, "group0",
ConsumerConfig.CLIENT_RACK_CONFIG, "rack0",
ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()
));
Consumer<byte[], byte[]> consumer1 = clusterInstance.consumer(Map.of(
ConsumerConfig.GROUP_ID_CONFIG, "group0",
ConsumerConfig.CLIENT_RACK_CONFIG, "rack1",
ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()
));
Consumer<byte[], byte[]> consumer2 = clusterInstance.consumer(Map.of(
ConsumerConfig.GROUP_ID_CONFIG, "group0",
ConsumerConfig.CLIENT_RACK_CONFIG, "rack2",
ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()
))
) {
// Create a new topic with 1 partition on broker 0.
admin.createTopics(List.of(new NewTopic(topic, Map.of(0, List.of(0)))));
clusterInstance.waitForTopic(topic, 1);
producer.send(new ProducerRecord<>(topic, "key".getBytes(), "value".getBytes()));
producer.flush();
consumer0.subscribe(List.of(topic));
consumer1.subscribe(List.of(topic));
consumer2.subscribe(List.of(topic));
TestUtils.waitForCondition(() -> {
consumer0.poll(Duration.ofMillis(1000));
consumer1.poll(Duration.ofMillis(1000));
consumer2.poll(Duration.ofMillis(1000));
return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
consumer1.assignment().isEmpty() &&
consumer2.assignment().isEmpty();
}, "Consumer 0 should be assigned to topic partition 0");
// Add a new partition 1 and 2 to broker 1.
admin.createPartitions(
Map.of(
topic,
NewPartitions.increaseTo(3, List.of(List.of(1), List.of(1)))
)
);
clusterInstance.waitForTopic(topic, 3);
TestUtils.waitForCondition(() -> {
consumer0.poll(Duration.ofMillis(1000));
consumer1.poll(Duration.ofMillis(1000));
consumer2.poll(Duration.ofMillis(1000));
return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
consumer1.assignment().equals(Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
consumer2.assignment().isEmpty();
}, "Consumer 1 should be assigned to topic partition 1 and 2");
// Add a new partition 3, 4, and 5 to broker 2.
admin.createPartitions(
Map.of(
topic,
NewPartitions.increaseTo(6, List.of(List.of(2), List.of(2), List.of(2)))
)
);
clusterInstance.waitForTopic(topic, 6);
TestUtils.waitForCondition(() -> {
consumer0.poll(Duration.ofMillis(1000));
consumer1.poll(Duration.ofMillis(1000));
consumer2.poll(Duration.ofMillis(1000));
return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
consumer1.assignment().equals(Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
consumer2.assignment().equals(Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4), new TopicPartition(topic, 5)));
}, "Consumer 2 should be assigned to topic partition 3, 4, and 5");
// Change partitions to different brokers.
// partition 0 -> broker 2
// partition 1 -> broker 2
// partition 2 -> broker 2
// partition 3 -> broker 1
// partition 4 -> broker 1
// partition 5 -> broker 0
admin.alterPartitionReassignments(Map.of(
new TopicPartition(topic, 0), Optional.of(new NewPartitionReassignment(List.of(2))),
new TopicPartition(topic, 1), Optional.of(new NewPartitionReassignment(List.of(2))),
new TopicPartition(topic, 2), Optional.of(new NewPartitionReassignment(List.of(2))),
new TopicPartition(topic, 3), Optional.of(new NewPartitionReassignment(List.of(1))),
new TopicPartition(topic, 4), Optional.of(new NewPartitionReassignment(List.of(1))),
new TopicPartition(topic, 5), Optional.of(new NewPartitionReassignment(List.of(0)))
)).all().get();
TestUtils.waitForCondition(() -> {
consumer0.poll(Duration.ofMillis(1000));
consumer1.poll(Duration.ofMillis(1000));
consumer2.poll(Duration.ofMillis(1000));
return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 5))) &&
consumer1.assignment().equals(Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4))) &&
consumer2.assignment().equals(Set.of(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2)));
}, "Consumer with topic partition mapping should be 0 -> 5 | 1 -> 3, 4 | 2 -> 0, 1, 2");
}
}
private void sendMsg(ClusterInstance clusterInstance, String topic, int sendMsgNum) {
try (var producer = clusterInstance.producer(Map.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* The RackAwareAssignor is a consumer group partition assignor that takes into account the rack
* information of the members when assigning partitions to them.
* It needs all brokers and members to have rack information available.
*/
public class RackAwareAssignor implements ConsumerGroupPartitionAssignor {
@Override
public String name() {
return "rack-aware-assignor";
}
@Override
public GroupAssignment assign(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException {
Map<String, String> rackIdToMemberId = new HashMap<>();
List<String> memberIds = new ArrayList<>(groupSpec.memberIds());
for (String memberId : memberIds) {
if (groupSpec.memberSubscription(memberId).rackId().isEmpty()) {
throw new PartitionAssignorException("Member " + memberId + " does not have rack information available.");
}
rackIdToMemberId.put(
groupSpec.memberSubscription(memberId).rackId().get(),
memberId
);
}
Map<String, Map<Uuid, Set<Integer>>> assignments = new HashMap<>();
for (Uuid topicId : groupSpec.memberSubscription(memberIds.get(0)).subscribedTopicIds()) {
int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
if (numPartitions == -1) {
throw new PartitionAssignorException("Member is subscribed to a non-existent topic");
}
for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
Set<String> racks = subscribedTopicDescriber.racksForPartition(topicId, partitionId);
if (racks.isEmpty()) {
throw new PartitionAssignorException("No racks available for partition " + partitionId + " of topic " + topicId);
}
String assignedRack = null;
for (String rack : racks) {
String memberId = rackIdToMemberId.get(rack);
if (memberId == null) {
continue;
}
assignedRack = rack;
break;
}
if (assignedRack == null) {
throw new PartitionAssignorException("No member found for racks " + racks + " for partition " + partitionId + " of topic " + topicId);
}
Map<Uuid, Set<Integer>> assignment = assignments.computeIfAbsent(
rackIdToMemberId.get(assignedRack),
k -> new HashMap<>()
);
Set<Integer> partitions = assignment.computeIfAbsent(
topicId,
k -> new java.util.HashSet<>()
);
partitions.add(partitionId);
}
}
Map<String, MemberAssignment> memberAssignments = new HashMap<>();
for (Map.Entry<String, Map<Uuid, Set<Integer>>> entry : assignments.entrySet()) {
memberAssignments.put(entry.getKey(), new MemberAssignmentImpl(entry.getValue()));
}
return new GroupAssignment(memberAssignments);
}
}

View File

@ -43,6 +43,7 @@ public class GroupRebalanceConfig {
public final int heartbeatIntervalMs;
public final String groupId;
public final Optional<String> groupInstanceId;
public final Optional<String> rackId;
public final long retryBackoffMs;
public final long retryBackoffMaxMs;
public final boolean leaveGroupOnClose;
@ -53,8 +54,12 @@ public class GroupRebalanceConfig {
// Consumer and Connect use different config names for defining rebalance timeout
if ((protocolType == ProtocolType.CONSUMER) || (protocolType == ProtocolType.SHARE)) {
this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
String rackId = config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG);
this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId);
} else {
this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG);
this.rackId = Optional.empty();
}
this.heartbeatIntervalMs = config.getInt(CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG);
@ -90,6 +95,7 @@ public class GroupRebalanceConfig {
final int heartbeatIntervalMs,
String groupId,
Optional<String> groupInstanceId,
String rackId,
long retryBackoffMs,
long retryBackoffMaxMs,
boolean leaveGroupOnClose) {
@ -98,6 +104,7 @@ public class GroupRebalanceConfig {
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.groupId = groupId;
this.groupInstanceId = groupInstanceId;
this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId);
this.retryBackoffMs = retryBackoffMs;
this.retryBackoffMaxMs = retryBackoffMaxMs;
this.leaveGroupOnClose = leaveGroupOnClose;

View File

@ -230,7 +230,6 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors,
config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),
clientTelemetryReporter);
}
this.fetcher = new Fetcher<>(
@ -330,6 +329,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
heartbeatIntervalMs,
groupId.get(),
groupInstanceId,
rackId,
retryBackoffMs,
retryBackoffMaxMs,
true
@ -348,7 +348,6 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
autoCommitIntervalMs,
interceptors,
throwOnStableOffsetNotSupported,
rackId,
clientTelemetryReporter
);
} else {

View File

@ -178,7 +178,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
int autoCommitIntervalMs,
ConsumerInterceptors<?, ?> interceptors,
boolean throwOnFetchStableOffsetsUnsupported,
String rackId,
Optional<ClientTelemetryReporter> clientTelemetryReporter) {
this(rebalanceConfig,
logContext,
@ -193,7 +192,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
autoCommitIntervalMs,
interceptors,
throwOnFetchStableOffsetsUnsupported,
rackId,
clientTelemetryReporter,
Optional.empty());
}
@ -214,7 +212,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
int autoCommitIntervalMs,
ConsumerInterceptors<?, ?> interceptors,
boolean throwOnFetchStableOffsetsUnsupported,
String rackId,
Optional<ClientTelemetryReporter> clientTelemetryReporter,
Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier) {
super(rebalanceConfig,
@ -228,7 +225,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
this.rebalanceConfig = rebalanceConfig;
this.log = logContext.logger(ConsumerCoordinator.class);
this.metadata = metadata;
this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId);
this.rackId = rebalanceConfig.rackId;
this.metadataSnapshot = new MetadataSnapshot(this.rackId, subscriptions, metadata.fetch(), metadata.updateVersion());
this.subscriptions = subscriptions;
this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback();

View File

@ -247,6 +247,7 @@ public class ConsumerHeartbeatRequestManager extends AbstractHeartbeatRequestMan
sentFields.reset();
}
@SuppressWarnings("NPathComplexity")
public ConsumerGroupHeartbeatRequestData buildRequestData() {
ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData();
@ -306,6 +307,12 @@ public class ConsumerHeartbeatRequestManager extends AbstractHeartbeatRequestMan
sentFields.localAssignment = local;
}
// RackId - sent when joining
String rackId = membershipManager.rackId().orElse(null);
if (sendAllFields) {
data.setRackId(rackId);
}
return data;
}

View File

@ -112,6 +112,8 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
*/
protected final Optional<String> groupInstanceId;
private final Optional<String> rackId;
/**
* Rebalance timeout. To be used as time limit for the commit request issued
* when a new assignment is received, that is retried until it succeeds, fails with a
@ -140,6 +142,7 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
public ConsumerMembershipManager(String groupId,
Optional<String> groupInstanceId,
Optional<String> rackId,
int rebalanceTimeoutMs,
Optional<String> serverAssignor,
SubscriptionState subscriptions,
@ -152,6 +155,7 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
boolean autoCommitEnabled) {
this(groupId,
groupInstanceId,
rackId,
rebalanceTimeoutMs,
serverAssignor,
subscriptions,
@ -167,6 +171,7 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
// Visible for testing
ConsumerMembershipManager(String groupId,
Optional<String> groupInstanceId,
Optional<String> rackId,
int rebalanceTimeoutMs,
Optional<String> serverAssignor,
SubscriptionState subscriptions,
@ -185,6 +190,7 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
metricsManager,
autoCommitEnabled);
this.groupInstanceId = groupInstanceId;
this.rackId = rackId;
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.serverAssignor = serverAssignor;
this.commitRequestManager = commitRequestManager;
@ -199,6 +205,10 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
return groupInstanceId;
}
public Optional<String> rackId() {
return rackId;
}
/**
* {@inheritDoc}
*/

View File

@ -248,6 +248,7 @@ public class RequestManagers implements Closeable {
membershipManager = new ConsumerMembershipManager(
groupRebalanceConfig.groupId,
groupRebalanceConfig.groupInstanceId,
groupRebalanceConfig.rackId,
groupRebalanceConfig.rebalanceTimeoutMs,
serverAssignor,
subscriptions,

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GroupRebalanceConfigTest {
@ParameterizedTest
@EnumSource(value = GroupRebalanceConfig.ProtocolType.class, names = {"CONSUMER", "SHARE"})
void testRackIdIsEmptyIfNoDefined(GroupRebalanceConfig.ProtocolType protocolType) {
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
new ConsumerConfig(Map.of(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"
)),
protocolType
);
assertTrue(groupRebalanceConfig.rackId.isEmpty());
}
@ParameterizedTest
@EnumSource(value = GroupRebalanceConfig.ProtocolType.class, names = {"CONSUMER", "SHARE"})
void testRackIdIsNotEmptyIfDefined(GroupRebalanceConfig.ProtocolType protocolType) {
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
new ConsumerConfig(Map.of(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.CLIENT_RACK_CONFIG, "rack1"
)),
protocolType
);
assertTrue(groupRebalanceConfig.rackId.isPresent());
assertEquals("rack1", groupRebalanceConfig.rackId.get());
}
}

View File

@ -166,6 +166,7 @@ public class AbstractCoordinatorTest {
HEARTBEAT_INTERVAL_MS,
GROUP_ID,
groupInstanceId,
null,
retryBackoffMs,
retryBackoffMaxMs,
leaveOnClose);

View File

@ -208,7 +208,7 @@ public abstract class ConsumerCoordinatorTest {
this.rebalanceListener = new MockRebalanceListener();
this.mockOffsetCommitCallback = new MockCommitCallback();
this.partitionAssignor.clear();
this.rebalanceConfig = buildRebalanceConfig(Optional.empty());
this.rebalanceConfig = buildRebalanceConfig(Optional.empty(), null);
this.coordinator = buildCoordinator(rebalanceConfig,
metrics,
assignors,
@ -216,12 +216,13 @@ public abstract class ConsumerCoordinatorTest {
subscriptions);
}
private GroupRebalanceConfig buildRebalanceConfig(Optional<String> groupInstanceId) {
private GroupRebalanceConfig buildRebalanceConfig(Optional<String> groupInstanceId, String rackId) {
return new GroupRebalanceConfig(sessionTimeoutMs,
rebalanceTimeoutMs,
heartbeatIntervalMs,
groupId,
groupInstanceId,
rackId,
retryBackoffMs,
retryBackoffMaxMs,
groupInstanceId.isEmpty());
@ -2974,7 +2975,7 @@ public abstract class ConsumerCoordinatorTest {
@Test
public void testCommitOffsetShouldNotSetInstanceIdIfMemberIdIsUnknown() {
rebalanceConfig = buildRebalanceConfig(groupInstanceId);
rebalanceConfig = buildRebalanceConfig(groupInstanceId, null);
ConsumerCoordinator coordinator = buildCoordinator(
rebalanceConfig,
new Metrics(),
@ -3699,7 +3700,6 @@ public abstract class ConsumerCoordinatorTest {
autoCommitIntervalMs,
null,
true,
null,
Optional.empty());
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
@ -3750,7 +3750,7 @@ public abstract class ConsumerCoordinatorTest {
final boolean autoCommit,
final Optional<String> groupInstanceId,
final boolean shouldPoll) {
rebalanceConfig = buildRebalanceConfig(groupInstanceId);
rebalanceConfig = buildRebalanceConfig(groupInstanceId, null);
ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
new Metrics(),
assignors,
@ -3868,7 +3868,6 @@ public abstract class ConsumerCoordinatorTest {
autoCommitIntervalMs,
null,
false,
null,
Optional.empty());
}
@ -4112,9 +4111,10 @@ public abstract class ConsumerCoordinatorTest {
metrics = new Metrics(time);
rebalanceConfig = buildRebalanceConfig(rebalanceConfig.groupInstanceId, rackId);
coordinator = new ConsumerCoordinator(rebalanceConfig, new LogContext(), consumerClient,
Collections.singletonList(assignor), metadata, subscriptions,
metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, rackId, Optional.empty());
metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, Optional.empty());
}
private static MetadataResponse rackAwareMetadata(int numNodes,
@ -4193,7 +4193,6 @@ public abstract class ConsumerCoordinatorTest {
autoCommitIntervalMs,
null,
false,
null,
Optional.empty(),
Optional.of(() -> Mockito.mock(BaseHeartbeatThread.class)));
}

View File

@ -1009,6 +1009,34 @@ public class ConsumerHeartbeatRequestManagerTest {
assertNull(data.subscribedTopicRegex());
}
@Test
public void testRackIdInHeartbeatLifecycle() {
heartbeatState = new HeartbeatState(subscriptions, membershipManager, DEFAULT_MAX_POLL_INTERVAL_MS);
createHeartbeatRequestStateWithZeroHeartbeatInterval();
// Initial heartbeat with rackId
mockJoiningMemberData(null);
when(membershipManager.rackId()).thenReturn(Optional.of("rack1"));
ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData();
assertEquals("rack1", data.rackId());
// RackId not included in HB if member state is not JOINING
when(membershipManager.state()).thenReturn(MemberState.STABLE);
data = heartbeatState.buildRequestData();
assertNull(data.rackId());
// RackId included in HB if member state changes to JOINING again
when(membershipManager.state()).thenReturn(MemberState.JOINING);
data = heartbeatState.buildRequestData();
assertEquals("rack1", data.rackId());
// Empty rackId not included in HB
when(membershipManager.rackId()).thenReturn(Optional.empty());
heartbeatState = new HeartbeatState(subscriptions, membershipManager, DEFAULT_MAX_POLL_INTERVAL_MS);
data = heartbeatState.buildRequestData();
assertNull(data.rackId());
}
private void assertHeartbeat(ConsumerHeartbeatRequestManager hrm, int nextPollMs) {
NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds());
assertEquals(1, pollResult.unsentRequests.size());

View File

@ -142,17 +142,20 @@ public class ConsumerMembershipManagerTest {
private ConsumerMembershipManager createMembershipManager(String groupInstanceId) {
ConsumerMembershipManager manager = spy(new ConsumerMembershipManager(
GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, Optional.empty(),
GROUP_ID, Optional.ofNullable(groupInstanceId), Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, LOG_CONTEXT,
backgroundEventHandler, time, rebalanceMetricsManager, true));
assertMemberIdIsGenerated(manager.memberId());
return manager;
}
private ConsumerMembershipManager createMembershipManagerJoiningGroup(String groupInstanceId,
String serverAssignor) {
private ConsumerMembershipManager createMembershipManagerJoiningGroup(
String groupInstanceId,
String serverAssignor,
String rackId
) {
ConsumerMembershipManager manager = spy(new ConsumerMembershipManager(
GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT,
GROUP_ID, Optional.ofNullable(groupInstanceId), Optional.ofNullable(rackId), REBALANCE_TIMEOUT,
Optional.ofNullable(serverAssignor), subscriptionState, commitRequestManager,
metadata, LOG_CONTEXT, backgroundEventHandler, time, rebalanceMetricsManager, true));
assertMemberIdIsGenerated(manager.memberId());
@ -165,10 +168,19 @@ public class ConsumerMembershipManagerTest {
ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup();
assertEquals(Optional.empty(), membershipManager.serverAssignor());
membershipManager = createMembershipManagerJoiningGroup("instance1", "Uniform");
membershipManager = createMembershipManagerJoiningGroup("instance1", "Uniform", null);
assertEquals(Optional.of("Uniform"), membershipManager.serverAssignor());
}
@Test
public void testMembershipManagerRackId() {
ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup();
assertEquals(Optional.empty(), membershipManager.rackId());
membershipManager = createMembershipManagerJoiningGroup(null, null, "rack1");
assertEquals(Optional.of("rack1"), membershipManager.rackId());
}
@Test
public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
createMembershipManagerJoiningGroup();
@ -231,7 +243,7 @@ public class ConsumerMembershipManagerTest {
@Test
public void testTransitionToFailedWhenTryingToJoin() {
ConsumerMembershipManager membershipManager = new ConsumerMembershipManager(
GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
GROUP_ID, Optional.empty(), Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, LOG_CONTEXT,
backgroundEventHandler, time, rebalanceMetricsManager, true);
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
@ -2737,7 +2749,7 @@ public class ConsumerMembershipManagerTest {
}
private ConsumerMembershipManager createMemberInStableState(String groupInstanceId) {
ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup(groupInstanceId, null);
ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup(groupInstanceId, null, null);
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(new Assignment(), membershipManager.memberId());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());

View File

@ -45,6 +45,7 @@ public class HeartbeatTest {
heartbeatIntervalMs,
"group_id",
Optional.empty(),
null,
retryBackoffMs,
retryBackoffMaxMs,
true);

View File

@ -149,6 +149,7 @@ public class WorkerCoordinatorIncrementalTest {
heartbeatIntervalMs,
groupId,
Optional.empty(),
null,
retryBackoffMs,
retryBackoffMaxMs,
true);

View File

@ -139,6 +139,7 @@ public class WorkerCoordinatorTest {
heartbeatIntervalMs,
groupId,
Optional.empty(),
null,
retryBackoffMs,
retryBackoffMaxMs,
true);