KAFKA-19256: Only send IQ metadata on assignment changes (#19691)
CI / build (push) Waiting to run Details

This PR adds changes, so the IQ endpoint information is only sent to
streams group members when there has been a change in the assignments
requiring an update in the streams client host-partition ownership.

The existing IQ integration test passes with no modifications and
updated the `GroupMetadataManagerTest` to cover the new process.

Reviewers: Matthias Sax <mjsax@apache.org>, Lucas Brutschy
 <lucasbru@apache.org>
This commit is contained in:
Bill Bejeck 2025-05-16 16:54:12 -04:00 committed by GitHub
parent ced56a320b
commit f397cbc14c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 190 additions and 32 deletions

View File

@ -88,6 +88,8 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
private final int rebalanceTimeoutMs; private final int rebalanceTimeoutMs;
private final StreamsRebalanceData streamsRebalanceData; private final StreamsRebalanceData streamsRebalanceData;
private final LastSentFields lastSentFields = new LastSentFields(); private final LastSentFields lastSentFields = new LastSentFields();
private int endpointInformationEpoch = -1;
public HeartbeatState(final StreamsRebalanceData streamsRebalanceData, public HeartbeatState(final StreamsRebalanceData streamsRebalanceData,
final StreamsMembershipManager membershipManager, final StreamsMembershipManager membershipManager,
@ -101,11 +103,20 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
lastSentFields.reset(); lastSentFields.reset();
} }
public int endpointInformationEpoch() {
return endpointInformationEpoch;
}
public void setEndpointInformationEpoch(int endpointInformationEpoch) {
this.endpointInformationEpoch = endpointInformationEpoch;
}
public StreamsGroupHeartbeatRequestData buildRequestData() { public StreamsGroupHeartbeatRequestData buildRequestData() {
StreamsGroupHeartbeatRequestData data = new StreamsGroupHeartbeatRequestData(); StreamsGroupHeartbeatRequestData data = new StreamsGroupHeartbeatRequestData();
data.setGroupId(membershipManager.groupId()); data.setGroupId(membershipManager.groupId());
data.setMemberId(membershipManager.memberId()); data.setMemberId(membershipManager.memberId());
data.setMemberEpoch(membershipManager.memberEpoch()); data.setMemberEpoch(membershipManager.memberEpoch());
data.setEndpointInformationEpoch(endpointInformationEpoch);
membershipManager.groupInstanceId().ifPresent(data::setInstanceId); membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
boolean joining = membershipManager.state() == MemberState.JOINING; boolean joining = membershipManager.state() == MemberState.JOINING;
@ -515,6 +526,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
final StreamsGroupHeartbeatResponseData data = response.data(); final StreamsGroupHeartbeatResponseData data = response.data();
heartbeatRequestState.updateHeartbeatIntervalMs(data.heartbeatIntervalMs()); heartbeatRequestState.updateHeartbeatIntervalMs(data.heartbeatIntervalMs());
heartbeatRequestState.onSuccessfulAttempt(currentTimeMs); heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
heartbeatState.setEndpointInformationEpoch(data.endpointInformationEpoch());
if (data.partitionsByUserEndpoint() != null) { if (data.partitionsByUserEndpoint() != null) {
streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data)); streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data));

View File

@ -31,6 +31,8 @@
"about": "The member ID generated by the streams consumer. The member ID must be kept during the entire lifetime of the streams consumer process." }, "about": "The member ID generated by the streams consumer. The member ID must be kept during the entire lifetime of the streams consumer process." },
{ "name": "MemberEpoch", "type": "int32", "versions": "0+", { "name": "MemberEpoch", "type": "int32", "versions": "0+",
"about": "The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin." }, "about": "The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin." },
{ "name": "EndpointInformationEpoch", "type": "int32", "versions": "0+",
"about": "The current endpoint epoch of this client, represents the latest endpoint epoch this client received"},
{ "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not provided or if it didn't change since the last heartbeat; the instance ID for static membership otherwise." }, "about": "null if not provided or if it didn't change since the last heartbeat; the instance ID for static membership otherwise." },
{ "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",

View File

@ -65,6 +65,8 @@
"about": "Assigned warm-up tasks for this client. Null if unchanged since last heartbeat." }, "about": "Assigned warm-up tasks for this client. Null if unchanged since last heartbeat." },
// IQ-related information // IQ-related information
{ "name": "EndpointInformationEpoch", "type": "int32", "versions": "0+",
"about": "The endpoint epoch set in the response"},
{ "name": "PartitionsByUserEndpoint", "type": "[]EndpointToPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null", { "name": "PartitionsByUserEndpoint", "type": "[]EndpointToPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "Global assignment information used for IQ. Null if unchanged since last heartbeat." , "about": "Global assignment information used for IQ. Null if unchanged since last heartbeat." ,
"fields": [ "fields": [

View File

@ -1823,6 +1823,7 @@ public class GroupMetadataManager {
* @param userEndpoint User-defined endpoint for Interactive Queries, or null. * @param userEndpoint User-defined endpoint for Interactive Queries, or null.
* @param clientTags Used for rack-aware assignment algorithm, or null. * @param clientTags Used for rack-aware assignment algorithm, or null.
* @param shutdownApplication Whether all Streams clients in the group should shut down. * @param shutdownApplication Whether all Streams clients in the group should shut down.
* @param memberEndpointEpoch The last endpoint information epoch seen be the group member.
* @return A result containing the StreamsGroupHeartbeat response and a list of records to update the state machine. * @return A result containing the StreamsGroupHeartbeat response and a list of records to update the state machine.
*/ */
private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat( private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
@ -1841,7 +1842,8 @@ public class GroupMetadataManager {
String processId, String processId,
Endpoint userEndpoint, Endpoint userEndpoint,
List<KeyValue> clientTags, List<KeyValue> clientTags,
boolean shutdownApplication boolean shutdownApplication,
int memberEndpointEpoch
) throws ApiException { ) throws ApiException {
final long currentTimeMs = time.milliseconds(); final long currentTimeMs = time.milliseconds();
final List<CoordinatorRecord> records = new ArrayList<>(); final List<CoordinatorRecord> records = new ArrayList<>();
@ -1982,9 +1984,7 @@ public class GroupMetadataManager {
StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData() StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
.setMemberId(updatedMember.memberId()) .setMemberId(updatedMember.memberId())
.setMemberEpoch(updatedMember.memberEpoch()) .setMemberEpoch(updatedMember.memberEpoch())
.setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId)) .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
// The assignment is only provided in the following cases: // The assignment is only provided in the following cases:
// 1. The member is joining. // 1. The member is joining.
// 2. The member's assignment has been updated. // 2. The member's assignment has been updated.
@ -1992,7 +1992,15 @@ public class GroupMetadataManager {
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().activeTasks())); response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().activeTasks()));
response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks())); response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks())); response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
if (memberEpoch != 0 || !updatedMember.assignedTasks().isEmpty()) {
group.setEndpointInformationEpoch(group.endpointInformationEpoch() + 1);
} }
}
if (group.endpointInformationEpoch() != memberEndpointEpoch) {
response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group, updatedMember));
}
response.setEndpointInformationEpoch(group.endpointInformationEpoch());
Map<String, CreatableTopic> internalTopicsToBeCreated = Collections.emptyMap(); Map<String, CreatableTopic> internalTopicsToBeCreated = Collections.emptyMap();
if (updatedConfiguredTopology.topicConfigurationException().isPresent()) { if (updatedConfiguredTopology.topicConfigurationException().isPresent()) {
@ -2094,13 +2102,14 @@ public class GroupMetadataManager {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> maybeBuildEndpointToPartitions(StreamsGroup group) { private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> maybeBuildEndpointToPartitions(StreamsGroup group,
StreamsGroupMember updatedMember) {
List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitionsList = new ArrayList<>(); List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitionsList = new ArrayList<>();
final Map<String, StreamsGroupMember> members = group.members(); final Map<String, StreamsGroupMember> members = group.members();
for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet()) { for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet()) {
final String memberIdForAssignment = entry.getKey(); final String memberIdForAssignment = entry.getKey();
final Optional<StreamsGroupMemberMetadataValue.Endpoint> endpointOptional = members.get(memberIdForAssignment).userEndpoint(); final Optional<StreamsGroupMemberMetadataValue.Endpoint> endpointOptional = members.get(memberIdForAssignment).userEndpoint();
StreamsGroupMember groupMember = entry.getValue(); StreamsGroupMember groupMember = updatedMember != null && memberIdForAssignment.equals(updatedMember.memberId()) ? updatedMember : members.get(memberIdForAssignment);
if (endpointOptional.isPresent()) { if (endpointOptional.isPresent()) {
final StreamsGroupMemberMetadataValue.Endpoint endpoint = endpointOptional.get(); final StreamsGroupMemberMetadataValue.Endpoint endpoint = endpointOptional.get();
final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint(); final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
@ -4640,7 +4649,8 @@ public class GroupMetadataManager {
request.processId(), request.processId(),
request.userEndpoint(), request.userEndpoint(),
request.clientTags(), request.clientTags(),
request.shutdownApplication() request.shutdownApplication(),
request.endpointInformationEpoch()
); );
} }
} }

View File

@ -205,6 +205,12 @@ public class StreamsGroup implements Group {
*/ */
private Optional<String> shutdownRequestMemberId = Optional.empty(); private Optional<String> shutdownRequestMemberId = Optional.empty();
/**
* The current epoch for endpoint information, this is used to determine when to send
* updated endpoint information to members of the group.
*/
private int endpointInformationEpoch = -1;
public StreamsGroup( public StreamsGroup(
LogContext logContext, LogContext logContext,
SnapshotRegistry snapshotRegistry, SnapshotRegistry snapshotRegistry,
@ -1075,4 +1081,12 @@ public class StreamsGroup implements Group {
shutdownRequestMemberId = Optional.empty(); shutdownRequestMemberId = Optional.empty();
} }
} }
public int endpointInformationEpoch() {
return endpointInformationEpoch;
}
public void setEndpointInformationEpoch(int endpointInformationEpoch) {
this.endpointInformationEpoch = endpointInformationEpoch;
}
} }

View File

@ -15678,7 +15678,8 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of()) .setWarmupTasks(List.of())
.setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.MISSING_SOURCE_TOPICS.code()) .setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
.setStatusDetail("Source topics bar are missing."))), .setStatusDetail("Source topics bar are missing.")))
.setEndpointInformationEpoch(-1),
result.response().data() result.response().data()
); );
@ -15764,7 +15765,8 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of()) .setWarmupTasks(List.of())
.setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.MISSING_INTERNAL_TOPICS.code()) .setStatusCode(Status.MISSING_INTERNAL_TOPICS.code())
.setStatusDetail("Internal topics are missing: [bar]"))), .setStatusDetail("Internal topics are missing: [bar]")))
.setEndpointInformationEpoch(-1),
result.response().data() result.response().data()
); );
@ -15845,7 +15847,8 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of()) .setWarmupTasks(List.of())
.setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.INCORRECTLY_PARTITIONED_TOPICS.code()) .setStatusCode(Status.INCORRECTLY_PARTITIONED_TOPICS.code())
.setStatusDetail("Following topics do not have the same number of partitions: [{bar=3, foo=6}]"))), .setStatusDetail("Following topics do not have the same number of partitions: [{bar=3, foo=6}]")))
.setEndpointInformationEpoch(-1),
result.response().data() result.response().data()
); );
@ -15940,7 +15943,8 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of()) .setWarmupTasks(List.of())
.setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.STALE_TOPOLOGY.code()) .setStatusCode(Status.STALE_TOPOLOGY.code())
.setStatusDetail("The member's topology epoch 0 is behind the group's topology epoch 1."))), .setStatusDetail("The member's topology epoch 0 is behind the group's topology epoch 1.")))
.setEndpointInformationEpoch(-1),
result.response().data() result.response().data()
); );
@ -16032,7 +16036,8 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData.Status() new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.SHUTDOWN_APPLICATION.code()) .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(statusDetail) .setStatusDetail(statusDetail)
)), ))
.setEndpointInformationEpoch(-1),
result1.response().data() result1.response().data()
); );
assertRecordsEquals(List.of(), result1.records()); assertRecordsEquals(List.of(), result1.records());
@ -16053,7 +16058,8 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData.Status() new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.SHUTDOWN_APPLICATION.code()) .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(statusDetail) .setStatusDetail(statusDetail)
)), ))
.setEndpointInformationEpoch(-1),
result2.response().data() result2.response().data()
); );
@ -16146,7 +16152,8 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData.Status() new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.SHUTDOWN_APPLICATION.code()) .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(statusDetail) .setStatusDetail(statusDetail)
)), ))
.setEndpointInformationEpoch(-1),
result2.response().data() result2.response().data()
); );
} }
@ -16442,7 +16449,9 @@ public class GroupMetadataManagerTest {
.setHeartbeatIntervalMs(5000) .setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of()) .setActiveTasks(List.of())
.setStandbyTasks(List.of()) .setStandbyTasks(List.of())
.setWarmupTasks(List.of()), .setWarmupTasks(List.of())
.setEndpointInformationEpoch(-1),
result.response().data() result.response().data()
); );
@ -16581,7 +16590,8 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData() new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId) .setMemberId(memberId)
.setMemberEpoch(1) .setMemberEpoch(1)
.setHeartbeatIntervalMs(5000), .setHeartbeatIntervalMs(5000)
.setEndpointInformationEpoch(-1),
result.response().data() result.response().data()
); );
} }
@ -16685,7 +16695,8 @@ public class GroupMetadataManagerTest {
.setHeartbeatIntervalMs(5000) .setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of()) .setActiveTasks(List.of())
.setStandbyTasks(List.of()) .setStandbyTasks(List.of())
.setWarmupTasks(List.of()), .setWarmupTasks(List.of())
.setEndpointInformationEpoch(-1),
result.response().data() result.response().data()
); );
@ -16726,7 +16737,8 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0)) .setPartitions(List.of(0))
)) ))
.setStandbyTasks(List.of()) .setStandbyTasks(List.of())
.setWarmupTasks(List.of()), .setWarmupTasks(List.of())
.setEndpointInformationEpoch(0),
result.response().data() result.response().data()
); );
@ -16771,7 +16783,8 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(2)) .setPartitions(List.of(2))
)) ))
.setStandbyTasks(List.of()) .setStandbyTasks(List.of())
.setWarmupTasks(List.of()), .setWarmupTasks(List.of())
.setEndpointInformationEpoch(1),
result.response().data() result.response().data()
); );
@ -16804,7 +16817,8 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData() new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId3) .setMemberId(memberId3)
.setMemberEpoch(11) .setMemberEpoch(11)
.setHeartbeatIntervalMs(5000), .setHeartbeatIntervalMs(5000)
.setEndpointInformationEpoch(1),
result.response().data() result.response().data()
); );
@ -16843,7 +16857,8 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData() new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId1) .setMemberId(memberId1)
.setMemberEpoch(11) .setMemberEpoch(11)
.setHeartbeatIntervalMs(5000), .setHeartbeatIntervalMs(5000)
.setEndpointInformationEpoch(1),
result.response().data() result.response().data()
); );
@ -16871,7 +16886,8 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData() new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId2) .setMemberId(memberId2)
.setMemberEpoch(10) .setMemberEpoch(10)
.setHeartbeatIntervalMs(5000), .setHeartbeatIntervalMs(5000)
.setEndpointInformationEpoch(1),
result.response().data() result.response().data()
); );
@ -16897,7 +16913,8 @@ public class GroupMetadataManagerTest {
.setSubtopologyId(subtopology2) .setSubtopologyId(subtopology2)
.setPartitions(List.of(1)))) .setPartitions(List.of(1))))
.setStandbyTasks(List.of()) .setStandbyTasks(List.of())
.setWarmupTasks(List.of()), .setWarmupTasks(List.of())
.setEndpointInformationEpoch(2),
result.response().data() result.response().data()
); );
@ -16927,7 +16944,8 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData() new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId3) .setMemberId(memberId3)
.setMemberEpoch(11) .setMemberEpoch(11)
.setHeartbeatIntervalMs(5000), .setHeartbeatIntervalMs(5000)
.setEndpointInformationEpoch(2),
result.response().data() result.response().data()
); );
@ -16969,7 +16987,8 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(2)) .setPartitions(List.of(2))
)) ))
.setStandbyTasks(List.of()) .setStandbyTasks(List.of())
.setWarmupTasks(List.of()), .setWarmupTasks(List.of())
.setEndpointInformationEpoch(3),
result.response().data() result.response().data()
); );
@ -17013,7 +17032,8 @@ public class GroupMetadataManagerTest {
.setSubtopologyId(subtopology2) .setSubtopologyId(subtopology2)
.setPartitions(List.of(1)))) .setPartitions(List.of(1))))
.setStandbyTasks(List.of()) .setStandbyTasks(List.of())
.setWarmupTasks(List.of()), .setWarmupTasks(List.of())
.setEndpointInformationEpoch(4),
result.response().data() result.response().data()
); );
@ -17321,7 +17341,8 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1, 2, 3, 4, 5)) .setPartitions(List.of(0, 1, 2, 3, 4, 5))
)) ))
.setStandbyTasks(List.of()) .setStandbyTasks(List.of())
.setWarmupTasks(List.of()), .setWarmupTasks(List.of())
.setEndpointInformationEpoch(1),
result.response().data() result.response().data()
); );
@ -17579,7 +17600,8 @@ public class GroupMetadataManagerTest {
.setHeartbeatIntervalMs(5000) .setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of()) .setActiveTasks(List.of())
.setStandbyTasks(List.of()) .setStandbyTasks(List.of())
.setWarmupTasks(List.of()), .setWarmupTasks(List.of())
.setEndpointInformationEpoch(-1),
result.response().data() result.response().data()
); );
@ -17607,7 +17629,8 @@ public class GroupMetadataManagerTest {
.setSubtopologyId(subtopology1) .setSubtopologyId(subtopology1)
.setPartitions(List.of(0, 1)))) .setPartitions(List.of(0, 1))))
.setStandbyTasks(List.of()) .setStandbyTasks(List.of())
.setWarmupTasks(List.of()), .setWarmupTasks(List.of())
.setEndpointInformationEpoch(0),
result.response().data() result.response().data()
); );
@ -17637,7 +17660,8 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData() new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId1) .setMemberId(memberId1)
.setMemberEpoch(2) .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000), .setHeartbeatIntervalMs(5000)
.setEndpointInformationEpoch(0),
result.response().data() result.response().data()
); );
@ -17734,7 +17758,8 @@ public class GroupMetadataManagerTest {
.setHeartbeatIntervalMs(5000) .setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of()) .setActiveTasks(List.of())
.setStandbyTasks(List.of()) .setStandbyTasks(List.of())
.setWarmupTasks(List.of()), .setWarmupTasks(List.of())
.setEndpointInformationEpoch(-1),
result.response().data() result.response().data()
); );
@ -17761,7 +17786,8 @@ public class GroupMetadataManagerTest {
.setSubtopologyId(subtopology1) .setSubtopologyId(subtopology1)
.setPartitions(List.of(0, 1)))) .setPartitions(List.of(0, 1))))
.setStandbyTasks(List.of()) .setStandbyTasks(List.of())
.setWarmupTasks(List.of()), .setWarmupTasks(List.of())
.setEndpointInformationEpoch(0),
result.response().data() result.response().data()
); );
@ -17887,6 +17913,98 @@ public class GroupMetadataManagerTest {
assertEquals(image, context.groupMetadataManager.image()); assertEquals(image, context.groupMetadataManager.image());
} }
@Test
public void testStreamsGroupEndpointInformationOnlyWhenEpochGreater() {
String groupId = "fooup";
String memberId = Uuid.randomUuid().toString();
String subtopology1 = "subtopology1";
String fooTopicName = "foo";
Uuid fooTopicId = Uuid.randomUuid();
Topology topology = new Topology().setSubtopologies(List.of(
new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 2)
.build())
.build();
// Prepare new assignment for the group.
assignor.prepareGroupAssignment(
Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result;
// A full response should be sent back on joining.
result = context.streamsGroupHeartbeat(
new StreamsGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(1500)
.setTopology(topology)
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092))
.setEndpointInformationEpoch(0));
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of(
new StreamsGroupHeartbeatResponseData.TaskIds()
.setSubtopologyId(subtopology1)
.setPartitions(List.of(0, 1))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setPartitionsByUserEndpoint(null),
result.response().data()
);
assertNull(result.response().data().partitionsByUserEndpoint());
context.groupMetadataManager.streamsGroup(groupId).setEndpointInformationEpoch(1);
StreamsGroupHeartbeatResponseData.EndpointToPartitions expectedEndpointToPartitions = new StreamsGroupHeartbeatResponseData.EndpointToPartitions()
.setUserEndpoint(new StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(9092))
.setActivePartitions(List.of(new StreamsGroupHeartbeatResponseData.TopicPartition().setTopic("foo").setPartitions(List.of(0, 1))))
.setStandbyPartitions(List.of());
result = context.streamsGroupHeartbeat(
new StreamsGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092))
.setMemberEpoch(result.response().data().memberEpoch()));
assertNotNull(result.response().data().partitionsByUserEndpoint());
StreamsGroupHeartbeatResponseData.EndpointToPartitions actualEndpointToPartitions = result.response().data().partitionsByUserEndpoint().get(0);
assertEquals(expectedEndpointToPartitions.userEndpoint(), actualEndpointToPartitions.userEndpoint());
StreamsGroupHeartbeatResponseData.TopicPartition expectedEndpointTopicPartitions = expectedEndpointToPartitions.activePartitions().get(0);
StreamsGroupHeartbeatResponseData.TopicPartition actualEndpointTopicPartitions = actualEndpointToPartitions.activePartitions().get(0);
assertEquals(expectedEndpointTopicPartitions.topic(), actualEndpointTopicPartitions.topic());
List<Integer> actualPartitions = actualEndpointTopicPartitions.partitions();
Collections.sort(actualPartitions);
assertEquals(expectedEndpointTopicPartitions.partitions(), actualPartitions);
result = context.streamsGroupHeartbeat(
new StreamsGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092))
.setMemberEpoch(result.response().data().memberEpoch())
.setEndpointInformationEpoch(result.response().data().endpointInformationEpoch()));
assertNull(result.response().data().partitionsByUserEndpoint());
}
@Test @Test
public void testConsumerGroupDynamicConfigs() { public void testConsumerGroupDynamicConfigs() {
String groupId = "fooup"; String groupId = "fooup";