diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java index 0f17353492e..2aa466e7f60 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java @@ -61,9 +61,23 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { static class HeartbeatState { + // Fields of StreamsGroupHeartbeatRequest sent in the most recent request + static class LastSentFields { + + private StreamsRebalanceData.Assignment assignment = StreamsRebalanceData.Assignment.EMPTY; + + LastSentFields() { + } + + void reset() { + assignment = StreamsRebalanceData.Assignment.EMPTY; + } + } + private final StreamsMembershipManager membershipManager; private final int rebalanceTimeoutMs; private final StreamsRebalanceData streamsRebalanceData; + private final LastSentFields lastSentFields = new LastSentFields(); public HeartbeatState(final StreamsRebalanceData streamsRebalanceData, final StreamsMembershipManager membershipManager, @@ -74,6 +88,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { } public void reset() { + lastSentFields.reset(); } public StreamsGroupHeartbeatRequestData buildRequestData() { @@ -82,33 +97,45 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { data.setMemberId(membershipManager.memberId()); data.setMemberEpoch(membershipManager.memberEpoch()); membershipManager.groupInstanceId().ifPresent(data::setInstanceId); - StreamsGroupHeartbeatRequestData.Topology topology = new StreamsGroupHeartbeatRequestData.Topology(); - topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies())); - topology.setEpoch(streamsRebalanceData.topologyEpoch()); - data.setRebalanceTimeoutMs(rebalanceTimeoutMs); - data.setTopology(topology); - data.setProcessId(streamsRebalanceData.processId().toString()); - streamsRebalanceData.endpoint().ifPresent(userEndpoint -> { - data.setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint() - .setHost(userEndpoint.host()) - .setPort(userEndpoint.port()) - ); - }); - data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream() - .map(entry -> new StreamsGroupHeartbeatRequestData.KeyValue() - .setKey(entry.getKey()) - .setValue(entry.getValue()) - ) - .collect(Collectors.toList())); + + boolean joining = membershipManager.state() == MemberState.JOINING; + + if (joining) { + StreamsGroupHeartbeatRequestData.Topology topology = new StreamsGroupHeartbeatRequestData.Topology(); + topology.setSubtopologies(fromStreamsToHeartbeatRequest(streamsRebalanceData.subtopologies())); + topology.setEpoch(streamsRebalanceData.topologyEpoch()); + data.setTopology(topology); + data.setRebalanceTimeoutMs(rebalanceTimeoutMs); + data.setProcessId(streamsRebalanceData.processId().toString()); + streamsRebalanceData.endpoint().ifPresent(userEndpoint -> { + data.setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint() + .setHost(userEndpoint.host()) + .setPort(userEndpoint.port()) + ); + }); + data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream() + .map(entry -> new StreamsGroupHeartbeatRequestData.KeyValue() + .setKey(entry.getKey()) + .setValue(entry.getValue()) + ) + .collect(Collectors.toList())); + data.setActiveTasks(fromStreamsToHeartbeatRequest(Set.of())); + data.setStandbyTasks(fromStreamsToHeartbeatRequest(Set.of())); + data.setWarmupTasks(fromStreamsToHeartbeatRequest(Set.of())); + } else { + StreamsRebalanceData.Assignment reconciledAssignment = streamsRebalanceData.reconciledAssignment(); + if (!reconciledAssignment.equals(lastSentFields.assignment)) { + data.setActiveTasks(fromStreamsToHeartbeatRequest(reconciledAssignment.activeTasks())); + data.setStandbyTasks(fromStreamsToHeartbeatRequest(reconciledAssignment.standbyTasks())); + data.setWarmupTasks(fromStreamsToHeartbeatRequest(reconciledAssignment.warmupTasks())); + lastSentFields.assignment = reconciledAssignment; + } + } data.setShutdownApplication(streamsRebalanceData.shutdownRequested()); - StreamsRebalanceData.Assignment reconciledAssignment = streamsRebalanceData.reconciledAssignment(); - data.setActiveTasks(convertTaskIdCollection(reconciledAssignment.activeTasks())); - data.setStandbyTasks(convertTaskIdCollection(reconciledAssignment.standbyTasks())); - data.setWarmupTasks(convertTaskIdCollection(reconciledAssignment.warmupTasks())); return data; } - private static List convertTaskIdCollection(final Set tasks) { + private static List fromStreamsToHeartbeatRequest(final Set tasks) { return tasks.stream() .collect( Collectors.groupingBy(StreamsRebalanceData.TaskId::subtopologyId, @@ -117,27 +144,26 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { .entrySet() .stream() .map(entry -> { - StreamsGroupHeartbeatRequestData.TaskIds ids = new StreamsGroupHeartbeatRequestData.TaskIds(); - ids.setSubtopologyId(entry.getKey()); - ids.setPartitions(entry.getValue()); - return ids; + return new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(entry.getKey()) + .setPartitions(entry.getValue()); }) .collect(Collectors.toList()); } - private static List getTopologyFromStreams(final Map subtopologies) { + private static List fromStreamsToHeartbeatRequest(final Map subtopologies) { final List subtopologiesForRequest = new ArrayList<>(subtopologies.size()); for (final Map.Entry subtopology : subtopologies.entrySet()) { - subtopologiesForRequest.add(getSubtopologyFromStreams(subtopology.getKey(), subtopology.getValue())); + subtopologiesForRequest.add(fromStreamsToHeartbeatRequest(subtopology.getKey(), subtopology.getValue())); } subtopologiesForRequest.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.Subtopology::subtopologyId)); return subtopologiesForRequest; } - private static StreamsGroupHeartbeatRequestData.Subtopology getSubtopologyFromStreams(final String subtopologyName, - final StreamsRebalanceData.Subtopology subtopology) { + private static StreamsGroupHeartbeatRequestData.Subtopology fromStreamsToHeartbeatRequest(final String subtopologyId, + final StreamsRebalanceData.Subtopology subtopology) { final StreamsGroupHeartbeatRequestData.Subtopology subtopologyData = new StreamsGroupHeartbeatRequestData.Subtopology(); - subtopologyData.setSubtopologyId(subtopologyName); + subtopologyData.setSubtopologyId(subtopologyId); ArrayList sortedSourceTopics = new ArrayList<>(subtopology.sourceTopics()); Collections.sort(sortedSourceTopics); subtopologyData.setSourceTopics(sortedSourceTopics); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java index 47b27d52e29..8acf48bbc6b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java @@ -38,25 +38,30 @@ import org.apache.kafka.common.utils.Timer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.MockedConstruction; import org.mockito.junit.jupiter.MockitoExtension; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -116,8 +121,21 @@ class StreamsGroupHeartbeatRequestManagerTest { CHANGELOG_TOPICS, COPARTITION_GROUP ); + private static final String SUBTOPOLOGY_NAME_2 = "subtopology2"; + private static final String SOURCE_TOPIC_3 = "sourceTopic3"; + private static final String CHANGELOG_TOPIC_4 = "changelogTopic4"; + private static final StreamsRebalanceData.Subtopology SUBTOPOLOGY_2 = new StreamsRebalanceData.Subtopology( + Set.of(SOURCE_TOPIC_3), + Set.of(), + Map.of(), + Map.of(CHANGELOG_TOPIC_4, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 1), Map.of())), + Collections.emptyList() + ); private static final Map SUBTOPOLOGIES = - Map.of(SUBTOPOLOGY_NAME_1, SUBTOPOLOGY_1); + Map.of( + SUBTOPOLOGY_NAME_1, SUBTOPOLOGY_1, + SUBTOPOLOGY_NAME_2, SUBTOPOLOGY_2 + ); private static final String CLIENT_TAG_1 = "client-tag1"; private static final String VALUE_1 = "value1"; private static final Map CLIENT_TAGS = Map.of(CLIENT_TAG_1, VALUE_1); @@ -475,101 +493,6 @@ class StreamsGroupHeartbeatRequestManagerTest { } } - @Test - public void testSendingFullHeartbeatRequest() { - try ( - final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( - HeartbeatRequestState.class, - (mock, context) -> { - when(mock.canSendRequest(time.milliseconds())).thenReturn(true); - }) - ) { - final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager(); - final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode)); - when(membershipManager.groupId()).thenReturn(GROUP_ID); - when(membershipManager.memberId()).thenReturn(MEMBER_ID); - when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH); - when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID)); - - final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - - assertEquals(0, result.timeUntilNextPollMs); - assertEquals(1, result.unsentRequests.size()); - assertEquals(Optional.of(coordinatorNode), result.unsentRequests.get(0).node()); - NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0); - StreamsGroupHeartbeatRequest streamsRequest = (StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build(); - assertEquals(GROUP_ID, streamsRequest.data().groupId()); - assertEquals(MEMBER_ID, streamsRequest.data().memberId()); - assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch()); - assertEquals(INSTANCE_ID, streamsRequest.data().instanceId()); - assertEquals(PROCESS_ID.toString(), streamsRequest.data().processId()); - assertEquals(ENDPOINT.host(), streamsRequest.data().userEndpoint().host()); - assertEquals(ENDPOINT.port(), streamsRequest.data().userEndpoint().port()); - assertEquals(1, streamsRequest.data().clientTags().size()); - assertEquals(CLIENT_TAG_1, streamsRequest.data().clientTags().get(0).key()); - assertEquals(VALUE_1, streamsRequest.data().clientTags().get(0).value()); - assertEquals(streamsRebalanceData.topologyEpoch(), streamsRequest.data().topology().epoch()); - assertNotNull(streamsRequest.data().topology()); - final List subtopologies = streamsRequest.data().topology().subtopologies(); - assertEquals(1, subtopologies.size()); - final StreamsGroupHeartbeatRequestData.Subtopology subtopology = subtopologies.get(0); - assertEquals(SUBTOPOLOGY_NAME_1, subtopology.subtopologyId()); - assertEquals(Arrays.asList("sourceTopic1", "sourceTopic2"), subtopology.sourceTopics()); - assertEquals(Arrays.asList("repartitionSinkTopic1", "repartitionSinkTopic2", "repartitionSinkTopic3"), subtopology.repartitionSinkTopics()); - assertEquals(REPARTITION_SOURCE_TOPICS.size(), subtopology.repartitionSourceTopics().size()); - subtopology.repartitionSourceTopics().forEach(topicInfo -> { - final StreamsRebalanceData.TopicInfo repartitionTopic = REPARTITION_SOURCE_TOPICS.get(topicInfo.name()); - assertEquals(repartitionTopic.numPartitions().get(), topicInfo.partitions()); - assertEquals(repartitionTopic.replicationFactor().get(), topicInfo.replicationFactor()); - assertEquals(repartitionTopic.topicConfigs().size(), topicInfo.topicConfigs().size()); - }); - assertEquals(CHANGELOG_TOPICS.size(), subtopology.stateChangelogTopics().size()); - subtopology.stateChangelogTopics().forEach(topicInfo -> { - assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name())); - assertEquals(0, topicInfo.partitions()); - final StreamsRebalanceData.TopicInfo changelogTopic = CHANGELOG_TOPICS.get(topicInfo.name()); - assertEquals(changelogTopic.replicationFactor().get(), topicInfo.replicationFactor()); - assertEquals(changelogTopic.topicConfigs().size(), topicInfo.topicConfigs().size()); - }); - assertEquals(2, subtopology.copartitionGroups().size()); - final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData1 = - new StreamsGroupHeartbeatRequestData.CopartitionGroup() - .setRepartitionSourceTopics(Collections.singletonList((short) 0)) - .setSourceTopics(Collections.singletonList((short) 1)); - final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData2 = - new StreamsGroupHeartbeatRequestData.CopartitionGroup() - .setRepartitionSourceTopics(Collections.singletonList((short) 1)) - .setSourceTopics(Collections.singletonList((short) 0)); - assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData1)); - assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData2)); - verify(heartbeatRequestState).onSendAttempt(time.milliseconds()); - verify(membershipManager).onHeartbeatRequestGenerated(); - time.sleep(2000); - assertEquals( - 2.0, - metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", "consumer-coordinator-metrics")).metricValue() - ); - final ClientResponse response = buildClientResponse(); - networkRequest.future().complete(response); - verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) response.responseBody()); - verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS); - verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs()); - verify(heartbeatRequestState).resetTimer(); - final List topicPartitions = streamsRebalanceData.partitionsByHost() - .get(new StreamsRebalanceData.HostInfo( - ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(), - ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port()) - ); - assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), topicPartitions.get(0).topic()); - assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0), topicPartitions.get(0).partition()); - assertEquals( - 1.0, - metrics.metric(metrics.metricName("heartbeat-total", "consumer-coordinator-metrics")).metricValue() - ); - } - } - @Test public void testSendingLeaveHeartbeatRequestWhenPollTimerExpired() { try ( @@ -582,7 +505,7 @@ class StreamsGroupHeartbeatRequestManagerTest { Timer.class, (mock, context) -> { when(mock.isExpired()).thenReturn(true); - }); + }) ) { final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager(); final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0); @@ -613,6 +536,445 @@ class StreamsGroupHeartbeatRequestManagerTest { } } + @Test + public void testSendingHeartbeatRequest() { + try ( + final MockedConstruction heartbeatRequestStateMockedConstruction = mockConstruction( + HeartbeatRequestState.class, + (mock, context) -> { + when(mock.canSendRequest(time.milliseconds())).thenReturn(true); + }) + ) { + final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager(); + final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode)); + when(membershipManager.groupId()).thenReturn(GROUP_ID); + when(membershipManager.memberId()).thenReturn(MEMBER_ID); + when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH); + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID)); + + final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + assertEquals(0, result.timeUntilNextPollMs); + assertEquals(1, result.unsentRequests.size()); + assertEquals(Optional.of(coordinatorNode), result.unsentRequests.get(0).node()); + NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0); + StreamsGroupHeartbeatRequest streamsRequest = (StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build(); + assertEquals(GROUP_ID, streamsRequest.data().groupId()); + assertEquals(MEMBER_ID, streamsRequest.data().memberId()); + assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch()); + assertEquals(INSTANCE_ID, streamsRequest.data().instanceId()); + verify(heartbeatRequestState).onSendAttempt(time.milliseconds()); + verify(membershipManager).onHeartbeatRequestGenerated(); + time.sleep(2000); + assertEquals( + 2.0, + metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", "consumer-coordinator-metrics")).metricValue() + ); + final ClientResponse response = buildClientResponse(); + networkRequest.future().complete(response); + verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) response.responseBody()); + verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS); + verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs()); + verify(heartbeatRequestState).resetTimer(); + final List topicPartitions = streamsRebalanceData.partitionsByHost() + .get(new StreamsRebalanceData.HostInfo( + ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(), + ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port()) + ); + assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), topicPartitions.get(0).topic()); + assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0), topicPartitions.get(0).partition()); + assertEquals( + 1.0, + metrics.metric(metrics.metricName("heartbeat-total", "consumer-coordinator-metrics")).metricValue() + ); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBuildingHeartbeatRequestFieldsThatAreAlwaysSent(final boolean instanceIdPresent) { + when(membershipManager.groupId()).thenReturn(GROUP_ID); + when(membershipManager.memberId()).thenReturn(MEMBER_ID); + when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH); + when(membershipManager.groupInstanceId()).thenReturn(instanceIdPresent ? Optional.of(INSTANCE_ID) : Optional.empty()); + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState( + streamsRebalanceData, + membershipManager, + 1000 + ); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(GROUP_ID, requestData1.groupId()); + assertEquals(MEMBER_ID, requestData1.memberId()); + assertEquals(MEMBER_EPOCH, requestData1.memberEpoch()); + if (instanceIdPresent) { + assertEquals(INSTANCE_ID, requestData1.instanceId()); + } else { + assertNull(requestData1.instanceId()); + } + + StreamsGroupHeartbeatRequestData requestData2 = heartbeatState.buildRequestData(); + + assertEquals(GROUP_ID, requestData2.groupId()); + assertEquals(MEMBER_ID, requestData2.memberId()); + assertEquals(MEMBER_EPOCH, requestData2.memberEpoch()); + if (instanceIdPresent) { + assertEquals(INSTANCE_ID, requestData2.instanceId()); + } else { + assertNull(requestData2.instanceId()); + } + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatRequestTopologySentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState( + streamsRebalanceData, + membershipManager, + 1000 + ); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(streamsRebalanceData.topologyEpoch(), requestData1.topology().epoch()); + final List subtopologies = requestData1.topology().subtopologies(); + assertEquals(2, subtopologies.size()); + final StreamsGroupHeartbeatRequestData.Subtopology subtopology1 = subtopologies.get(0); + assertEquals(SUBTOPOLOGY_NAME_1, subtopology1.subtopologyId()); + assertEquals(List.of(SOURCE_TOPIC_1, SOURCE_TOPIC_2), subtopology1.sourceTopics()); + assertEquals(List.of(REPARTITION_SINK_TOPIC_1, REPARTITION_SINK_TOPIC_2, REPARTITION_SINK_TOPIC_3), subtopology1.repartitionSinkTopics()); + assertEquals(REPARTITION_SOURCE_TOPICS.size(), subtopology1.repartitionSourceTopics().size()); + subtopology1.repartitionSourceTopics().forEach(topicInfo -> { + final StreamsRebalanceData.TopicInfo repartitionTopic = REPARTITION_SOURCE_TOPICS.get(topicInfo.name()); + assertEquals(repartitionTopic.numPartitions().get(), topicInfo.partitions()); + assertEquals(repartitionTopic.replicationFactor().get(), topicInfo.replicationFactor()); + assertEquals(repartitionTopic.topicConfigs().size(), topicInfo.topicConfigs().size()); + }); + assertEquals(CHANGELOG_TOPICS.size(), subtopology1.stateChangelogTopics().size()); + subtopology1.stateChangelogTopics().forEach(topicInfo -> { + assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name())); + assertEquals(0, topicInfo.partitions()); + final StreamsRebalanceData.TopicInfo changelogTopic = CHANGELOG_TOPICS.get(topicInfo.name()); + assertEquals(changelogTopic.replicationFactor().get(), topicInfo.replicationFactor()); + assertEquals(changelogTopic.topicConfigs().size(), topicInfo.topicConfigs().size()); + }); + assertEquals(2, subtopology1.copartitionGroups().size()); + final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData1 = + new StreamsGroupHeartbeatRequestData.CopartitionGroup() + .setRepartitionSourceTopics(Collections.singletonList((short) 0)) + .setSourceTopics(Collections.singletonList((short) 1)); + final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData2 = + new StreamsGroupHeartbeatRequestData.CopartitionGroup() + .setRepartitionSourceTopics(Collections.singletonList((short) 1)) + .setSourceTopics(Collections.singletonList((short) 0)); + assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData1)); + assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData2)); + final StreamsGroupHeartbeatRequestData.Subtopology subtopology2 = subtopologies.get(1); + assertEquals(SUBTOPOLOGY_NAME_2, subtopology2.subtopologyId()); + assertEquals(List.of(SOURCE_TOPIC_3), subtopology2.sourceTopics()); + assertEquals(Collections.emptyList(), subtopology2.repartitionSinkTopics()); + assertEquals(Collections.emptyList(), subtopology2.repartitionSourceTopics()); + assertEquals(1, subtopology2.stateChangelogTopics().size()); + assertEquals(CHANGELOG_TOPIC_4, subtopology2.stateChangelogTopics().get(0).name()); + assertEquals(0, subtopology2.stateChangelogTopics().get(0).partitions()); + assertEquals(1, subtopology2.stateChangelogTopics().get(0).replicationFactor()); + assertEquals(0, subtopology2.stateChangelogTopics().get(0).topicConfigs().size()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + assertNull(nonJoiningRequestData.topology()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatRequestRebalanceTimeoutSentWhenJoining(final MemberState memberState) { + final int rebalanceTimeoutMs = 1234; + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState( + streamsRebalanceData, + membershipManager, + rebalanceTimeoutMs + ); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(rebalanceTimeoutMs, requestData1.rebalanceTimeoutMs()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(-1, nonJoiningRequestData.rebalanceTimeoutMs()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatProcessIdSentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState( + streamsRebalanceData, + membershipManager, + 1234 + ); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData(); + + assertEquals(PROCESS_ID.toString(), requestData1.processId()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestData.processId()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatEndpointSentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState( + streamsRebalanceData, + membershipManager, + 1234 + ); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(ENDPOINT.host(), joiningRequestData.userEndpoint().host()); + assertEquals(ENDPOINT.port(), joiningRequestData.userEndpoint().port()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestData.userEndpoint()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatClientTagsSentWhenJoining(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState( + streamsRebalanceData, + membershipManager, + 1234 + ); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(CLIENT_TAG_1, joiningRequestData.clientTags().get(0).key()); + assertEquals(VALUE_1, joiningRequestData.clientTags().get(0).value()); + + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestData.clientTags()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testBuildingHeartbeatAssignmentSentWhenChanged(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState( + streamsRebalanceData, + membershipManager, + 1234 + ); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + + StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData(); + + assertEquals(List.of(), joiningRequestData.activeTasks()); + assertEquals(List.of(), joiningRequestData.standbyTasks()); + assertEquals(List.of(), joiningRequestData.warmupTasks()); + + when(membershipManager.state()).thenReturn(memberState); + streamsRebalanceData.setReconciledAssignment( + new StreamsRebalanceData.Assignment( + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5) + ) + ) + ); + + StreamsGroupHeartbeatRequestData firstNonJoiningRequestData = heartbeatState.buildRequestData(); + + assertTaskIdsEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(0, 1)), + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_2) + .setPartitions(List.of(2)) + ), + firstNonJoiningRequestData.activeTasks() + ); + assertTaskIdsEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(2)) + ), + firstNonJoiningRequestData.standbyTasks() + ); + assertTaskIdsEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(3, 4, 5)) + ), + firstNonJoiningRequestData.warmupTasks() + ); + + StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithoutChanges = heartbeatState.buildRequestData(); + + assertNull(nonJoiningRequestDataWithoutChanges.activeTasks()); + assertNull(nonJoiningRequestDataWithoutChanges.standbyTasks()); + assertNull(nonJoiningRequestDataWithoutChanges.warmupTasks()); + + streamsRebalanceData.setReconciledAssignment( + new StreamsRebalanceData.Assignment( + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2) + ), + Set.of( + ) + ) + ); + + StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithChanges = heartbeatState.buildRequestData(); + + assertTaskIdsEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(0)) + ), + nonJoiningRequestDataWithChanges.activeTasks() + ); + assertTaskIdsEquals( + List.of( + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(SUBTOPOLOGY_NAME_1) + .setPartitions(List.of(2)) + ), + nonJoiningRequestDataWithChanges.standbyTasks() + ); + assertEquals(List.of(), nonJoiningRequestDataWithChanges.warmupTasks()); + } + + @ParameterizedTest + @MethodSource("provideNonJoiningStates") + public void testResettingHeartbeatState(final MemberState memberState) { + when(membershipManager.groupId()).thenReturn(GROUP_ID); + when(membershipManager.memberId()).thenReturn(MEMBER_ID); + when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH); + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID)); + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState( + streamsRebalanceData, + membershipManager, + 1234 + ); + when(membershipManager.state()).thenReturn(memberState); + streamsRebalanceData.setReconciledAssignment( + new StreamsRebalanceData.Assignment( + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2) + ), + Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5) + ) + ) + ); + StreamsGroupHeartbeatRequestData requestDataBeforeReset = heartbeatState.buildRequestData(); + assertEquals(GROUP_ID, requestDataBeforeReset.groupId()); + assertEquals(MEMBER_ID, requestDataBeforeReset.memberId()); + assertEquals(MEMBER_EPOCH, requestDataBeforeReset.memberEpoch()); + assertEquals(INSTANCE_ID, requestDataBeforeReset.instanceId()); + assertFalse(requestDataBeforeReset.activeTasks().isEmpty()); + assertFalse(requestDataBeforeReset.standbyTasks().isEmpty()); + assertFalse(requestDataBeforeReset.warmupTasks().isEmpty()); + + heartbeatState.reset(); + + StreamsGroupHeartbeatRequestData requestDataAfterReset = heartbeatState.buildRequestData(); + assertEquals(GROUP_ID, requestDataAfterReset.groupId()); + assertEquals(MEMBER_ID, requestDataAfterReset.memberId()); + assertEquals(MEMBER_EPOCH, requestDataAfterReset.memberEpoch()); + assertEquals(INSTANCE_ID, requestDataAfterReset.instanceId()); + assertEquals(requestDataBeforeReset.activeTasks(), requestDataAfterReset.activeTasks()); + assertEquals(requestDataBeforeReset.standbyTasks(), requestDataAfterReset.standbyTasks()); + assertEquals(requestDataBeforeReset.warmupTasks(), requestDataAfterReset.warmupTasks()); + } + + private static Stream provideNonJoiningStates() { + return Stream.of( + Arguments.of(MemberState.ACKNOWLEDGING), + Arguments.of(MemberState.RECONCILING), + Arguments.of(MemberState.STABLE), + Arguments.of(MemberState.PREPARE_LEAVING), + Arguments.of(MemberState.LEAVING) + ); + } + + @ParameterizedTest + @EnumSource( + value = MemberState.class, + names = {"JOINING", "ACKNOWLEDGING", "RECONCILING", "STABLE", "PREPARE_LEAVING", "LEAVING"} + ) + public void testBuildingHeartbeatShutdownRequested(final MemberState memberState) { + final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = + new StreamsGroupHeartbeatRequestManager.HeartbeatState( + streamsRebalanceData, + membershipManager, + 1234 + ); + when(membershipManager.state()).thenReturn(memberState); + + StreamsGroupHeartbeatRequestData requestDataWithoutShutdownRequest = heartbeatState.buildRequestData(); + + assertFalse(requestDataWithoutShutdownRequest.shutdownApplication()); + + streamsRebalanceData.requestShutdown(); + + StreamsGroupHeartbeatRequestData requestDataWithShutdownRequest = heartbeatState.buildRequestData(); + + assertTrue(requestDataWithShutdownRequest.shutdownApplication()); + } + private static ConsumerConfig config() { Properties prop = new Properties(); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); @@ -651,4 +1013,21 @@ class StreamsGroupHeartbeatRequestManagerTest { ) ); } + + private static void assertTaskIdsEquals(final List expected, + final List actual) { + List sortedExpected = expected.stream() + .map(taskIds -> new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(taskIds.subtopologyId()) + .setPartitions(taskIds.partitions().stream().sorted().collect(Collectors.toList()))) + .sorted(Comparator.comparing(StreamsGroupHeartbeatRequestData.TaskIds::subtopologyId)) + .collect(Collectors.toList()); + List sortedActual = actual.stream() + .map(taskIds -> new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(taskIds.subtopologyId()) + .setPartitions(taskIds.partitions().stream().sorted().collect(Collectors.toList()))) + .sorted(Comparator.comparing(StreamsGroupHeartbeatRequestData.TaskIds::subtopologyId)) + .collect(Collectors.toList()); + assertEquals(sortedExpected, sortedActual); + } } \ No newline at end of file