KAFKA-18736: Do not send fields if not needed (#19181)

The Streams heartbeat request has some fields that are always sent.
Those are:
- group ID
- member ID
- member epoch
- group instance ID (if static membership is used)

Then it has fields that are only sent when joining:
- topology and topology epoch
- rebalance timeout
- process ID
- endpoint
- client tags

Finally, the assignment is only sent if it changed compared to the last
sent request.

Reviewers: Bill Bejeck <bill@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>
This commit is contained in:
Bruno Cadonna 2025-03-16 18:08:56 +01:00 committed by GitHub
parent f7d07d62d9
commit a7e40b7c5a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 536 additions and 131 deletions

View File

@ -61,9 +61,23 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
static class HeartbeatState { static class HeartbeatState {
// Fields of StreamsGroupHeartbeatRequest sent in the most recent request
static class LastSentFields {
private StreamsRebalanceData.Assignment assignment = StreamsRebalanceData.Assignment.EMPTY;
LastSentFields() {
}
void reset() {
assignment = StreamsRebalanceData.Assignment.EMPTY;
}
}
private final StreamsMembershipManager membershipManager; private final StreamsMembershipManager membershipManager;
private final int rebalanceTimeoutMs; private final int rebalanceTimeoutMs;
private final StreamsRebalanceData streamsRebalanceData; private final StreamsRebalanceData streamsRebalanceData;
private final LastSentFields lastSentFields = new LastSentFields();
public HeartbeatState(final StreamsRebalanceData streamsRebalanceData, public HeartbeatState(final StreamsRebalanceData streamsRebalanceData,
final StreamsMembershipManager membershipManager, final StreamsMembershipManager membershipManager,
@ -74,6 +88,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
} }
public void reset() { public void reset() {
lastSentFields.reset();
} }
public StreamsGroupHeartbeatRequestData buildRequestData() { public StreamsGroupHeartbeatRequestData buildRequestData() {
@ -82,33 +97,45 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
data.setMemberId(membershipManager.memberId()); data.setMemberId(membershipManager.memberId());
data.setMemberEpoch(membershipManager.memberEpoch()); data.setMemberEpoch(membershipManager.memberEpoch());
membershipManager.groupInstanceId().ifPresent(data::setInstanceId); membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
StreamsGroupHeartbeatRequestData.Topology topology = new StreamsGroupHeartbeatRequestData.Topology();
topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies())); boolean joining = membershipManager.state() == MemberState.JOINING;
topology.setEpoch(streamsRebalanceData.topologyEpoch());
data.setRebalanceTimeoutMs(rebalanceTimeoutMs); if (joining) {
data.setTopology(topology); StreamsGroupHeartbeatRequestData.Topology topology = new StreamsGroupHeartbeatRequestData.Topology();
data.setProcessId(streamsRebalanceData.processId().toString()); topology.setSubtopologies(fromStreamsToHeartbeatRequest(streamsRebalanceData.subtopologies()));
streamsRebalanceData.endpoint().ifPresent(userEndpoint -> { topology.setEpoch(streamsRebalanceData.topologyEpoch());
data.setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint() data.setTopology(topology);
.setHost(userEndpoint.host()) data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
.setPort(userEndpoint.port()) data.setProcessId(streamsRebalanceData.processId().toString());
); streamsRebalanceData.endpoint().ifPresent(userEndpoint -> {
}); data.setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint()
data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream() .setHost(userEndpoint.host())
.map(entry -> new StreamsGroupHeartbeatRequestData.KeyValue() .setPort(userEndpoint.port())
.setKey(entry.getKey()) );
.setValue(entry.getValue()) });
) data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream()
.collect(Collectors.toList())); .map(entry -> new StreamsGroupHeartbeatRequestData.KeyValue()
.setKey(entry.getKey())
.setValue(entry.getValue())
)
.collect(Collectors.toList()));
data.setActiveTasks(fromStreamsToHeartbeatRequest(Set.of()));
data.setStandbyTasks(fromStreamsToHeartbeatRequest(Set.of()));
data.setWarmupTasks(fromStreamsToHeartbeatRequest(Set.of()));
} else {
StreamsRebalanceData.Assignment reconciledAssignment = streamsRebalanceData.reconciledAssignment();
if (!reconciledAssignment.equals(lastSentFields.assignment)) {
data.setActiveTasks(fromStreamsToHeartbeatRequest(reconciledAssignment.activeTasks()));
data.setStandbyTasks(fromStreamsToHeartbeatRequest(reconciledAssignment.standbyTasks()));
data.setWarmupTasks(fromStreamsToHeartbeatRequest(reconciledAssignment.warmupTasks()));
lastSentFields.assignment = reconciledAssignment;
}
}
data.setShutdownApplication(streamsRebalanceData.shutdownRequested()); data.setShutdownApplication(streamsRebalanceData.shutdownRequested());
StreamsRebalanceData.Assignment reconciledAssignment = streamsRebalanceData.reconciledAssignment();
data.setActiveTasks(convertTaskIdCollection(reconciledAssignment.activeTasks()));
data.setStandbyTasks(convertTaskIdCollection(reconciledAssignment.standbyTasks()));
data.setWarmupTasks(convertTaskIdCollection(reconciledAssignment.warmupTasks()));
return data; return data;
} }
private static List<StreamsGroupHeartbeatRequestData.TaskIds> convertTaskIdCollection(final Set<StreamsRebalanceData.TaskId> tasks) { private static List<StreamsGroupHeartbeatRequestData.TaskIds> fromStreamsToHeartbeatRequest(final Set<StreamsRebalanceData.TaskId> tasks) {
return tasks.stream() return tasks.stream()
.collect( .collect(
Collectors.groupingBy(StreamsRebalanceData.TaskId::subtopologyId, Collectors.groupingBy(StreamsRebalanceData.TaskId::subtopologyId,
@ -117,27 +144,26 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
.entrySet() .entrySet()
.stream() .stream()
.map(entry -> { .map(entry -> {
StreamsGroupHeartbeatRequestData.TaskIds ids = new StreamsGroupHeartbeatRequestData.TaskIds(); return new StreamsGroupHeartbeatRequestData.TaskIds()
ids.setSubtopologyId(entry.getKey()); .setSubtopologyId(entry.getKey())
ids.setPartitions(entry.getValue()); .setPartitions(entry.getValue());
return ids;
}) })
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
private static List<StreamsGroupHeartbeatRequestData.Subtopology> getTopologyFromStreams(final Map<String, StreamsRebalanceData.Subtopology> subtopologies) { private static List<StreamsGroupHeartbeatRequestData.Subtopology> fromStreamsToHeartbeatRequest(final Map<String, StreamsRebalanceData.Subtopology> subtopologies) {
final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologiesForRequest = new ArrayList<>(subtopologies.size()); final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologiesForRequest = new ArrayList<>(subtopologies.size());
for (final Map.Entry<String, StreamsRebalanceData.Subtopology> subtopology : subtopologies.entrySet()) { for (final Map.Entry<String, StreamsRebalanceData.Subtopology> subtopology : subtopologies.entrySet()) {
subtopologiesForRequest.add(getSubtopologyFromStreams(subtopology.getKey(), subtopology.getValue())); subtopologiesForRequest.add(fromStreamsToHeartbeatRequest(subtopology.getKey(), subtopology.getValue()));
} }
subtopologiesForRequest.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.Subtopology::subtopologyId)); subtopologiesForRequest.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.Subtopology::subtopologyId));
return subtopologiesForRequest; return subtopologiesForRequest;
} }
private static StreamsGroupHeartbeatRequestData.Subtopology getSubtopologyFromStreams(final String subtopologyName, private static StreamsGroupHeartbeatRequestData.Subtopology fromStreamsToHeartbeatRequest(final String subtopologyId,
final StreamsRebalanceData.Subtopology subtopology) { final StreamsRebalanceData.Subtopology subtopology) {
final StreamsGroupHeartbeatRequestData.Subtopology subtopologyData = new StreamsGroupHeartbeatRequestData.Subtopology(); final StreamsGroupHeartbeatRequestData.Subtopology subtopologyData = new StreamsGroupHeartbeatRequestData.Subtopology();
subtopologyData.setSubtopologyId(subtopologyName); subtopologyData.setSubtopologyId(subtopologyId);
ArrayList<String> sortedSourceTopics = new ArrayList<>(subtopology.sourceTopics()); ArrayList<String> sortedSourceTopics = new ArrayList<>(subtopology.sourceTopics());
Collections.sort(sortedSourceTopics); Collections.sort(sortedSourceTopics);
subtopologyData.setSourceTopics(sortedSourceTopics); subtopologyData.setSourceTopics(sortedSourceTopics);

View File

@ -38,25 +38,30 @@ import org.apache.kafka.common.utils.Timer;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.MockedConstruction; import org.mockito.MockedConstruction;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; import static org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
@ -116,8 +121,21 @@ class StreamsGroupHeartbeatRequestManagerTest {
CHANGELOG_TOPICS, CHANGELOG_TOPICS,
COPARTITION_GROUP COPARTITION_GROUP
); );
private static final String SUBTOPOLOGY_NAME_2 = "subtopology2";
private static final String SOURCE_TOPIC_3 = "sourceTopic3";
private static final String CHANGELOG_TOPIC_4 = "changelogTopic4";
private static final StreamsRebalanceData.Subtopology SUBTOPOLOGY_2 = new StreamsRebalanceData.Subtopology(
Set.of(SOURCE_TOPIC_3),
Set.of(),
Map.of(),
Map.of(CHANGELOG_TOPIC_4, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 1), Map.of())),
Collections.emptyList()
);
private static final Map<String, StreamsRebalanceData.Subtopology> SUBTOPOLOGIES = private static final Map<String, StreamsRebalanceData.Subtopology> SUBTOPOLOGIES =
Map.of(SUBTOPOLOGY_NAME_1, SUBTOPOLOGY_1); Map.of(
SUBTOPOLOGY_NAME_1, SUBTOPOLOGY_1,
SUBTOPOLOGY_NAME_2, SUBTOPOLOGY_2
);
private static final String CLIENT_TAG_1 = "client-tag1"; private static final String CLIENT_TAG_1 = "client-tag1";
private static final String VALUE_1 = "value1"; private static final String VALUE_1 = "value1";
private static final Map<String, String> CLIENT_TAGS = Map.of(CLIENT_TAG_1, VALUE_1); private static final Map<String, String> CLIENT_TAGS = Map.of(CLIENT_TAG_1, VALUE_1);
@ -475,101 +493,6 @@ class StreamsGroupHeartbeatRequestManagerTest {
} }
} }
@Test
public void testSendingFullHeartbeatRequest() {
try (
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
(mock, context) -> {
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
})
) {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
when(membershipManager.groupId()).thenReturn(GROUP_ID);
when(membershipManager.memberId()).thenReturn(MEMBER_ID);
when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.timeUntilNextPollMs);
assertEquals(1, result.unsentRequests.size());
assertEquals(Optional.of(coordinatorNode), result.unsentRequests.get(0).node());
NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
StreamsGroupHeartbeatRequest streamsRequest = (StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
assertEquals(GROUP_ID, streamsRequest.data().groupId());
assertEquals(MEMBER_ID, streamsRequest.data().memberId());
assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch());
assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
assertEquals(PROCESS_ID.toString(), streamsRequest.data().processId());
assertEquals(ENDPOINT.host(), streamsRequest.data().userEndpoint().host());
assertEquals(ENDPOINT.port(), streamsRequest.data().userEndpoint().port());
assertEquals(1, streamsRequest.data().clientTags().size());
assertEquals(CLIENT_TAG_1, streamsRequest.data().clientTags().get(0).key());
assertEquals(VALUE_1, streamsRequest.data().clientTags().get(0).value());
assertEquals(streamsRebalanceData.topologyEpoch(), streamsRequest.data().topology().epoch());
assertNotNull(streamsRequest.data().topology());
final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies = streamsRequest.data().topology().subtopologies();
assertEquals(1, subtopologies.size());
final StreamsGroupHeartbeatRequestData.Subtopology subtopology = subtopologies.get(0);
assertEquals(SUBTOPOLOGY_NAME_1, subtopology.subtopologyId());
assertEquals(Arrays.asList("sourceTopic1", "sourceTopic2"), subtopology.sourceTopics());
assertEquals(Arrays.asList("repartitionSinkTopic1", "repartitionSinkTopic2", "repartitionSinkTopic3"), subtopology.repartitionSinkTopics());
assertEquals(REPARTITION_SOURCE_TOPICS.size(), subtopology.repartitionSourceTopics().size());
subtopology.repartitionSourceTopics().forEach(topicInfo -> {
final StreamsRebalanceData.TopicInfo repartitionTopic = REPARTITION_SOURCE_TOPICS.get(topicInfo.name());
assertEquals(repartitionTopic.numPartitions().get(), topicInfo.partitions());
assertEquals(repartitionTopic.replicationFactor().get(), topicInfo.replicationFactor());
assertEquals(repartitionTopic.topicConfigs().size(), topicInfo.topicConfigs().size());
});
assertEquals(CHANGELOG_TOPICS.size(), subtopology.stateChangelogTopics().size());
subtopology.stateChangelogTopics().forEach(topicInfo -> {
assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name()));
assertEquals(0, topicInfo.partitions());
final StreamsRebalanceData.TopicInfo changelogTopic = CHANGELOG_TOPICS.get(topicInfo.name());
assertEquals(changelogTopic.replicationFactor().get(), topicInfo.replicationFactor());
assertEquals(changelogTopic.topicConfigs().size(), topicInfo.topicConfigs().size());
});
assertEquals(2, subtopology.copartitionGroups().size());
final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData1 =
new StreamsGroupHeartbeatRequestData.CopartitionGroup()
.setRepartitionSourceTopics(Collections.singletonList((short) 0))
.setSourceTopics(Collections.singletonList((short) 1));
final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData2 =
new StreamsGroupHeartbeatRequestData.CopartitionGroup()
.setRepartitionSourceTopics(Collections.singletonList((short) 1))
.setSourceTopics(Collections.singletonList((short) 0));
assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData1));
assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData2));
verify(heartbeatRequestState).onSendAttempt(time.milliseconds());
verify(membershipManager).onHeartbeatRequestGenerated();
time.sleep(2000);
assertEquals(
2.0,
metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", "consumer-coordinator-metrics")).metricValue()
);
final ClientResponse response = buildClientResponse();
networkRequest.future().complete(response);
verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) response.responseBody());
verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS);
verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs());
verify(heartbeatRequestState).resetTimer();
final List<TopicPartition> topicPartitions = streamsRebalanceData.partitionsByHost()
.get(new StreamsRebalanceData.HostInfo(
ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(),
ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port())
);
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), topicPartitions.get(0).topic());
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0), topicPartitions.get(0).partition());
assertEquals(
1.0,
metrics.metric(metrics.metricName("heartbeat-total", "consumer-coordinator-metrics")).metricValue()
);
}
}
@Test @Test
public void testSendingLeaveHeartbeatRequestWhenPollTimerExpired() { public void testSendingLeaveHeartbeatRequestWhenPollTimerExpired() {
try ( try (
@ -582,7 +505,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
Timer.class, Timer.class,
(mock, context) -> { (mock, context) -> {
when(mock.isExpired()).thenReturn(true); when(mock.isExpired()).thenReturn(true);
}); })
) { ) {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager(); final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0); final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
@ -613,6 +536,445 @@ class StreamsGroupHeartbeatRequestManagerTest {
} }
} }
@Test
public void testSendingHeartbeatRequest() {
try (
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
(mock, context) -> {
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
})
) {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
when(membershipManager.groupId()).thenReturn(GROUP_ID);
when(membershipManager.memberId()).thenReturn(MEMBER_ID);
when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.timeUntilNextPollMs);
assertEquals(1, result.unsentRequests.size());
assertEquals(Optional.of(coordinatorNode), result.unsentRequests.get(0).node());
NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
StreamsGroupHeartbeatRequest streamsRequest = (StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
assertEquals(GROUP_ID, streamsRequest.data().groupId());
assertEquals(MEMBER_ID, streamsRequest.data().memberId());
assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch());
assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
verify(heartbeatRequestState).onSendAttempt(time.milliseconds());
verify(membershipManager).onHeartbeatRequestGenerated();
time.sleep(2000);
assertEquals(
2.0,
metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", "consumer-coordinator-metrics")).metricValue()
);
final ClientResponse response = buildClientResponse();
networkRequest.future().complete(response);
verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) response.responseBody());
verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS);
verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs());
verify(heartbeatRequestState).resetTimer();
final List<TopicPartition> topicPartitions = streamsRebalanceData.partitionsByHost()
.get(new StreamsRebalanceData.HostInfo(
ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(),
ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port())
);
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), topicPartitions.get(0).topic());
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0), topicPartitions.get(0).partition());
assertEquals(
1.0,
metrics.metric(metrics.metricName("heartbeat-total", "consumer-coordinator-metrics")).metricValue()
);
}
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testBuildingHeartbeatRequestFieldsThatAreAlwaysSent(final boolean instanceIdPresent) {
when(membershipManager.groupId()).thenReturn(GROUP_ID);
when(membershipManager.memberId()).thenReturn(MEMBER_ID);
when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
when(membershipManager.groupInstanceId()).thenReturn(instanceIdPresent ? Optional.of(INSTANCE_ID) : Optional.empty());
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
new StreamsGroupHeartbeatRequestManager.HeartbeatState(
streamsRebalanceData,
membershipManager,
1000
);
StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData();
assertEquals(GROUP_ID, requestData1.groupId());
assertEquals(MEMBER_ID, requestData1.memberId());
assertEquals(MEMBER_EPOCH, requestData1.memberEpoch());
if (instanceIdPresent) {
assertEquals(INSTANCE_ID, requestData1.instanceId());
} else {
assertNull(requestData1.instanceId());
}
StreamsGroupHeartbeatRequestData requestData2 = heartbeatState.buildRequestData();
assertEquals(GROUP_ID, requestData2.groupId());
assertEquals(MEMBER_ID, requestData2.memberId());
assertEquals(MEMBER_EPOCH, requestData2.memberEpoch());
if (instanceIdPresent) {
assertEquals(INSTANCE_ID, requestData2.instanceId());
} else {
assertNull(requestData2.instanceId());
}
}
@ParameterizedTest
@MethodSource("provideNonJoiningStates")
public void testBuildingHeartbeatRequestTopologySentWhenJoining(final MemberState memberState) {
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
new StreamsGroupHeartbeatRequestManager.HeartbeatState(
streamsRebalanceData,
membershipManager,
1000
);
when(membershipManager.state()).thenReturn(MemberState.JOINING);
StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData();
assertEquals(streamsRebalanceData.topologyEpoch(), requestData1.topology().epoch());
final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies = requestData1.topology().subtopologies();
assertEquals(2, subtopologies.size());
final StreamsGroupHeartbeatRequestData.Subtopology subtopology1 = subtopologies.get(0);
assertEquals(SUBTOPOLOGY_NAME_1, subtopology1.subtopologyId());
assertEquals(List.of(SOURCE_TOPIC_1, SOURCE_TOPIC_2), subtopology1.sourceTopics());
assertEquals(List.of(REPARTITION_SINK_TOPIC_1, REPARTITION_SINK_TOPIC_2, REPARTITION_SINK_TOPIC_3), subtopology1.repartitionSinkTopics());
assertEquals(REPARTITION_SOURCE_TOPICS.size(), subtopology1.repartitionSourceTopics().size());
subtopology1.repartitionSourceTopics().forEach(topicInfo -> {
final StreamsRebalanceData.TopicInfo repartitionTopic = REPARTITION_SOURCE_TOPICS.get(topicInfo.name());
assertEquals(repartitionTopic.numPartitions().get(), topicInfo.partitions());
assertEquals(repartitionTopic.replicationFactor().get(), topicInfo.replicationFactor());
assertEquals(repartitionTopic.topicConfigs().size(), topicInfo.topicConfigs().size());
});
assertEquals(CHANGELOG_TOPICS.size(), subtopology1.stateChangelogTopics().size());
subtopology1.stateChangelogTopics().forEach(topicInfo -> {
assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name()));
assertEquals(0, topicInfo.partitions());
final StreamsRebalanceData.TopicInfo changelogTopic = CHANGELOG_TOPICS.get(topicInfo.name());
assertEquals(changelogTopic.replicationFactor().get(), topicInfo.replicationFactor());
assertEquals(changelogTopic.topicConfigs().size(), topicInfo.topicConfigs().size());
});
assertEquals(2, subtopology1.copartitionGroups().size());
final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData1 =
new StreamsGroupHeartbeatRequestData.CopartitionGroup()
.setRepartitionSourceTopics(Collections.singletonList((short) 0))
.setSourceTopics(Collections.singletonList((short) 1));
final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData2 =
new StreamsGroupHeartbeatRequestData.CopartitionGroup()
.setRepartitionSourceTopics(Collections.singletonList((short) 1))
.setSourceTopics(Collections.singletonList((short) 0));
assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData1));
assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData2));
final StreamsGroupHeartbeatRequestData.Subtopology subtopology2 = subtopologies.get(1);
assertEquals(SUBTOPOLOGY_NAME_2, subtopology2.subtopologyId());
assertEquals(List.of(SOURCE_TOPIC_3), subtopology2.sourceTopics());
assertEquals(Collections.emptyList(), subtopology2.repartitionSinkTopics());
assertEquals(Collections.emptyList(), subtopology2.repartitionSourceTopics());
assertEquals(1, subtopology2.stateChangelogTopics().size());
assertEquals(CHANGELOG_TOPIC_4, subtopology2.stateChangelogTopics().get(0).name());
assertEquals(0, subtopology2.stateChangelogTopics().get(0).partitions());
assertEquals(1, subtopology2.stateChangelogTopics().get(0).replicationFactor());
assertEquals(0, subtopology2.stateChangelogTopics().get(0).topicConfigs().size());
when(membershipManager.state()).thenReturn(memberState);
StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData();
assertNull(nonJoiningRequestData.topology());
}
@ParameterizedTest
@MethodSource("provideNonJoiningStates")
public void testBuildingHeartbeatRequestRebalanceTimeoutSentWhenJoining(final MemberState memberState) {
final int rebalanceTimeoutMs = 1234;
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
new StreamsGroupHeartbeatRequestManager.HeartbeatState(
streamsRebalanceData,
membershipManager,
rebalanceTimeoutMs
);
when(membershipManager.state()).thenReturn(MemberState.JOINING);
StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData();
assertEquals(rebalanceTimeoutMs, requestData1.rebalanceTimeoutMs());
when(membershipManager.state()).thenReturn(memberState);
StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData();
assertEquals(-1, nonJoiningRequestData.rebalanceTimeoutMs());
}
@ParameterizedTest
@MethodSource("provideNonJoiningStates")
public void testBuildingHeartbeatProcessIdSentWhenJoining(final MemberState memberState) {
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
new StreamsGroupHeartbeatRequestManager.HeartbeatState(
streamsRebalanceData,
membershipManager,
1234
);
when(membershipManager.state()).thenReturn(MemberState.JOINING);
StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData();
assertEquals(PROCESS_ID.toString(), requestData1.processId());
when(membershipManager.state()).thenReturn(memberState);
StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData();
assertNull(nonJoiningRequestData.processId());
}
@ParameterizedTest
@MethodSource("provideNonJoiningStates")
public void testBuildingHeartbeatEndpointSentWhenJoining(final MemberState memberState) {
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
new StreamsGroupHeartbeatRequestManager.HeartbeatState(
streamsRebalanceData,
membershipManager,
1234
);
when(membershipManager.state()).thenReturn(MemberState.JOINING);
StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData();
assertEquals(ENDPOINT.host(), joiningRequestData.userEndpoint().host());
assertEquals(ENDPOINT.port(), joiningRequestData.userEndpoint().port());
when(membershipManager.state()).thenReturn(memberState);
StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData();
assertNull(nonJoiningRequestData.userEndpoint());
}
@ParameterizedTest
@MethodSource("provideNonJoiningStates")
public void testBuildingHeartbeatClientTagsSentWhenJoining(final MemberState memberState) {
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
new StreamsGroupHeartbeatRequestManager.HeartbeatState(
streamsRebalanceData,
membershipManager,
1234
);
when(membershipManager.state()).thenReturn(MemberState.JOINING);
StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData();
assertEquals(CLIENT_TAG_1, joiningRequestData.clientTags().get(0).key());
assertEquals(VALUE_1, joiningRequestData.clientTags().get(0).value());
when(membershipManager.state()).thenReturn(memberState);
StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData();
assertNull(nonJoiningRequestData.clientTags());
}
@ParameterizedTest
@MethodSource("provideNonJoiningStates")
public void testBuildingHeartbeatAssignmentSentWhenChanged(final MemberState memberState) {
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
new StreamsGroupHeartbeatRequestManager.HeartbeatState(
streamsRebalanceData,
membershipManager,
1234
);
when(membershipManager.state()).thenReturn(MemberState.JOINING);
StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData();
assertEquals(List.of(), joiningRequestData.activeTasks());
assertEquals(List.of(), joiningRequestData.standbyTasks());
assertEquals(List.of(), joiningRequestData.warmupTasks());
when(membershipManager.state()).thenReturn(memberState);
streamsRebalanceData.setReconciledAssignment(
new StreamsRebalanceData.Assignment(
Set.of(
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0),
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1),
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2)
),
Set.of(
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
),
Set.of(
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3),
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4),
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5)
)
)
);
StreamsGroupHeartbeatRequestData firstNonJoiningRequestData = heartbeatState.buildRequestData();
assertTaskIdsEquals(
List.of(
new StreamsGroupHeartbeatRequestData.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_NAME_1)
.setPartitions(List.of(0, 1)),
new StreamsGroupHeartbeatRequestData.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_NAME_2)
.setPartitions(List.of(2))
),
firstNonJoiningRequestData.activeTasks()
);
assertTaskIdsEquals(
List.of(
new StreamsGroupHeartbeatRequestData.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_NAME_1)
.setPartitions(List.of(2))
),
firstNonJoiningRequestData.standbyTasks()
);
assertTaskIdsEquals(
List.of(
new StreamsGroupHeartbeatRequestData.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_NAME_1)
.setPartitions(List.of(3, 4, 5))
),
firstNonJoiningRequestData.warmupTasks()
);
StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithoutChanges = heartbeatState.buildRequestData();
assertNull(nonJoiningRequestDataWithoutChanges.activeTasks());
assertNull(nonJoiningRequestDataWithoutChanges.standbyTasks());
assertNull(nonJoiningRequestDataWithoutChanges.warmupTasks());
streamsRebalanceData.setReconciledAssignment(
new StreamsRebalanceData.Assignment(
Set.of(
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0)
),
Set.of(
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
),
Set.of(
)
)
);
StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithChanges = heartbeatState.buildRequestData();
assertTaskIdsEquals(
List.of(
new StreamsGroupHeartbeatRequestData.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_NAME_1)
.setPartitions(List.of(0))
),
nonJoiningRequestDataWithChanges.activeTasks()
);
assertTaskIdsEquals(
List.of(
new StreamsGroupHeartbeatRequestData.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_NAME_1)
.setPartitions(List.of(2))
),
nonJoiningRequestDataWithChanges.standbyTasks()
);
assertEquals(List.of(), nonJoiningRequestDataWithChanges.warmupTasks());
}
@ParameterizedTest
@MethodSource("provideNonJoiningStates")
public void testResettingHeartbeatState(final MemberState memberState) {
when(membershipManager.groupId()).thenReturn(GROUP_ID);
when(membershipManager.memberId()).thenReturn(MEMBER_ID);
when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
new StreamsGroupHeartbeatRequestManager.HeartbeatState(
streamsRebalanceData,
membershipManager,
1234
);
when(membershipManager.state()).thenReturn(memberState);
streamsRebalanceData.setReconciledAssignment(
new StreamsRebalanceData.Assignment(
Set.of(
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0),
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1),
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2)
),
Set.of(
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
),
Set.of(
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3),
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4),
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5)
)
)
);
StreamsGroupHeartbeatRequestData requestDataBeforeReset = heartbeatState.buildRequestData();
assertEquals(GROUP_ID, requestDataBeforeReset.groupId());
assertEquals(MEMBER_ID, requestDataBeforeReset.memberId());
assertEquals(MEMBER_EPOCH, requestDataBeforeReset.memberEpoch());
assertEquals(INSTANCE_ID, requestDataBeforeReset.instanceId());
assertFalse(requestDataBeforeReset.activeTasks().isEmpty());
assertFalse(requestDataBeforeReset.standbyTasks().isEmpty());
assertFalse(requestDataBeforeReset.warmupTasks().isEmpty());
heartbeatState.reset();
StreamsGroupHeartbeatRequestData requestDataAfterReset = heartbeatState.buildRequestData();
assertEquals(GROUP_ID, requestDataAfterReset.groupId());
assertEquals(MEMBER_ID, requestDataAfterReset.memberId());
assertEquals(MEMBER_EPOCH, requestDataAfterReset.memberEpoch());
assertEquals(INSTANCE_ID, requestDataAfterReset.instanceId());
assertEquals(requestDataBeforeReset.activeTasks(), requestDataAfterReset.activeTasks());
assertEquals(requestDataBeforeReset.standbyTasks(), requestDataAfterReset.standbyTasks());
assertEquals(requestDataBeforeReset.warmupTasks(), requestDataAfterReset.warmupTasks());
}
private static Stream<Arguments> provideNonJoiningStates() {
return Stream.of(
Arguments.of(MemberState.ACKNOWLEDGING),
Arguments.of(MemberState.RECONCILING),
Arguments.of(MemberState.STABLE),
Arguments.of(MemberState.PREPARE_LEAVING),
Arguments.of(MemberState.LEAVING)
);
}
@ParameterizedTest
@EnumSource(
value = MemberState.class,
names = {"JOINING", "ACKNOWLEDGING", "RECONCILING", "STABLE", "PREPARE_LEAVING", "LEAVING"}
)
public void testBuildingHeartbeatShutdownRequested(final MemberState memberState) {
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
new StreamsGroupHeartbeatRequestManager.HeartbeatState(
streamsRebalanceData,
membershipManager,
1234
);
when(membershipManager.state()).thenReturn(memberState);
StreamsGroupHeartbeatRequestData requestDataWithoutShutdownRequest = heartbeatState.buildRequestData();
assertFalse(requestDataWithoutShutdownRequest.shutdownApplication());
streamsRebalanceData.requestShutdown();
StreamsGroupHeartbeatRequestData requestDataWithShutdownRequest = heartbeatState.buildRequestData();
assertTrue(requestDataWithShutdownRequest.shutdownApplication());
}
private static ConsumerConfig config() { private static ConsumerConfig config() {
Properties prop = new Properties(); Properties prop = new Properties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
@ -651,4 +1013,21 @@ class StreamsGroupHeartbeatRequestManagerTest {
) )
); );
} }
private static void assertTaskIdsEquals(final List<StreamsGroupHeartbeatRequestData.TaskIds> expected,
final List<StreamsGroupHeartbeatRequestData.TaskIds> actual) {
List<StreamsGroupHeartbeatRequestData.TaskIds> sortedExpected = expected.stream()
.map(taskIds -> new StreamsGroupHeartbeatRequestData.TaskIds()
.setSubtopologyId(taskIds.subtopologyId())
.setPartitions(taskIds.partitions().stream().sorted().collect(Collectors.toList())))
.sorted(Comparator.comparing(StreamsGroupHeartbeatRequestData.TaskIds::subtopologyId))
.collect(Collectors.toList());
List<StreamsGroupHeartbeatRequestData.TaskIds> sortedActual = actual.stream()
.map(taskIds -> new StreamsGroupHeartbeatRequestData.TaskIds()
.setSubtopologyId(taskIds.subtopologyId())
.setPartitions(taskIds.partitions().stream().sorted().collect(Collectors.toList())))
.sorted(Comparator.comparing(StreamsGroupHeartbeatRequestData.TaskIds::subtopologyId))
.collect(Collectors.toList());
assertEquals(sortedExpected, sortedActual);
}
} }