diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java index 8f7d08e23a5..53e9ade43d1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java @@ -88,6 +88,8 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { private final int rebalanceTimeoutMs; private final StreamsRebalanceData streamsRebalanceData; private final LastSentFields lastSentFields = new LastSentFields(); + private int endpointInformationEpoch = -1; + public HeartbeatState(final StreamsRebalanceData streamsRebalanceData, final StreamsMembershipManager membershipManager, @@ -101,11 +103,20 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { lastSentFields.reset(); } + public int endpointInformationEpoch() { + return endpointInformationEpoch; + } + + public void setEndpointInformationEpoch(int endpointInformationEpoch) { + this.endpointInformationEpoch = endpointInformationEpoch; + } + public StreamsGroupHeartbeatRequestData buildRequestData() { StreamsGroupHeartbeatRequestData data = new StreamsGroupHeartbeatRequestData(); data.setGroupId(membershipManager.groupId()); data.setMemberId(membershipManager.memberId()); data.setMemberEpoch(membershipManager.memberEpoch()); + data.setEndpointInformationEpoch(endpointInformationEpoch); membershipManager.groupInstanceId().ifPresent(data::setInstanceId); boolean joining = membershipManager.state() == MemberState.JOINING; @@ -515,6 +526,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { final StreamsGroupHeartbeatResponseData data = response.data(); heartbeatRequestState.updateHeartbeatIntervalMs(data.heartbeatIntervalMs()); 
heartbeatRequestState.onSuccessfulAttempt(currentTimeMs); + heartbeatState.setEndpointInformationEpoch(data.endpointInformationEpoch()); if (data.partitionsByUserEndpoint() != null) { streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data)); diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json index 6af7fad4d2b..247c3a68f54 100644 --- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json @@ -31,6 +31,8 @@ "about": "The member ID generated by the streams consumer. The member ID must be kept during the entire lifetime of the streams consumer process." }, { "name": "MemberEpoch", "type": "int32", "versions": "0+", "about": "The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin." }, + { "name": "EndpointInformationEpoch", "type": "int32", "versions": "0+", + "about": "The current endpoint epoch of this client, represents the latest endpoint epoch this client received"}, { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not provided or if it didn't change since the last heartbeat; the instance ID for static membership otherwise." }, { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json index 7127fcd1282..e7f7c0b7681 100644 --- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json @@ -65,6 +65,8 @@ "about": "Assigned warm-up tasks for this client. Null if unchanged since last heartbeat." 
}, // IQ-related information + { "name": "EndpointInformationEpoch", "type": "int32", "versions": "0+", + "about": "The endpoint epoch set in the response"}, { "name": "PartitionsByUserEndpoint", "type": "[]EndpointToPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Global assignment information used for IQ. Null if unchanged since last heartbeat." , "fields": [ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index afeb5f463c3..a2e682722d7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1823,6 +1823,7 @@ public class GroupMetadataManager { * @param userEndpoint User-defined endpoint for Interactive Queries, or null. * @param clientTags Used for rack-aware assignment algorithm, or null. * @param shutdownApplication Whether all Streams clients in the group should shut down. + * @param memberEndpointEpoch The last endpoint information epoch seen by the group member. * @return A result containing the StreamsGroupHeartbeat response and a list of records to update the state machine. 
*/ private CoordinatorResult streamsGroupHeartbeat( @@ -1841,7 +1842,8 @@ public class GroupMetadataManager { String processId, Endpoint userEndpoint, List clientTags, - boolean shutdownApplication + boolean shutdownApplication, + int memberEndpointEpoch ) throws ApiException { final long currentTimeMs = time.milliseconds(); final List records = new ArrayList<>(); @@ -1982,9 +1984,7 @@ public class GroupMetadataManager { StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData() .setMemberId(updatedMember.memberId()) .setMemberEpoch(updatedMember.memberEpoch()) - .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId)) - .setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group)); - + .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId)); // The assignment is only provided in the following cases: // 1. The member is joining. // 2. The member's assignment has been updated. @@ -1992,8 +1992,16 @@ public class GroupMetadataManager { response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().activeTasks())); response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks())); response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks())); + if (memberEpoch != 0 || !updatedMember.assignedTasks().isEmpty()) { + group.setEndpointInformationEpoch(group.endpointInformationEpoch() + 1); + } } + if (group.endpointInformationEpoch() != memberEndpointEpoch) { + response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group, updatedMember)); + } + response.setEndpointInformationEpoch(group.endpointInformationEpoch()); + Map internalTopicsToBeCreated = Collections.emptyMap(); if (updatedConfiguredTopology.topicConfigurationException().isPresent()) { TopicConfigurationException exception = updatedConfiguredTopology.topicConfigurationException().get(); @@ -2094,13 +2102,14 @@ public 
class GroupMetadataManager { .collect(Collectors.toList()); } - private List maybeBuildEndpointToPartitions(StreamsGroup group) { + private List maybeBuildEndpointToPartitions(StreamsGroup group, + StreamsGroupMember updatedMember) { List endpointToPartitionsList = new ArrayList<>(); final Map members = group.members(); for (Map.Entry entry : members.entrySet()) { final String memberIdForAssignment = entry.getKey(); final Optional endpointOptional = members.get(memberIdForAssignment).userEndpoint(); - StreamsGroupMember groupMember = entry.getValue(); + StreamsGroupMember groupMember = updatedMember != null && memberIdForAssignment.equals(updatedMember.memberId()) ? updatedMember : members.get(memberIdForAssignment); if (endpointOptional.isPresent()) { final StreamsGroupMemberMetadataValue.Endpoint endpoint = endpointOptional.get(); final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint(); @@ -4640,7 +4649,8 @@ public class GroupMetadataManager { request.processId(), request.userEndpoint(), request.clientTags(), - request.shutdownApplication() + request.shutdownApplication(), + request.endpointInformationEpoch() ); } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index 549eefe618a..3a38e1d0a1d 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -205,6 +205,12 @@ public class StreamsGroup implements Group { */ private Optional shutdownRequestMemberId = Optional.empty(); + /** + * The current epoch for endpoint information, this is used to determine when to send + * updated endpoint information to members of the group. 
+ */ + private int endpointInformationEpoch = -1; + public StreamsGroup( LogContext logContext, SnapshotRegistry snapshotRegistry, @@ -1075,4 +1081,12 @@ public class StreamsGroup implements Group { shutdownRequestMemberId = Optional.empty(); } } + + public int endpointInformationEpoch() { + return endpointInformationEpoch; + } + + public void setEndpointInformationEpoch(int endpointInformationEpoch) { + this.endpointInformationEpoch = endpointInformationEpoch; + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 055eca7994b..a511c2529ea 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -15678,7 +15678,8 @@ public class GroupMetadataManagerTest { .setWarmupTasks(List.of()) .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(Status.MISSING_SOURCE_TOPICS.code()) - .setStatusDetail("Source topics bar are missing."))), + .setStatusDetail("Source topics bar are missing."))) + .setEndpointInformationEpoch(-1), result.response().data() ); @@ -15764,7 +15765,8 @@ public class GroupMetadataManagerTest { .setWarmupTasks(List.of()) .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(Status.MISSING_INTERNAL_TOPICS.code()) - .setStatusDetail("Internal topics are missing: [bar]"))), + .setStatusDetail("Internal topics are missing: [bar]"))) + .setEndpointInformationEpoch(-1), result.response().data() ); @@ -15845,7 +15847,8 @@ public class GroupMetadataManagerTest { .setWarmupTasks(List.of()) .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(Status.INCORRECTLY_PARTITIONED_TOPICS.code()) - .setStatusDetail("Following topics do not have the same number of 
partitions: [{bar=3, foo=6}]"))), + .setStatusDetail("Following topics do not have the same number of partitions: [{bar=3, foo=6}]"))) + .setEndpointInformationEpoch(-1), result.response().data() ); @@ -15940,7 +15943,8 @@ public class GroupMetadataManagerTest { .setWarmupTasks(List.of()) .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(Status.STALE_TOPOLOGY.code()) - .setStatusDetail("The member's topology epoch 0 is behind the group's topology epoch 1."))), + .setStatusDetail("The member's topology epoch 0 is behind the group's topology epoch 1."))) + .setEndpointInformationEpoch(-1), result.response().data() ); @@ -16032,7 +16036,8 @@ public class GroupMetadataManagerTest { new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(Status.SHUTDOWN_APPLICATION.code()) .setStatusDetail(statusDetail) - )), + )) + .setEndpointInformationEpoch(-1), result1.response().data() ); assertRecordsEquals(List.of(), result1.records()); @@ -16053,7 +16058,8 @@ public class GroupMetadataManagerTest { new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(Status.SHUTDOWN_APPLICATION.code()) .setStatusDetail(statusDetail) - )), + )) + .setEndpointInformationEpoch(-1), result2.response().data() ); @@ -16146,7 +16152,8 @@ public class GroupMetadataManagerTest { new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(Status.SHUTDOWN_APPLICATION.code()) .setStatusDetail(statusDetail) - )), + )) + .setEndpointInformationEpoch(-1), result2.response().data() ); } @@ -16442,7 +16449,9 @@ public class GroupMetadataManagerTest { .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()), + .setWarmupTasks(List.of()) + .setEndpointInformationEpoch(-1), + result.response().data() ); @@ -16581,7 +16590,8 @@ public class GroupMetadataManagerTest { new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) .setMemberEpoch(1) - .setHeartbeatIntervalMs(5000), + .setHeartbeatIntervalMs(5000) 
+ .setEndpointInformationEpoch(-1), result.response().data() ); } @@ -16685,7 +16695,8 @@ public class GroupMetadataManagerTest { .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()), + .setWarmupTasks(List.of()) + .setEndpointInformationEpoch(-1), result.response().data() ); @@ -16726,7 +16737,8 @@ public class GroupMetadataManagerTest { .setPartitions(List.of(0)) )) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()), + .setWarmupTasks(List.of()) + .setEndpointInformationEpoch(0), result.response().data() ); @@ -16771,7 +16783,8 @@ public class GroupMetadataManagerTest { .setPartitions(List.of(2)) )) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()), + .setWarmupTasks(List.of()) + .setEndpointInformationEpoch(1), result.response().data() ); @@ -16804,7 +16817,8 @@ public class GroupMetadataManagerTest { new StreamsGroupHeartbeatResponseData() .setMemberId(memberId3) .setMemberEpoch(11) - .setHeartbeatIntervalMs(5000), + .setHeartbeatIntervalMs(5000) + .setEndpointInformationEpoch(1), result.response().data() ); @@ -16843,7 +16857,8 @@ public class GroupMetadataManagerTest { new StreamsGroupHeartbeatResponseData() .setMemberId(memberId1) .setMemberEpoch(11) - .setHeartbeatIntervalMs(5000), + .setHeartbeatIntervalMs(5000) + .setEndpointInformationEpoch(1), result.response().data() ); @@ -16871,7 +16886,8 @@ public class GroupMetadataManagerTest { new StreamsGroupHeartbeatResponseData() .setMemberId(memberId2) .setMemberEpoch(10) - .setHeartbeatIntervalMs(5000), + .setHeartbeatIntervalMs(5000) + .setEndpointInformationEpoch(1), result.response().data() ); @@ -16897,7 +16913,8 @@ public class GroupMetadataManagerTest { .setSubtopologyId(subtopology2) .setPartitions(List.of(1)))) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()), + .setWarmupTasks(List.of()) + .setEndpointInformationEpoch(2), result.response().data() ); @@ -16927,7 +16944,8 @@ public class GroupMetadataManagerTest { new 
StreamsGroupHeartbeatResponseData() .setMemberId(memberId3) .setMemberEpoch(11) - .setHeartbeatIntervalMs(5000), + .setHeartbeatIntervalMs(5000) + .setEndpointInformationEpoch(2), result.response().data() ); @@ -16969,7 +16987,8 @@ public class GroupMetadataManagerTest { .setPartitions(List.of(2)) )) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()), + .setWarmupTasks(List.of()) + .setEndpointInformationEpoch(3), result.response().data() ); @@ -17013,7 +17032,8 @@ public class GroupMetadataManagerTest { .setSubtopologyId(subtopology2) .setPartitions(List.of(1)))) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()), + .setWarmupTasks(List.of()) + .setEndpointInformationEpoch(4), result.response().data() ); @@ -17321,7 +17341,8 @@ public class GroupMetadataManagerTest { .setPartitions(List.of(0, 1, 2, 3, 4, 5)) )) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()), + .setWarmupTasks(List.of()) + .setEndpointInformationEpoch(1), result.response().data() ); @@ -17579,7 +17600,8 @@ public class GroupMetadataManagerTest { .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()), + .setWarmupTasks(List.of()) + .setEndpointInformationEpoch(-1), result.response().data() ); @@ -17607,7 +17629,8 @@ public class GroupMetadataManagerTest { .setSubtopologyId(subtopology1) .setPartitions(List.of(0, 1)))) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()), + .setWarmupTasks(List.of()) + .setEndpointInformationEpoch(0), result.response().data() ); @@ -17637,7 +17660,8 @@ public class GroupMetadataManagerTest { new StreamsGroupHeartbeatResponseData() .setMemberId(memberId1) .setMemberEpoch(2) - .setHeartbeatIntervalMs(5000), + .setHeartbeatIntervalMs(5000) + .setEndpointInformationEpoch(0), result.response().data() ); @@ -17734,7 +17758,8 @@ public class GroupMetadataManagerTest { .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()), + 
.setWarmupTasks(List.of()) + .setEndpointInformationEpoch(-1), result.response().data() ); @@ -17761,7 +17786,8 @@ public class GroupMetadataManagerTest { .setSubtopologyId(subtopology1) .setPartitions(List.of(0, 1)))) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()), + .setWarmupTasks(List.of()) + .setEndpointInformationEpoch(0), result.response().data() ); @@ -17887,6 +17913,98 @@ public class GroupMetadataManagerTest { assertEquals(image, context.groupMetadataManager.image()); } + @Test + public void testStreamsGroupEndpointInformationOnlyWhenEpochGreater() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .build()) + .build(); + + // Prepare new assignment for the group. + assignor.prepareGroupAssignment( + Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1)))); + + CoordinatorResult result; + + // A full response should be sent back on joining. 
+ result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1500) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + .setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092)) + .setEndpointInformationEpoch(0)); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(0, 1)))) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + .setPartitionsByUserEndpoint(null), + result.response().data() + ); + + assertNull(result.response().data().partitionsByUserEndpoint()); + + context.groupMetadataManager.streamsGroup(groupId).setEndpointInformationEpoch(1); + + StreamsGroupHeartbeatResponseData.EndpointToPartitions expectedEndpointToPartitions = new StreamsGroupHeartbeatResponseData.EndpointToPartitions() + .setUserEndpoint(new StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(9092)) + .setActivePartitions(List.of(new StreamsGroupHeartbeatResponseData.TopicPartition().setTopic("foo").setPartitions(List.of(0, 1)))) + .setStandbyPartitions(List.of()); + + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092)) + .setMemberEpoch(result.response().data().memberEpoch())); + + assertNotNull(result.response().data().partitionsByUserEndpoint()); + StreamsGroupHeartbeatResponseData.EndpointToPartitions actualEndpointToPartitions = result.response().data().partitionsByUserEndpoint().get(0); + 
assertEquals(expectedEndpointToPartitions.userEndpoint(), actualEndpointToPartitions.userEndpoint()); + StreamsGroupHeartbeatResponseData.TopicPartition expectedEndpointTopicPartitions = expectedEndpointToPartitions.activePartitions().get(0); + StreamsGroupHeartbeatResponseData.TopicPartition actualEndpointTopicPartitions = actualEndpointToPartitions.activePartitions().get(0); + + assertEquals(expectedEndpointTopicPartitions.topic(), actualEndpointTopicPartitions.topic()); + List actualPartitions = actualEndpointTopicPartitions.partitions(); + Collections.sort(actualPartitions); + assertEquals(expectedEndpointTopicPartitions.partitions(), actualPartitions); + + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092)) + .setMemberEpoch(result.response().data().memberEpoch()) + .setEndpointInformationEpoch(result.response().data().endpointInformationEpoch())); + + assertNull(result.response().data().partitionsByUserEndpoint()); + } + @Test public void testConsumerGroupDynamicConfigs() { String groupId = "fooup";