mirror of https://github.com/apache/kafka.git
KAFKA-18613: Improve test coverage for missing topics (#19189)
Adds tests for members joining with missing source topics, missing internal topics, incorrectly partitioned topics, and stale topologies.

Reviewers: Bill Bejeck <bill@confluent.io>
parent b375bb099b
commit 6f4d4255a6
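The new tests exercise the status codes a streams group heartbeat response carries when the topology cannot be set up cleanly: MISSING_SOURCE_TOPICS, MISSING_INTERNAL_TOPICS, INCORRECTLY_PARTITIONED_TOPICS, and STALE_TOPOLOGY. A minimal sketch of inspecting such a status on the receiving side follows; describeProblem and the main driver are illustrative additions, and the statusCode()/statusDetail() accessors are assumed to be the generated getters on StreamsGroupHeartbeatResponseData.Status (the setters and Status codes themselves appear in the diff below).

import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;

public class HeartbeatStatusSketch {

    // Hypothetical helper: maps the status carried in a heartbeat response to a short description.
    static String describeProblem(StreamsGroupHeartbeatResponseData.Status status) {
        if (status.statusCode() == Status.MISSING_SOURCE_TOPICS.code()) {
            return "source topics are missing: " + status.statusDetail();
        } else if (status.statusCode() == Status.MISSING_INTERNAL_TOPICS.code()) {
            return "internal topics still need to be created: " + status.statusDetail();
        } else if (status.statusCode() == Status.INCORRECTLY_PARTITIONED_TOPICS.code()) {
            return "copartitioned topics disagree on partition counts: " + status.statusDetail();
        } else if (status.statusCode() == Status.STALE_TOPOLOGY.code()) {
            return "member sent an outdated topology epoch: " + status.statusDetail();
        }
        return "status " + status.statusCode() + ": " + status.statusDetail();
    }

    public static void main(String[] args) {
        // Build a status the same way the tests expect the coordinator to return it.
        StreamsGroupHeartbeatResponseData.Status status = new StreamsGroupHeartbeatResponseData.Status()
            .setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
            .setStatusDetail("Source topics bar are missing.");
        System.out.println(describeProblem(status));
    }
}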
@@ -2273,25 +2273,27 @@ public class GroupMetadataManager {
        final Topology topology,
        final StreamsGroup group,
        final List<CoordinatorRecord> records) {
        StreamsTopology updatedTopology;
        if (topology != null) {
            StreamsGroupTopologyValue recordValue = convertToStreamsGroupTopologyRecord(topology);

            updatedTopology = StreamsTopology.fromHeartbeatRequest(topology);

            StreamsTopology streamsTopologyFromRequest = StreamsTopology.fromHeartbeatRequest(topology);
            if (group.topology().isEmpty()) {
                log.info("[GroupId {}][MemberId {}] Member initialized the topology with epoch {}", groupId, memberId, topology.epoch());

                StreamsGroupTopologyValue recordValue = convertToStreamsGroupTopologyRecord(topology);
                records.add(newStreamsGroupTopologyRecord(groupId, recordValue));
            } else if (!updatedTopology.equals(group.topology().get())) {
                return streamsTopologyFromRequest;
            } else if (group.topology().get().topologyEpoch() > topology.epoch()) {
                log.info("[GroupId {}][MemberId {}] Member joined with stale topology epoch {}", groupId, memberId, topology.epoch());
                return group.topology().get();
            } else if (!group.topology().get().equals(streamsTopologyFromRequest)) {
                throw new InvalidRequestException("Topology updates are not supported yet.");
            } else {
                log.debug("[GroupId {}][MemberId {}] Member joined with currently initialized topology {}", groupId, memberId, topology.epoch());
                return group.topology().get();
            }
        } else if (group.topology().isPresent()) {
            updatedTopology = group.topology().get();
            return group.topology().get();
        } else {
            throw new IllegalStateException("The topology is null and the group topology is also null.");
        }
        return updatedTopology;
    }

    private List<StreamsGroupHeartbeatResponseData.TaskIds> createStreamsGroupHeartbeatResponseTaskIds(final Map<String, Set<Integer>> taskIds) {

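The hunk above reworks how the coordinator reconciles a member's topology with the group's topology. A compact, self-contained sketch of the decision order it appears to implement is given below; GroupTopology, RequestedTopology, and reconcile are simplified stand-ins for the coordinator's StreamsTopology handling, not actual Kafka classes, and record persistence is left out.

import java.util.Optional;

public class TopologyReconciliationSketch {

    // Simplified stand-ins for the coordinator's StreamsTopology / heartbeat Topology types.
    record GroupTopology(int epoch, String description) { }
    record RequestedTopology(int epoch, String description) { }

    // Mirrors the apparent decision order of the updated coordinator code:
    // initialize the topology if the group has none, keep the group topology when the
    // member's epoch is behind, reject changed topologies, otherwise keep the current one.
    static GroupTopology reconcile(Optional<GroupTopology> current, RequestedTopology requested) {
        GroupTopology fromRequest = new GroupTopology(requested.epoch(), requested.description());
        if (current.isEmpty()) {
            // First member to send a topology initializes it for the group.
            return fromRequest;
        } else if (current.get().epoch() > requested.epoch()) {
            // Member is behind the group's topology epoch: the group topology wins.
            return current.get();
        } else if (!current.get().equals(fromRequest)) {
            throw new IllegalArgumentException("Topology updates are not supported yet.");
        } else {
            // Same epoch and same topology: nothing to do.
            return current.get();
        }
    }

    public static void main(String[] args) {
        GroupTopology existing = new GroupTopology(1, "source: foo, bar");
        RequestedTopology stale = new RequestedTopology(0, "source: foo");
        // The stale request does not replace the group topology.
        System.out.println(reconcile(Optional.of(existing), stale));
    }
}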
@@ -156,7 +156,6 @@ public class InternalTopicManager {
        enforceCopartitioning(
            topology,
            copartitionGroupsBySubtopology,
            log,
            decidedPartitionCountsForInternalTopics,
            copartitionedTopicsEnforcer
        );

@@ -168,7 +167,6 @@ public class InternalTopicManager {

    private static void enforceCopartitioning(final StreamsTopology topology,
                                              final Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology,
                                              final Logger log,
                                              final Map<String, Integer> decidedPartitionCountsForInternalTopics,
                                              final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer) {
        final Set<String> fixedRepartitionTopics =

@@ -180,17 +178,13 @@ public class InternalTopicManager {
                x.repartitionSourceTopics().stream().filter(y -> y.partitions() == 0)
            ).map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet());

        if (fixedRepartitionTopics.isEmpty() && flexibleRepartitionTopics.isEmpty()) {
            log.info("Skipping the repartition topic validation since there are no repartition topics.");
        } else {
            // ensure the co-partitioning topics within the group have the same number of partitions,
            // and enforce the number of partitions for those repartition topics to be the same if they
            // are co-partitioned as well.
            for (Collection<Set<String>> copartitionGroups : copartitionGroupsBySubtopology.values()) {
                for (Set<String> copartitionGroup : copartitionGroups) {
                    decidedPartitionCountsForInternalTopics.putAll(
                        copartitionedTopicsEnforcer.enforce(copartitionGroup, fixedRepartitionTopics, flexibleRepartitionTopics));
                }
        // ensure the co-partitioning topics within the group have the same number of partitions,
        // and enforce the number of partitions for those repartition topics to be the same if they
        // are co-partitioned as well.
        for (Collection<Set<String>> copartitionGroups : copartitionGroupsBySubtopology.values()) {
            for (Set<String> copartitionGroup : copartitionGroups) {
                decidedPartitionCountsForInternalTopics.putAll(
                    copartitionedTopicsEnforcer.enforce(copartitionGroup, fixedRepartitionTopics, flexibleRepartitionTopics));
            }
        }
    }

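The enforcement loop above delegates to CopartitionedTopicsEnforcer.enforce so that copartitioned topics agree on partition counts. A simplified, self-contained sketch of that check is given below, assuming the essential rule is that every topic in a copartition group must have the same number of partitions; enforceSamePartitionCount and its inputs are illustrative, not the real enforcer API.

import java.util.Map;
import java.util.Set;

public class CopartitionCheckSketch {

    // Illustrative version of copartition enforcement: every topic in a copartition
    // group must have the same partition count, otherwise the group is misconfigured.
    // Returns the decided partition count for the group.
    static int enforceSamePartitionCount(Set<String> copartitionGroup, Map<String, Integer> partitionCounts) {
        Integer decided = null;
        for (String topic : copartitionGroup) {
            Integer count = partitionCounts.get(topic);
            if (count == null) {
                throw new IllegalStateException("Topic " + topic + " is missing from the metadata.");
            }
            if (decided == null) {
                decided = count;
            } else if (!decided.equals(count)) {
                throw new IllegalStateException(
                    "Following topics do not have the same number of partitions: " + partitionCounts);
            }
        }
        if (decided == null) {
            throw new IllegalArgumentException("Copartition group must not be empty.");
        }
        return decided;
    }

    public static void main(String[] args) {
        // Mirrors the incorrectly-partitioned test below: foo has 6 partitions, bar has 3.
        Map<String, Integer> counts = Map.of("foo", 6, "bar", 3);
        try {
            enforceSamePartitionCount(Set.of("foo", "bar"), counts);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}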
@@ -42,6 +42,7 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.ConsumerProtocolAssignment;
import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;

@@ -59,7 +60,9 @@ import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.CopartitionGroup;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.Subtopology;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.TopicInfo;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.Topology;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;

@@ -70,6 +73,7 @@ import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;

@@ -15866,11 +15870,349 @@ public class GroupMetadataManagerTest {
        assertRecordsEquals(expectedRecords, result.records());
    }

    @Test
    public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
        String groupId = "fooup";
        String memberId = Uuid.randomUuid().toString();
        String subtopology1 = "subtopology1";
        String fooTopicName = "foo";
        Uuid fooTopicId = Uuid.randomUuid();
        String subtopology2 = "subtopology2";
        String barTopicName = "bar";
        Topology topology = new Topology().setSubtopologies(List.of(
            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
            new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
        ));

        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .withStreamsGroupTaskAssignors(List.of(assignor))
            .withMetadataImage(new MetadataImageBuilder()
                .addTopic(fooTopicId, fooTopicName, 6)
                .build())
            .build();

        // Member joins the streams group.
        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
            new StreamsGroupHeartbeatRequestData()
                .setGroupId(groupId)
                .setMemberId(memberId)
                .setMemberEpoch(0)
                .setRebalanceTimeoutMs(1500)
                .setTopology(topology)
                .setProcessId(DEFAULT_PROCESS_ID)
                .setActiveTasks(List.of())
                .setStandbyTasks(List.of())
                .setWarmupTasks(List.of()));

        assertEquals(
            Map.of(),
            result.response().creatableTopics()
        );
        assertResponseEquals(
            new StreamsGroupHeartbeatResponseData()
                .setMemberId(memberId)
                .setMemberEpoch(1)
                .setHeartbeatIntervalMs(5000)
                .setActiveTasks(List.of())
                .setStandbyTasks(List.of())
                .setWarmupTasks(List.of())
                .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status()
                    .setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
                    .setStatusDetail("Source topics bar are missing."))),
            result.response().data()
        );

        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
            .setMemberEpoch(1)
            .setPreviousMemberEpoch(0)
            .setClientId(DEFAULT_CLIENT_ID)
            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
            .setRebalanceTimeoutMs(1500)
            .build();

        List<CoordinatorRecord> expectedRecords = List.of(
            StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
            StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
            StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
                Map.of(
                    fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
                )
            ),
            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
            StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
        );

        assertRecordsEquals(expectedRecords, result.records());
    }

    @Test
    public void testStreamsGroupMemberJoiningWithMissingInternalTopic() {
        String groupId = "fooup";
        String memberId = Uuid.randomUuid().toString();
        String subtopology1 = "subtopology1";
        String fooTopicName = "foo";
        Uuid fooTopicId = Uuid.randomUuid();
        String barTopicName = "bar";
        Topology topology = new Topology().setSubtopologies(List.of(
            new Subtopology()
                .setSubtopologyId(subtopology1)
                .setSourceTopics(List.of(fooTopicName))
                .setStateChangelogTopics(List.of(new TopicInfo().setName(barTopicName)))
            )
        );

        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .withStreamsGroupTaskAssignors(List.of(assignor))
            .withMetadataImage(new MetadataImageBuilder()
                .addTopic(fooTopicId, fooTopicName, 6)
                .build())
            .build();

        // Member joins the streams group.
        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
            new StreamsGroupHeartbeatRequestData()
                .setGroupId(groupId)
                .setMemberId(memberId)
                .setMemberEpoch(0)
                .setRebalanceTimeoutMs(1500)
                .setTopology(topology)
                .setProcessId(DEFAULT_PROCESS_ID)
                .setActiveTasks(List.of())
                .setStandbyTasks(List.of())
                .setWarmupTasks(List.of()));

        assertEquals(
            Map.of(barTopicName,
                new CreatableTopic()
                    .setName(barTopicName)
                    .setNumPartitions(6)
                    .setReplicationFactor((short) -1)
            ),
            result.response().creatableTopics()
        );
        assertResponseEquals(
            new StreamsGroupHeartbeatResponseData()
                .setMemberId(memberId)
                .setMemberEpoch(1)
                .setHeartbeatIntervalMs(5000)
                .setActiveTasks(List.of())
                .setStandbyTasks(List.of())
                .setWarmupTasks(List.of())
                .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status()
                    .setStatusCode(Status.MISSING_INTERNAL_TOPICS.code())
                    .setStatusDetail("Internal topics are missing: [bar]"))),
            result.response().data()
        );

        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
            .setMemberEpoch(1)
            .setPreviousMemberEpoch(0)
            .setClientId(DEFAULT_CLIENT_ID)
            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
            .setRebalanceTimeoutMs(1500)
            .build();

        List<CoordinatorRecord> expectedRecords = List.of(
            StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
            StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
            StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
                fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
            )),
            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
            StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
        );

        assertRecordsEquals(expectedRecords, result.records());
    }

    @Test
    public void testStreamsGroupMemberJoiningWithIncorrectlyPartitionedTopic() {
        String groupId = "fooup";
        String memberId = Uuid.randomUuid().toString();
        String subtopology1 = "subtopology1";
        String fooTopicName = "foo";
        Uuid fooTopicId = Uuid.randomUuid();
        String barTopicName = "bar";
        Uuid barTopicId = Uuid.randomUuid();
        Topology topology = new Topology().setSubtopologies(List.of(
            new Subtopology()
                .setSubtopologyId(subtopology1)
                .setSourceTopics(List.of(fooTopicName, barTopicName))
                .setCopartitionGroups(List.of(new CopartitionGroup().setSourceTopics(List.of((short) 0, (short) 1))))
            )
        );

        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .withStreamsGroupTaskAssignors(List.of(assignor))
            .withMetadataImage(new MetadataImageBuilder()
                .addTopic(fooTopicId, fooTopicName, 6)
                .addTopic(barTopicId, barTopicName, 3)
                .build())
            .build();

        // Member joins the streams group.
        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
            new StreamsGroupHeartbeatRequestData()
                .setGroupId(groupId)
                .setMemberId(memberId)
                .setMemberEpoch(0)
                .setRebalanceTimeoutMs(1500)
                .setTopology(topology)
                .setProcessId(DEFAULT_PROCESS_ID)
                .setActiveTasks(List.of())
                .setStandbyTasks(List.of())
                .setWarmupTasks(List.of()));

        assertEquals(
            Map.of(),
            result.response().creatableTopics()
        );
        assertResponseEquals(
            new StreamsGroupHeartbeatResponseData()
                .setMemberId(memberId)
                .setMemberEpoch(1)
                .setHeartbeatIntervalMs(5000)
                .setActiveTasks(List.of())
                .setStandbyTasks(List.of())
                .setWarmupTasks(List.of())
                .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status()
                    .setStatusCode(Status.INCORRECTLY_PARTITIONED_TOPICS.code())
                    .setStatusDetail("Following topics do not have the same number of partitions: [{bar=3, foo=6}]"))),
            result.response().data()
        );

        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
            .setMemberEpoch(1)
            .setPreviousMemberEpoch(0)
            .setClientId(DEFAULT_CLIENT_ID)
            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
            .setRebalanceTimeoutMs(1500)
            .build();

        List<CoordinatorRecord> expectedRecords = List.of(
            StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
            StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
            StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
                fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
                barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
            )),
            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
            StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
        );

        assertRecordsEquals(expectedRecords, result.records());
    }

    @Test
    public void testStreamsGroupMemberJoiningWithStaleTopology() {
        String groupId = "fooup";
        String memberId = Uuid.randomUuid().toString();
        String subtopology1 = "subtopology1";
        String fooTopicName = "foo";
        Uuid fooTopicId = Uuid.randomUuid();
        String barTopicName = "bar";
        Uuid barTopicId = Uuid.randomUuid();
        Topology topology0 = new Topology().setEpoch(0).setSubtopologies(List.of(
            new Subtopology()
                .setSubtopologyId(subtopology1)
                .setSourceTopics(List.of(fooTopicName))
            )
        );
        Topology topology1 = new Topology().setEpoch(1).setSubtopologies(List.of(
            new Subtopology()
                .setSubtopologyId(subtopology1)
                .setSourceTopics(List.of(fooTopicName, barTopicName))
            )
        );

        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .withStreamsGroupTaskAssignors(List.of(assignor))
            .withMetadataImage(new MetadataImageBuilder()
                .addTopic(fooTopicId, fooTopicName, 6)
                .addTopic(barTopicId, barTopicName, 3)
                .build())
            .withStreamsGroup(
                new StreamsGroupBuilder(groupId, 10)
                    .withTopology(StreamsTopology.fromHeartbeatRequest(topology1))
            )
            .build();

        assignor.prepareGroupAssignment(new org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment(Map.of(
            memberId, org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment.empty()
        )));

        // Member joins the streams group.
        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
            new StreamsGroupHeartbeatRequestData()
                .setGroupId(groupId)
                .setMemberId(memberId)
                .setMemberEpoch(0)
                .setRebalanceTimeoutMs(1500)
                .setTopology(topology0)
                .setProcessId(DEFAULT_PROCESS_ID)
                .setActiveTasks(List.of())
                .setStandbyTasks(List.of())
                .setWarmupTasks(List.of()));

        assertEquals(
            Map.of(),
            result.response().creatableTopics()
        );
        assertResponseEquals(
            new StreamsGroupHeartbeatResponseData()
                .setMemberId(memberId)
                .setMemberEpoch(11)
                .setHeartbeatIntervalMs(5000)
                .setActiveTasks(List.of())
                .setStandbyTasks(List.of())
                .setWarmupTasks(List.of())
                .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status()
                    .setStatusCode(Status.STALE_TOPOLOGY.code())
                    .setStatusDetail("The member's topology epoch 0 is behind the group's topology epoch 1."))),
            result.response().data()
        );

        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
            .setMemberEpoch(11)
            .setPreviousMemberEpoch(0)
            .setClientId(DEFAULT_CLIENT_ID)
            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
            .setRebalanceTimeoutMs(1500)
            .build();

        List<CoordinatorRecord> expectedRecords = List.of(
            StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
            StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
                fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
                barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
            )),
            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
            StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11),
            StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
        );

        assertRecordsEquals(expectedRecords, result.records());
    }

    @Test
    public void testStreamsUpdatingMemberMetadataTriggersNewTargetAssignment() {
        String groupId = "fooup";
        String memberId = Uuid.randomUuid().toString();

        String subtopology1 = "subtopology1";
        String fooTopicName = "foo";
        Uuid fooTopicId = Uuid.randomUuid();

@@ -16074,7 +16416,6 @@ public class GroupMetadataManagerTest {
        String memberId1 = Uuid.randomUuid().toString();
        String memberId2 = Uuid.randomUuid().toString();
        String memberId3 = Uuid.randomUuid().toString();

        String subtopology1 = "subtopology1";
        String fooTopicName = "foo";
        Uuid fooTopicId = Uuid.randomUuid();

@@ -16170,7 +16511,6 @@ public class GroupMetadataManagerTest {
        String groupId = "fooup";
        String memberId1 = Uuid.randomUuid().toString();
        String memberId2 = Uuid.randomUuid().toString();

        String subtopology1 = "subtopology1";
        String fooTopicName = "foo";
        Uuid fooTopicId = Uuid.randomUuid();

@@ -16241,7 +16581,6 @@ public class GroupMetadataManagerTest {
    public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() {
        String groupId = "fooup";
        String memberId = Uuid.randomUuid().toString();

        String subtopology1 = "subtopology1";
        String fooTopicName = "foo";
        Uuid fooTopicId = Uuid.randomUuid();

@@ -16311,7 +16650,6 @@ public class GroupMetadataManagerTest {
        String memberId1 = Uuid.randomUuid().toString();
        String memberId2 = Uuid.randomUuid().toString();
        String memberId3 = Uuid.randomUuid().toString();

        String subtopology1 = "subtopology1";
        String fooTopicName = "foo";
        Uuid fooTopicId = Uuid.randomUuid();

@@ -16824,7 +17162,6 @@ public class GroupMetadataManagerTest {
    public void testStreamsTaskAssignorExceptionOnRegularHeartbeat() {
        String groupId = "fooup";
        String memberId1 = Uuid.randomUuid().toString();

        String subtopology1 = "subtopology1";
        String fooTopicName = "foo";
        Uuid fooTopicId = Uuid.randomUuid();

@@ -16867,7 +17204,6 @@ public class GroupMetadataManagerTest {
    public void testStreamsPartitionMetadataRefreshedAfterGroupIsLoaded() {
        String groupId = "fooup";
        String memberId = Uuid.randomUuid().toString();

        String subtopology1 = "subtopology1";
        String fooTopicName = "foo";
        Uuid fooTopicId = Uuid.randomUuid();

@@ -16964,7 +17300,6 @@ public class GroupMetadataManagerTest {
    public void testStreamsPartitionMetadataRefreshedAgainAfterWriteFailure() {
        String groupId = "fooup";
        String memberId = Uuid.randomUuid().toString();

        String subtopology1 = "subtopology1";
        String fooTopicName = "foo";
        Uuid fooTopicId = Uuid.randomUuid();

@@ -17081,7 +17416,6 @@ public class GroupMetadataManagerTest {
    public void testStreamsSessionTimeoutLifecycle() {
        String groupId = "fooup";
        String memberId = Uuid.randomUuid().toString();

        String subtopology1 = "subtopology1";
        String fooTopicName = "foo";
        Uuid fooTopicId = Uuid.randomUuid();

@@ -17158,7 +17492,6 @@ public class GroupMetadataManagerTest {
    public void testStreamsSessionTimeoutExpiration() {
        String groupId = "fooup";
        String memberId = Uuid.randomUuid().toString();

        String subtopology1 = "subtopology1";
        String fooTopicName = "foo";
        Uuid fooTopicId = Uuid.randomUuid();

@@ -17224,7 +17557,6 @@ public class GroupMetadataManagerTest {
        String groupId = "fooup";
        String memberId1 = Uuid.randomUuid().toString();
        String memberId2 = Uuid.randomUuid().toString();

        String subtopology1 = "subtopology1";
        String fooTopicName = "foo";
        Uuid fooTopicId = Uuid.randomUuid();

@@ -17381,7 +17713,6 @@ public class GroupMetadataManagerTest {
        String groupId = "fooup";
        String memberId1 = Uuid.randomUuid().toString();
        String memberId2 = Uuid.randomUuid().toString();

        String subtopology1 = "subtopology1";
        String fooTopicName = "foo";
        Uuid fooTopicId = Uuid.randomUuid();