mirror of https://github.com/apache/kafka.git
KAFKA-18730: Add replaying streams group state from offset topic (#18809)

Adds the streams group to the GroupMetadataManager and implements loading its records from the offset topic into group state. The state also contains two timers (rebalance timeout and session timeout) that are started after the group coordinator has been loaded.

Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bill@confluent.io>

This commit is contained in:
commit d0c65a1fd2
parent 2b6e868538
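The change follows the coordinator's standard replay pattern: each record read from the __consumer_offsets topic is deserialized into a typed key/value pair and routed to a type-specific replay overload, with a null value denoting a tombstone that deletes the state the key identifies. A minimal sketch of that dispatch shape, condensed from the full switch in GroupCoordinatorShard below:

```java
// Sketch only: how one replayed record reaches streams group state.
// The real switch in GroupCoordinatorShard covers every streams record type.
switch (recordType) {
    case STREAMS_GROUP_METADATA:
        groupMetadataManager.replay(
            (StreamsGroupMetadataKey) key,
            // null for a tombstone; the replay overload handles deletion.
            (StreamsGroupMetadataValue) Utils.messageOrNull(value)
        );
        break;
    default:
        throw new IllegalStateException("Received an unknown record type " + recordType);
}
```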
@@ -327,9 +327,9 @@
         <!-- group coordinator -->
         <suppress checks="CyclomaticComplexity"
-            files="(ConsumerGroupMember|GroupMetadataManager|GeneralUniformAssignmentBuilder|GroupCoordinatorRecordSerde).java"/>
+            files="(ConsumerGroupMember|GroupMetadataManager|GeneralUniformAssignmentBuilder|GroupCoordinatorRecordSerde|GroupMetadataManagerTestContext).java"/>
         <suppress checks="(NPathComplexity|MethodLength)"
-            files="(GroupMetadataManager|ConsumerGroupTest|ShareGroupTest|GroupMetadataManagerTest|GroupMetadataManagerTestContext|GeneralUniformAssignmentBuilder).java"/>
+            files="(GroupMetadataManager|ConsumerGroupTest|ShareGroupTest|GroupMetadataManagerTest|GroupMetadataManagerTestContext|GeneralUniformAssignmentBuilder|GroupCoordinatorShard).java"/>
         <suppress checks="ClassFanOutComplexity"
             files="(GroupMetadataManager|GroupMetadataManagerTest|GroupMetadataManagerTestContext|GroupCoordinatorService|GroupCoordinatorServiceTest).java"/>
         <suppress checks="ParameterNumber"
@@ -91,6 +91,20 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.image.MetadataDelta;

@@ -258,7 +272,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
     static final String GROUP_EXPIRATION_KEY = "expire-group-metadata";

     /**
-     * The classic and consumer group size counter key to schedule a timer task.
+     * The classic, consumer and streams group size counter key to schedule a timer task.
      *
      * Visible for testing.
      */

@@ -923,6 +937,55 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
                 );
                 break;

+            case STREAMS_GROUP_METADATA:
+                groupMetadataManager.replay(
+                    (StreamsGroupMetadataKey) key,
+                    (StreamsGroupMetadataValue) Utils.messageOrNull(value)
+                );
+                break;
+
+            case STREAMS_GROUP_PARTITION_METADATA:
+                groupMetadataManager.replay(
+                    (StreamsGroupPartitionMetadataKey) key,
+                    (StreamsGroupPartitionMetadataValue) Utils.messageOrNull(value)
+                );
+                break;
+
+            case STREAMS_GROUP_MEMBER_METADATA:
+                groupMetadataManager.replay(
+                    (StreamsGroupMemberMetadataKey) key,
+                    (StreamsGroupMemberMetadataValue) Utils.messageOrNull(value)
+                );
+                break;
+
+            case STREAMS_GROUP_TARGET_ASSIGNMENT_METADATA:
+                groupMetadataManager.replay(
+                    (StreamsGroupTargetAssignmentMetadataKey) key,
+                    (StreamsGroupTargetAssignmentMetadataValue) Utils.messageOrNull(value)
+                );
+                break;
+
+            case STREAMS_GROUP_TARGET_ASSIGNMENT_MEMBER:
+                groupMetadataManager.replay(
+                    (StreamsGroupTargetAssignmentMemberKey) key,
+                    (StreamsGroupTargetAssignmentMemberValue) Utils.messageOrNull(value)
+                );
+                break;
+
+            case STREAMS_GROUP_CURRENT_MEMBER_ASSIGNMENT:
+                groupMetadataManager.replay(
+                    (StreamsGroupCurrentMemberAssignmentKey) key,
+                    (StreamsGroupCurrentMemberAssignmentValue) Utils.messageOrNull(value)
+                );
+                break;
+
+            case STREAMS_GROUP_TOPOLOGY:
+                groupMetadataManager.replay(
+                    (StreamsGroupTopologyKey) key,
+                    (StreamsGroupTopologyValue) Utils.messageOrNull(value)
+                );
+                break;
+
             default:
                 throw new IllegalStateException("Received an unknown record type " + recordType
                     + " in " + record);
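Every case above funnels the value through Utils.messageOrNull, which is what turns a tombstone into the null that each replay overload checks for. Presumably the helper reduces to something like the following (a sketch, not the verbatim utility):

```java
// Presumed shape of the tombstone helper: a record without a value is a
// tombstone, so the typed replay methods receive null and delete state.
static ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
    return apiMessageAndVersion == null ? null : apiMessageAndVersion.message();
}
```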
@@ -104,6 +104,20 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.coordinator.group.modern.MemberState;

@@ -118,6 +132,10 @@ import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpress
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupAssignmentBuilder;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
+import org.apache.kafka.coordinator.group.streams.StreamsGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
+import org.apache.kafka.coordinator.group.streams.StreamsTopology;
+import org.apache.kafka.coordinator.group.streams.TasksTuple;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.TopicImage;

@@ -160,6 +178,7 @@ import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_I
 import static org.apache.kafka.coordinator.group.Group.GroupType.CLASSIC;
 import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
 import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE;
+import static org.apache.kafka.coordinator.group.Group.GroupType.STREAMS;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord;

@@ -188,6 +207,11 @@ import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABL
 import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
 import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
 import static org.apache.kafka.coordinator.group.modern.ModernGroupMember.hasAssignedPartitionsChanged;
+import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord;
+import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord;

 /**
  * The GroupMetadataManager manages the metadata of all classic and consumer groups. It holds

@@ -384,6 +408,11 @@ public class GroupMetadataManager {
      */
     private final GroupConfigManager groupConfigManager;

+    /**
+     * The session timeout for streams groups.
+     */
+    private final int streamsGroupSessionTimeoutMs;
+
     /**
      * The metadata image.
      */

@@ -439,6 +468,7 @@ public class GroupMetadataManager {
         this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
         this.groupConfigManager = groupConfigManager;
         this.shareGroupAssignor = shareGroupAssignor;
+        this.streamsGroupSessionTimeoutMs = 45000;
     }

     /**
@@ -687,6 +717,75 @@ public class GroupMetadataManager {
         }
     }

+    /**
+     * Gets or maybe creates a streams group without updating the groups map.
+     * The group will be materialized during the replay.
+     *
+     * @param groupId           The group id.
+     * @param createIfNotExists A boolean indicating whether the group should be
+     *                          created if it does not exist.
+     *
+     * @return A StreamsGroup.
+     * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or
+     *                                  if the group is not a streams group.
+     *
+     * Package private for testing.
+     */
+    StreamsGroup getOrMaybeCreateStreamsGroup(
+        String groupId,
+        boolean createIfNotExists
+    ) throws GroupIdNotFoundException {
+        Group group = groups.get(groupId);
+
+        if (group == null && !createIfNotExists) {
+            throw new GroupIdNotFoundException(String.format("Streams group %s not found.", groupId));
+        }
+
+        if (group == null) {
+            return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
+        } else {
+            if (group.type() == STREAMS) {
+                return (StreamsGroup) group;
+            } else {
+                throw new GroupIdNotFoundException(String.format("Group %s is not a streams group.", groupId));
+            }
+        }
+    }
+
+    /**
+     * Gets a streams group by committed offset.
+     *
+     * @param groupId         The group id.
+     * @param committedOffset A specified committed offset corresponding to this shard.
+     *
+     * @return A StreamsGroup.
+     * @throws GroupIdNotFoundException if the group does not exist or is not a streams group.
+     */
+    private StreamsGroup streamsGroup(
+        String groupId,
+        long committedOffset
+    ) throws GroupIdNotFoundException {
+        Group group = group(groupId, committedOffset);
+
+        if (group.type() == STREAMS) {
+            return (StreamsGroup) group;
+        } else {
+            // We don't support upgrading/downgrading between protocols at the moment, so
+            // we throw an exception if a group exists with the wrong type.
+            throw new GroupIdNotFoundException(String.format("Group %s is not a streams group.",
+                groupId));
+        }
+    }
+
+    /**
+     * An overloaded method of {@link GroupMetadataManager#streamsGroup(String, long)}.
+     */
+    StreamsGroup streamsGroup(
+        String groupId
+    ) throws GroupIdNotFoundException {
+        return streamsGroup(groupId, Long.MAX_VALUE);
+    }
+
     /**
      * Gets a consumer group by committed offset.
      *
@@ -760,6 +859,43 @@ public class GroupMetadataManager {
         }
     }

+    /**
+     * The method should be called on the replay path.
+     * Gets or maybe creates a streams group and updates the groups map if a new group is created.
+     *
+     * @param groupId           The group id.
+     * @param createIfNotExists A boolean indicating whether the group should be
+     *                          created if it does not exist.
+     *
+     * @return A StreamsGroup.
+     * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or
+     *                                  if the group is not a streams group.
+     * @throws IllegalStateException    if the group does not have the expected type.
+     *
+     * Package private for testing.
+     */
+    private StreamsGroup getOrMaybeCreatePersistedStreamsGroup(
+        String groupId,
+        boolean createIfNotExists
+    ) throws GroupIdNotFoundException, IllegalStateException {
+        Group group = groups.get(groupId);
+
+        if (group == null && !createIfNotExists) {
+            throw new GroupIdNotFoundException(String.format("Streams group %s not found.", groupId));
+        }
+
+        if (group == null) {
+            StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
+            groups.put(groupId, streamsGroup);
+            return streamsGroup;
+        } else if (group.type() == STREAMS) {
+            return (StreamsGroup) group;
+        } else {
+            // We don't support upgrading/downgrading between protocols at the moment, so
+            // we throw an exception if a group exists with the wrong type.
+            throw new IllegalStateException(String.format("Group %s is not a streams group.", groupId));
+        }
+    }
+
     /**
      * Gets or maybe creates a classic group.
      *
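The persisted variant above differs from getOrMaybeCreateStreamsGroup in one deliberate way: it registers a newly created group in the groups map immediately, because it is only invoked while applying records that prove the group exists; the non-persisted variant leaves materialization to replay. The calling convention used throughout the replay methods in this patch:

```java
// value != null -> a regular record: the group may be created on the fly.
// value == null -> a tombstone: never create a group just to delete it,
//                  so a missing group makes this throw and the caller returns.
StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, value != null);
```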
@@ -3138,6 +3274,47 @@ public class GroupMetadataManager {
         ));
     }

+    /**
+     * Fences a member from a streams group.
+     *
+     * @param group    The group.
+     * @param member   The member.
+     * @param response The response of the CoordinatorResult.
+     *
+     * @return The CoordinatorResult to be applied.
+     */
+    private <T> CoordinatorResult<T, CoordinatorRecord> streamsGroupFenceMember(
+        StreamsGroup group,
+        StreamsGroupMember member,
+        T response
+    ) {
+        List<CoordinatorRecord> records = new ArrayList<>();
+
+        records.addAll(removeStreamsMember(group.groupId(), member.memberId()));
+
+        // We bump the group epoch.
+        int groupEpoch = group.groupEpoch() + 1;
+        records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch));
+
+        cancelTimers(group.groupId(), member.memberId());
+
+        return new CoordinatorResult<>(records, response);
+    }
+
+    /**
+     * Write tombstones for the member.
+     *
+     * @param groupId  The group id.
+     * @param memberId The member id.
+     */
+    private List<CoordinatorRecord> removeStreamsMember(String groupId, String memberId) {
+        return List.of(
+            newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId),
+            newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId),
+            newStreamsGroupMemberTombstoneRecord(groupId, memberId)
+        );
+    }
+
     /**
      * Maybe delete the resolved regular expressions associated with the provided members
      * if they were the last ones subscribed to them.
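Fencing is thus, aside from cancelling the member's timers, a pure record-producing operation: it emits the member's three tombstones and bumps the group epoch so the remaining members reconcile; the in-memory state only changes when those records are replayed. The resulting record sequence for a fenced member, under that reading:

```java
// Records appended when member "m1" of group "g1" is fenced (sketch,
// hypothetical ids): three tombstones from removeStreamsMember, then an
// epoch bump that triggers a rebalance for the surviving members.
List<CoordinatorRecord> records = new ArrayList<>(removeStreamsMember("g1", "m1"));
records.add(newStreamsGroupEpochRecord("g1", group.groupEpoch() + 1));
```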
@@ -3196,6 +3373,25 @@ public class GroupMetadataManager {
         cancelConsumerGroupSyncTimeout(groupId, memberId);
     }

+    /**
+     * Schedules (or reschedules) the session timeout for the member.
+     *
+     * @param groupId  The group id.
+     * @param memberId The member id.
+     */
+    private void scheduleStreamsGroupSessionTimeout(
+        String groupId,
+        String memberId
+    ) {
+        timer.schedule(
+            groupSessionTimeoutKey(groupId, memberId),
+            streamsGroupSessionTimeoutMs,
+            TimeUnit.MILLISECONDS,
+            true,
+            () -> streamsGroupFenceMemberOperation(groupId, memberId, "the member session expired.")
+        );
+    }
+
     /**
      * Schedules (or reschedules) the session timeout for the member.
      *

@@ -3288,6 +3484,39 @@ public class GroupMetadataManager {
         return new CoordinatorResult<>(Collections.emptyList());
     }

+    /**
+     * Fences a member from a streams group.
+     * Returns an empty CoordinatorResult if the group or the member doesn't exist.
+     *
+     * @param groupId  The group id.
+     * @param memberId The member id.
+     * @param reason   The reason for fencing the member.
+     *
+     * @return The CoordinatorResult to be applied.
+     */
+    private <T> CoordinatorResult<T, CoordinatorRecord> streamsGroupFenceMemberOperation(
+        String groupId,
+        String memberId,
+        String reason
+    ) {
+        try {
+            StreamsGroup group = streamsGroup(groupId);
+            StreamsGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+            log.info("[GroupId {}] Streams member {} fenced from the group because {}.",
+                groupId, memberId, reason);
+
+            return streamsGroupFenceMember(group, member, null);
+        } catch (GroupIdNotFoundException ex) {
+            log.debug("[GroupId {}] Could not fence streams member {} because the group does not exist.",
+                groupId, memberId);
+        } catch (UnknownMemberIdException ex) {
+            log.debug("[GroupId {}] Could not fence streams member {} because the member does not exist.",
+                groupId, memberId);
+        }
+
+        return new CoordinatorResult<>(Collections.emptyList());
+    }
+
     /**
      * Schedules (or reschedules) the session timeout for the member.
      *
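Both streams timers key off plain strings, so rescheduling simply overwrites the pending task for the same key and cancellation is symmetric. A hypothetical usage under the key scheme this patch uses ("session-timeout-<groupId>-<memberId>"):

```java
// Hypothetical ids; groupSessionTimeoutKey("g1", "m1") builds
// "session-timeout-g1-m1". Scheduling the same key again replaces the
// pending timeout; cancel removes it.
timer.schedule(groupSessionTimeoutKey("g1", "m1"), 45_000, TimeUnit.MILLISECONDS, true,
    () -> streamsGroupFenceMemberOperation("g1", "m1", "the member session expired."));
timer.cancel(groupSessionTimeoutKey("g1", "m1"));
```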
@@ -3356,7 +3585,7 @@ public class GroupMetadataManager {
         int memberEpoch,
         int rebalanceTimeoutMs
     ) {
-        String key = consumerGroupRebalanceTimeoutKey(groupId, memberId);
+        String key = groupRebalanceTimeoutKey(groupId, memberId);
         timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
             try {
                 ConsumerGroup group = consumerGroup(groupId);

@@ -3385,6 +3614,49 @@ public class GroupMetadataManager {
         });
     }

+    /**
+     * Schedules a rebalance timeout for the member.
+     *
+     * @param groupId            The group id.
+     * @param memberId           The member id.
+     * @param memberEpoch        The member epoch.
+     * @param rebalanceTimeoutMs The rebalance timeout.
+     */
+    private void scheduleStreamsGroupRebalanceTimeout(
+        String groupId,
+        String memberId,
+        int memberEpoch,
+        int rebalanceTimeoutMs
+    ) {
+        String key = groupRebalanceTimeoutKey(groupId, memberId);
+        timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
+            try {
+                StreamsGroup group = streamsGroup(groupId);
+                StreamsGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+
+                if (member.memberEpoch() == memberEpoch) {
+                    log.info("[GroupId {}] Member {} fenced from the group because " +
+                        "it failed to transition from epoch {} within {}ms.",
+                        groupId, memberId, memberEpoch, rebalanceTimeoutMs);
+
+                    return streamsGroupFenceMember(group, member, null);
+                } else {
+                    log.debug("[GroupId {}] Ignoring rebalance timeout for {} because the member " +
+                        "is not in epoch {} anymore.", groupId, memberId, memberEpoch);
+                    return new CoordinatorResult<>(Collections.emptyList());
+                }
+            } catch (GroupIdNotFoundException ex) {
+                log.debug("[GroupId {}] Could not fence {} because the group does not exist.",
+                    groupId, memberId);
+            } catch (UnknownMemberIdException ex) {
+                log.debug("[GroupId {}] Could not fence {} because the member does not exist.",
+                    groupId, memberId);
+            }
+
+            return new CoordinatorResult<>(Collections.emptyList());
+        });
+    }
+
     /**
      * Cancels the rebalance timeout of the member.
      *
@@ -3395,7 +3667,7 @@ public class GroupMetadataManager {
         String groupId,
         String memberId
     ) {
-        timer.cancel(consumerGroupRebalanceTimeoutKey(groupId, memberId));
+        timer.cancel(groupRebalanceTimeoutKey(groupId, memberId));
     }

     /**
@@ -3509,6 +3781,43 @@ public class GroupMetadataManager {
         }
     }

+    /**
+     * Replays StreamsGroupTopologyKey/Value to update the hard state of
+     * the streams group.
+     *
+     * @param key   A StreamsGroupTopologyKey key.
+     * @param value A StreamsGroupTopologyValue record.
+     */
+    public void replay(
+        StreamsGroupTopologyKey key,
+        StreamsGroupTopologyValue value
+    ) {
+        String groupId = key.groupId();
+        StreamsGroup streamsGroup;
+        try {
+            streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, value != null);
+        } catch (GroupIdNotFoundException ex) {
+            // If the group does not exist and a tombstone is replayed, we can ignore it.
+            return;
+        }
+
+        Set<String> oldSubscribedTopicNames;
+        if (streamsGroup.topology().isPresent()) {
+            oldSubscribedTopicNames = streamsGroup.topology().get().requiredTopics();
+        } else {
+            oldSubscribedTopicNames = Collections.emptySet();
+        }
+        if (value != null) {
+            StreamsTopology topology = StreamsTopology.fromRecord(value);
+            streamsGroup.setTopology(topology);
+            Set<String> newSubscribedTopicNames = topology.requiredTopics();
+            updateGroupsByTopics(groupId, oldSubscribedTopicNames, newSubscribedTopicNames);
+        } else {
+            updateGroupsByTopics(groupId, oldSubscribedTopicNames, Collections.emptySet());
+            streamsGroup.setTopology(null);
+        }
+    }
+
     /**
      * Handles a ShareGroupHeartbeat request.
      *
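Replaying a topology also maintains the reverse index from topic names to groups (groupsByTopics), so a later metadata change on a required topic can trigger a refresh for exactly the streams groups that reference it. The tombstone branch keeps that index consistent by shrinking the subscription to the empty set before clearing the topology; annotated with the same calls in the same order:

```java
// Tombstone branch of the replay above, annotated.
Set<String> oldTopics = streamsGroup.topology()
    .map(StreamsTopology::requiredTopics)
    .orElse(Collections.emptySet());
updateGroupsByTopics(groupId, oldTopics, Collections.emptySet()); // drop index entries first
streamsGroup.setTopology(null);                                   // then clear the topology
```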
@@ -3889,6 +4198,77 @@ public class GroupMetadataManager {
         }
     }

+    /**
+     * Replays StreamsGroupMetadataKey/Value to update the hard state of
+     * the streams group. It updates the group epoch of the streams
+     * group or deletes the streams group.
+     *
+     * @param key   A StreamsGroupMetadataKey key.
+     * @param value A StreamsGroupMetadataValue record.
+     */
+    public void replay(
+        StreamsGroupMetadataKey key,
+        StreamsGroupMetadataValue value
+    ) {
+        String groupId = key.groupId();
+
+        if (value != null) {
+            StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, true);
+            streamsGroup.setGroupEpoch(value.epoch());
+        } else {
+            StreamsGroup streamsGroup;
+            try {
+                streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, false);
+            } catch (GroupIdNotFoundException ex) {
+                // If the group does not exist, we can ignore the tombstone.
+                return;
+            }
+
+            if (!streamsGroup.members().isEmpty()) {
+                throw new IllegalStateException("Received a tombstone record to delete group " + groupId
+                    + " but the group still has " + streamsGroup.members().size() + " members.");
+            }
+            if (streamsGroup.assignmentEpoch() != -1) {
+                throw new IllegalStateException("Received a tombstone record to delete group " + groupId
+                    + " but did not receive StreamsGroupTargetAssignmentMetadataValue tombstone.");
+            }
+            removeGroup(groupId);
+        }
+    }
+
+    /**
+     * Replays StreamsGroupPartitionMetadataKey/Value to update the hard state of
+     * the streams group. It updates the subscription metadata of the streams
+     * group.
+     *
+     * @param key   A StreamsGroupPartitionMetadataKey key.
+     * @param value A StreamsGroupPartitionMetadataValue record.
+     */
+    public void replay(
+        StreamsGroupPartitionMetadataKey key,
+        StreamsGroupPartitionMetadataValue value
+    ) {
+        String groupId = key.groupId();
+        StreamsGroup streamsGroup;
+        try {
+            streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, value != null);
+        } catch (GroupIdNotFoundException ex) {
+            // If the group does not exist, we can ignore the tombstone.
+            return;
+        }
+
+        if (value != null) {
+            Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> partitionMetadata = new HashMap<>();
+            value.topics().forEach(topicMetadata -> {
+                partitionMetadata.put(topicMetadata.topicName(), org.apache.kafka.coordinator.group.streams.TopicMetadata.fromRecord(topicMetadata));
+            });
+            streamsGroup.setPartitionMetadata(partitionMetadata);
+        } else {
+            streamsGroup.setPartitionMetadata(Collections.emptyMap());
+        }
+    }
+
     /**
      * Replays ShareGroupMemberMetadataKey/Value to update the hard state of
      * the share group. It updates the subscription part of the member or
@@ -3962,7 +4342,167 @@ public class GroupMetadataManager {
             }
             removeGroup(groupId);
         }
     }

+    /**
+     * Replays StreamsGroupMemberMetadataKey/Value to update the hard state of
+     * the streams group.
+     * It updates the subscription part of the member or deletes the member.
+     *
+     * @param key   A StreamsGroupMemberMetadataKey key.
+     * @param value A StreamsGroupMemberMetadataValue record.
+     */
+    public void replay(
+        StreamsGroupMemberMetadataKey key,
+        StreamsGroupMemberMetadataValue value
+    ) {
+        String groupId = key.groupId();
+        String memberId = key.memberId();
+
+        StreamsGroup streamsGroup;
+        try {
+            streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, value != null);
+        } catch (GroupIdNotFoundException ex) {
+            // If the group does not exist and a tombstone is replayed, we can ignore it.
+            return;
+        }
+
+        if (value != null) {
+            StreamsGroupMember oldMember = streamsGroup.getOrMaybeCreateMember(memberId, true);
+            streamsGroup.updateMember(new StreamsGroupMember.Builder(oldMember)
+                .updateWith(value)
+                .build());
+        } else {
+            StreamsGroupMember oldMember;
+            try {
+                oldMember = streamsGroup.getOrMaybeCreateMember(memberId, false);
+            } catch (UnknownMemberIdException ex) {
+                // If the member does not exist, we can ignore it.
+                return;
+            }
+
+            if (oldMember.memberEpoch() != LEAVE_GROUP_MEMBER_EPOCH) {
+                throw new IllegalStateException("Received a tombstone record to delete member " + memberId
+                    + " but did not receive StreamsGroupCurrentMemberAssignmentValue tombstone.");
+            }
+            if (streamsGroup.targetAssignment().containsKey(memberId)) {
+                throw new IllegalStateException("Received a tombstone record to delete member " + memberId
+                    + " but did not receive StreamsGroupTargetAssignmentMetadataValue tombstone.");
+            }
+            streamsGroup.removeMember(memberId);
+        }
+    }
+
+    /**
+     * Replays StreamsGroupTargetAssignmentMetadataKey/Value to update the hard state of
+     * the streams group.
+     * It updates the target assignment epoch or sets it to -1 to signal that it has been deleted.
+     *
+     * @param key   A StreamsGroupTargetAssignmentMetadataKey key.
+     * @param value A StreamsGroupTargetAssignmentMetadataValue record.
+     */
+    public void replay(
+        StreamsGroupTargetAssignmentMetadataKey key,
+        StreamsGroupTargetAssignmentMetadataValue value
+    ) {
+        String groupId = key.groupId();
+
+        if (value != null) {
+            StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, true);
+            streamsGroup.setTargetAssignmentEpoch(value.assignmentEpoch());
+        } else {
+            StreamsGroup streamsGroup;
+            try {
+                streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, false);
+            } catch (GroupIdNotFoundException ex) {
+                // If the group does not exist, we can ignore the tombstone.
+                return;
+            }
+            if (!streamsGroup.targetAssignment().isEmpty()) {
+                throw new IllegalStateException("Received a tombstone record to delete target assignment of " + groupId
+                    + " but the assignment still has " + streamsGroup.targetAssignment().size() + " members.");
+            }
+            streamsGroup.setTargetAssignmentEpoch(-1);
+        }
+    }
+
+    /**
+     * Replays StreamsGroupTargetAssignmentMemberKey/Value to update the hard state of
+     * the streams group.
+     * It updates the target assignment of the member or deletes it.
+     *
+     * @param key   A StreamsGroupTargetAssignmentMemberKey key.
+     * @param value A StreamsGroupTargetAssignmentMemberValue record.
+     */
+    public void replay(
+        StreamsGroupTargetAssignmentMemberKey key,
+        StreamsGroupTargetAssignmentMemberValue value
+    ) {
+        String groupId = key.groupId();
+        String memberId = key.memberId();
+
+        if (value != null) {
+            StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, true);
+            streamsGroup.updateTargetAssignment(memberId, org.apache.kafka.coordinator.group.streams.TasksTuple.fromTargetAssignmentRecord(value));
+        } else {
+            StreamsGroup streamsGroup;
+            try {
+                streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, false);
+            } catch (GroupIdNotFoundException ex) {
+                // If the group does not exist, we can ignore the tombstone.
+                return;
+            }
+            streamsGroup.removeTargetAssignment(memberId);
+        }
+    }
+
+    /**
+     * Replays StreamsGroupCurrentMemberAssignmentKey/Value to update the hard state of
+     * the streams group.
+     * It updates the assignment of a member or deletes it.
+     *
+     * @param key   A StreamsGroupCurrentMemberAssignmentKey key.
+     * @param value A StreamsGroupCurrentMemberAssignmentValue record.
+     */
+    public void replay(
+        StreamsGroupCurrentMemberAssignmentKey key,
+        StreamsGroupCurrentMemberAssignmentValue value
+    ) {
+        String groupId = key.groupId();
+        String memberId = key.memberId();
+
+        if (value != null) {
+            StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, true);
+            StreamsGroupMember oldMember = streamsGroup.getOrMaybeCreateMember(memberId, true);
+            StreamsGroupMember newMember = new StreamsGroupMember.Builder(oldMember)
+                .updateWith(value)
+                .build();
+            streamsGroup.updateMember(newMember);
+        } else {
+            StreamsGroup streamsGroup;
+            try {
+                streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, false);
+            } catch (GroupIdNotFoundException ex) {
+                // If the group does not exist, we can ignore the tombstone.
+                return;
+            }
+
+            StreamsGroupMember oldMember;
+            try {
+                oldMember = streamsGroup.getOrMaybeCreateMember(memberId, false);
+            } catch (UnknownMemberIdException ex) {
+                // If the member does not exist, we can ignore the tombstone.
+                return;
+            }
+
+            StreamsGroupMember newMember = new StreamsGroupMember.Builder(oldMember)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setAssignedTasks(TasksTuple.EMPTY)
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build();
+            streamsGroup.updateMember(newMember);
+        }
+    }
+
     /**
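Taken together, the tombstone checks in these replay methods pin down the order in which a group's records must be deleted; replaying them out of order fails fast with an IllegalStateException instead of silently corrupting state. The implied sequence for fully deleting a one-member group, as these validations read:

```java
// Implied deletion order (sketch; each line is one tombstone in the offset topic):
// 1. StreamsGroupCurrentMemberAssignment(member)  -> member epoch reset to LEAVE_GROUP_MEMBER_EPOCH
// 2. StreamsGroupTargetAssignmentMember(member)   -> member leaves the target assignment
// 3. StreamsGroupMemberMetadata(member)           -> member removed (validates 1 and 2 happened)
// 4. StreamsGroupTargetAssignmentMetadata(group)  -> assignment epoch set to -1
// 5. StreamsGroupMetadata(group)                  -> group removed (validates no members, epoch == -1)
```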
@@ -4113,6 +4653,9 @@ public class GroupMetadataManager {
             if (group != null && (group.type() == CONSUMER || group.type() == SHARE)) {
                 ((ModernGroup<?>) group).requestMetadataRefresh();
             }
+            if (group != null && (group.type() == STREAMS)) {
+                ((StreamsGroup) group).requestMetadataRefresh();
+            }
         });
     }

@@ -4122,6 +4665,7 @@ public class GroupMetadataManager {
     public void updateGroupSizeCounter() {
         Map<ClassicGroupState, Long> classicGroupSizeCounter = new HashMap<>();
         Map<ConsumerGroup.ConsumerGroupState, Long> consumerGroupSizeCounter = new HashMap<>();
+        Map<StreamsGroup.StreamsGroupState, Long> streamsGroupSizeCounter = new HashMap<>();
         groups.forEach((__, group) -> {
             switch (group.type()) {
                 case CLASSIC:

@@ -4130,12 +4674,16 @@ public class GroupMetadataManager {
                 case CONSUMER:
                     consumerGroupSizeCounter.compute(((ConsumerGroup) group).state(), Utils::incValue);
                     break;
+                case STREAMS:
+                    streamsGroupSizeCounter.compute(((StreamsGroup) group).state(), Utils::incValue);
+                    break;
                 default:
                     break;
             }
         });
         metrics.setClassicGroupGauges(classicGroupSizeCounter);
         metrics.setConsumerGroupGauges(consumerGroupSizeCounter);
+        metrics.setStreamsGroupGauges(streamsGroupSizeCounter);
     }

     /**
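The streams gauge reuses the recount-and-publish pattern already in place for classic and consumer groups: the periodic task walks the groups map, tallies states into a fresh map, and hands the whole map to the metrics shard in one call. An equivalent standalone sketch of that tally (Utils::incValue behaves like a merge-with-increment; STREAMS is the statically imported GroupType):

```java
// Recount sketch: equivalent to the compute(..., Utils::incValue) calls above.
Map<StreamsGroup.StreamsGroupState, Long> streamsGroupSizeCounter = new HashMap<>();
for (Group group : groups.values()) {
    if (group.type() == STREAMS) {
        streamsGroupSizeCounter.merge(((StreamsGroup) group).state(), 1L, Long::sum);
    }
}
metrics.setStreamsGroupGauges(streamsGroupSizeCounter);
```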
@@ -4146,6 +4694,23 @@ public class GroupMetadataManager {
         Map<ClassicGroupState, Long> classicGroupSizeCounter = new HashMap<>();
         groups.forEach((groupId, group) -> {
             switch (group.type()) {
+                case STREAMS:
+                    StreamsGroup streamsGroup = (StreamsGroup) group;
+                    log.info("Loaded streams group {} with {} members.", groupId, streamsGroup.members().size());
+                    streamsGroup.members().forEach((memberId, member) -> {
+                        log.debug("Loaded member {} in streams group {}.", memberId, groupId);
+                        scheduleStreamsGroupSessionTimeout(groupId, memberId);
+                        if (member.state() == org.apache.kafka.coordinator.group.streams.MemberState.UNREVOKED_TASKS) {
+                            scheduleStreamsGroupRebalanceTimeout(
+                                groupId,
+                                member.memberId(),
+                                member.memberEpoch(),
+                                member.rebalanceTimeoutMs()
+                            );
+                        }
+                    });
+                    break;
+
                 case CONSUMER:
                     ConsumerGroup consumerGroup = (ConsumerGroup) group;
                     log.info("Loaded consumer group {} with {} members.", groupId, consumerGroup.members().size());

@@ -4200,6 +4765,11 @@ public class GroupMetadataManager {
     public void onUnloaded() {
         groups.values().forEach(group -> {
             switch (group.type()) {
+                case STREAMS:
+                    StreamsGroup streamsGroup = (StreamsGroup) group;
+                    log.info("[GroupId={}] Unloaded group metadata for group epoch {}.",
+                        streamsGroup.groupId(), streamsGroup.groupEpoch());
+                    break;
                 case CONSUMER:
                     ConsumerGroup consumerGroup = (ConsumerGroup) group;
                     log.info("[GroupId={}] Unloaded group metadata for group epoch {}.",

@@ -4248,7 +4818,7 @@ public class GroupMetadataManager {
         return "session-timeout-" + groupId + "-" + memberId;
     }

-    public static String consumerGroupRebalanceTimeoutKey(String groupId, String memberId) {
+    public static String groupRebalanceTimeoutKey(String groupId, String memberId) {
         return "rebalance-timeout-" + groupId + "-" + memberId;
     }
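Note that timers are armed in onLoaded rather than inside the replay methods: replay has to stay a deterministic, side-effect-free projection of the log, so the session timeout for every member (and the rebalance timeout for members still in UNREVOKED_TASKS) is only scheduled once the shard has fully caught up, exactly as the commit message describes. A condensed view of that sequencing, under this reading of the code:

```java
// Conceptual load sequence for one shard (not a literal API):
// 1. Replay every record from the shard's __consumer_offsets partition
//    (pure in-memory state build-up, no timers, no writes).
// 2. onLoaded(): scheduleStreamsGroupSessionTimeout(...) for each member, plus
//    scheduleStreamsGroupRebalanceTimeout(...) if member.state() == UNREVOKED_TASKS.
// A member that never heartbeats after a coordinator failover is thus still fenced.
```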
@@ -82,7 +82,7 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
     /**
      * Streams group size gauge counters keyed by the metric name.
      */
-    private final Map<StreamsGroupState, TimelineGaugeCounter> streamsGroupGauges;
+    private volatile Map<StreamsGroupState, Long> streamsGroupGauges;

     /**
      * All sensors keyed by the sensor name. A Sensor object is shared across all metrics shards.

@@ -115,6 +115,7 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {

         this.classicGroupGauges = Collections.emptyMap();
         this.consumerGroupGauges = Collections.emptyMap();
+        this.streamsGroupGauges = Collections.emptyMap();

         this.shareGroupGauges = Utils.mkMap(
             Utils.mkEntry(ShareGroup.ShareGroupState.EMPTY,

@@ -125,21 +126,6 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
                 new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0)))
         );

-        this.streamsGroupGauges = Utils.mkMap(
-            Utils.mkEntry(StreamsGroupState.EMPTY,
-                new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
-            Utils.mkEntry(StreamsGroupState.ASSIGNING,
-                new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
-            Utils.mkEntry(StreamsGroupState.RECONCILING,
-                new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
-            Utils.mkEntry(StreamsGroupState.STABLE,
-                new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
-            Utils.mkEntry(StreamsGroupState.DEAD,
-                new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
-            Utils.mkEntry(StreamsGroupState.NOT_READY,
-                new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0)))
-        );
-
         this.globalSensors = Objects.requireNonNull(globalSensors);
         this.topicPartition = Objects.requireNonNull(topicPartition);
     }

@@ -164,19 +150,17 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
     public void setConsumerGroupGauges(Map<ConsumerGroupState, Long> consumerGroupGauges) {
         this.consumerGroupGauges = consumerGroupGauges;
     }

     /**
-     * Increment the number of streams groups.
+     * Set the number of streams groups.
+     * This method should be the only way to update the map and is called by the scheduled task
+     * that updates the metrics in {@link org.apache.kafka.coordinator.group.GroupCoordinatorShard}.
+     * Breaking this will result in inconsistent behavior.
      *
-     * @param state the streams group state.
+     * @param streamsGroupGauges The map counting the number of streams groups in each state.
      */
-    public void incrementNumStreamsGroups(StreamsGroupState state) {
-        TimelineGaugeCounter gaugeCounter = streamsGroupGauges.get(state);
-        if (gaugeCounter != null) {
-            synchronized (gaugeCounter.timelineLong) {
-                gaugeCounter.timelineLong.increment();
-            }
-        }
+    public void setStreamsGroupGauges(Map<StreamsGroupState, Long> streamsGroupGauges) {
+        this.streamsGroupGauges = streamsGroupGauges;
     }

     /**

@@ -188,20 +172,6 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
         }
     }

-    /**
-     * Decrement the number of streams groups.
-     *
-     * @param state the streams group state.
-     */
-    public void decrementNumStreamsGroups(StreamsGroupState state) {
-        TimelineGaugeCounter gaugeCounter = streamsGroupGauges.get(state);
-        if (gaugeCounter != null) {
-            synchronized (gaugeCounter.timelineLong) {
-                gaugeCounter.timelineLong.decrement();
-            }
-        }
-    }
-
     /**
      * @return The number of offsets.
      */

@@ -256,16 +226,16 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
     }

     /**
-     * Obtain the number of streams groups in the specified state.
+     * Get the number of streams groups in the specified state.
      *
      * @param state the streams group state.
      *
      * @return The number of streams groups in `state`.
      */
     public long numStreamsGroups(StreamsGroupState state) {
-        TimelineGaugeCounter gaugeCounter = streamsGroupGauges.get(state);
-        if (gaugeCounter != null) {
-            return gaugeCounter.atomicLong.get();
+        Long counter = streamsGroupGauges.get(state);
+        if (counter != null) {
+            return counter;
         }
         return 0L;
     }

@@ -275,7 +245,7 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
      */
     public long numStreamsGroups() {
         return streamsGroupGauges.values().stream()
-            .mapToLong(timelineGaugeCounter -> timelineGaugeCounter.atomicLong.get()).sum();
+            .mapToLong(Long::longValue).sum();
     }

     @Override
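Swapping the timeline-backed counters for a volatile immutable map simplifies the concurrency story: the scheduled recount publishes a complete snapshot with one volatile write, and metric readers take one volatile read, so no synchronization or snapshot-registry bookkeeping is needed. The essential pattern, reduced to a self-contained sketch (a generic stand-in, not the shard class itself):

```java
import java.util.Collections;
import java.util.Map;

// Publish/read pattern behind setStreamsGroupGauges/numStreamsGroups above.
final class StateGauges<S> {
    private volatile Map<S, Long> gauges = Collections.emptyMap();

    void publish(Map<S, Long> fresh) {
        gauges = fresh; // one volatile write: readers see old or new map, never a mix
    }

    long read(S state) {
        return gauges.getOrDefault(state, 0L); // one volatile read, no locking
    }
}
```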
@@ -318,14 +288,6 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
             }
             gaugeCounter.atomicLong.set(value);
         });
-
-        this.streamsGroupGauges.forEach((__, gaugeCounter) -> {
-            long value;
-            synchronized (gaugeCounter.timelineLong) {
-                value = gaugeCounter.timelineLong.get(offset);
-            }
-            gaugeCounter.atomicLong.set(value);
-        });
     }

     /**

@@ -410,66 +372,4 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
             }
         }
     }
-
-    /**
-     * Called when a streams group's state has changed. Increment/decrement
-     * the counter accordingly.
-     *
-     * @param oldState The previous state. null value means that it's a new group.
-     * @param newState The next state. null value means that the group has been removed.
-     */
-    public void onStreamsGroupStateTransition(
-        StreamsGroupState oldState,
-        StreamsGroupState newState
-    ) {
-        if (newState != null) {
-            switch (newState) {
-                case EMPTY:
-                    incrementNumStreamsGroups(StreamsGroupState.EMPTY);
-                    break;
-                case NOT_READY:
-                    incrementNumStreamsGroups(StreamsGroupState.NOT_READY);
-                    break;
-                case ASSIGNING:
-                    incrementNumStreamsGroups(StreamsGroupState.ASSIGNING);
-                    break;
-                case RECONCILING:
-                    incrementNumStreamsGroups(StreamsGroupState.RECONCILING);
-                    break;
-                case STABLE:
-                    incrementNumStreamsGroups(StreamsGroupState.STABLE);
-                    break;
-                case DEAD:
-                    incrementNumStreamsGroups(StreamsGroupState.DEAD);
-                    break;
-                default:
-                    throw new IllegalArgumentException("Unknown new state for streams group: " + newState);
-            }
-        }
-
-        if (oldState != null) {
-            switch (oldState) {
-                case EMPTY:
-                    decrementNumStreamsGroups(StreamsGroupState.EMPTY);
-                    break;
-                case NOT_READY:
-                    decrementNumStreamsGroups(StreamsGroupState.NOT_READY);
-                    break;
-                case ASSIGNING:
-                    decrementNumStreamsGroups(StreamsGroupState.ASSIGNING);
-                    break;
-                case RECONCILING:
-                    decrementNumStreamsGroups(StreamsGroupState.RECONCILING);
-                    break;
-                case STABLE:
-                    decrementNumStreamsGroups(StreamsGroupState.STABLE);
-                    break;
-                case DEAD:
-                    decrementNumStreamsGroups(StreamsGroupState.DEAD);
-                    break;
-                default:
-                    throw new IllegalArgumentException("Unknown old state for streams group: " + newState);
-            }
-        }
-    }
 }
@@ -265,7 +265,7 @@ public class StreamsGroup implements Group {
     }

     public void setTopology(StreamsTopology topology) {
-        this.topology.set(Optional.of(topology));
+        this.topology.set(Optional.ofNullable(topology));
         maybeUpdateConfiguredTopology();
         maybeUpdateGroupState();
     }

@@ -340,7 +340,7 @@ public class StreamsGroup implements Group {
     public StreamsGroupMember getOrMaybeCreateMember(
         String memberId,
         boolean createIfNotExists
-    ) {
+    ) throws UnknownMemberIdException {
         StreamsGroupMember member = members.get(memberId);
         if (member != null) {
             return member;

@@ -465,6 +465,15 @@ public class StreamsGroup implements Group {
         targetAssignment.put(memberId, newTargetAssignment);
     }

+    /**
+     * Removes the target assignment of a member.
+     *
+     * @param memberId The member id.
+     */
+    public void removeTargetAssignment(String memberId) {
+        targetAssignment.remove(memberId);
+    }
+
     /**
      * @return An immutable map containing all the target assignment keyed by member ID.
      */

@@ -779,11 +788,10 @@ public class StreamsGroup implements Group {
      * Updates the current state of the group.
      */
     private void maybeUpdateGroupState() {
-        StreamsGroupState previousState = state.get();
         StreamsGroupState newState = STABLE;
         if (members.isEmpty()) {
             newState = EMPTY;
-        } else if (topology() == null || configuredTopology().isEmpty() || !configuredTopology().get().isReady()) {
+        } else if (topology().isEmpty() || configuredTopology().isEmpty() || !configuredTopology().get().isReady()) {
             newState = NOT_READY;
         } else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
             newState = ASSIGNING;

@@ -797,7 +805,6 @@ public class StreamsGroup implements Group {
         }

         state.set(newState);
-        metrics.onStreamsGroupStateTransition(previousState, newState);
     }

     private void maybeUpdateConfiguredTopology() {

@@ -932,7 +939,7 @@ public class StreamsGroup implements Group {
     }

     /**
-     * Adds the partitions epoch based on the provided assignment.
+     * Adds the partition epoch based on the provided assignment.
      *
      * @param tasks     The assigned tasks.
      * @param processId The process ID.

@@ -942,7 +949,7 @@ public class StreamsGroup implements Group {
         TasksTuple tasks,
         String processId
     ) {
-        if (tasks != null) {
+        if (tasks != null && processId != null) {
             addTaskProcessId(tasks.activeTasks(), processId, currentActiveTaskToProcessId);
             addTaskProcessIdToSet(tasks.standbyTasks(), processId, currentStandbyTaskToProcessIds);
             addTaskProcessIdToSet(tasks.warmupTasks(), processId, currentWarmupTaskToProcessIds);
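The Optional.ofNullable change makes setTopology double as the tombstone path: replay(StreamsGroupTopologyKey, null) ends in setTopology(null), which under Optional.of would throw a NullPointerException during coordinator load. With ofNullable, the topology simply becomes empty and the state machine reacts; a minimal illustration of the intended behavior, assuming a group that still has members:

```java
// Before the fix: Optional.of(null) throws an NPE while replaying a
// topology tombstone. After: the topology becomes Optional.empty().
group.setTopology(null);
assert group.topology().isEmpty();
// maybeUpdateGroupState() now takes the topology().isEmpty() branch,
// so a non-empty group drops back to NOT_READY instead of crashing.
```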
@@ -63,6 +63,20 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;

@@ -830,6 +844,384 @@ public class GroupCoordinatorShardTest {
         verify(groupMetadataManager, times(1)).replay(key, null);
     }

+    @Test
+    public void testReplayStreamsGroupMetadata() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        StreamsGroupMetadataKey key = new StreamsGroupMetadataKey();
+        StreamsGroupMetadataValue value = new StreamsGroupMetadataValue();
+
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.record(
+            key,
+            new ApiMessageAndVersion(value, (short) 0)
+        ));
+
+        verify(groupMetadataManager).replay(key, value);
+    }
+
+    @Test
+    public void testReplayStreamsGroupMetadataWithNullValue() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        StreamsGroupMetadataKey key = new StreamsGroupMetadataKey();
+
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.tombstone(
+            key
+        ));
+
+        verify(groupMetadataManager).replay(key, null);
+    }
+
+    @Test
+    public void testReplayStreamsGroupTopology() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        StreamsGroupTopologyKey key = new StreamsGroupTopologyKey();
+        StreamsGroupTopologyValue value = new StreamsGroupTopologyValue();
+
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.record(
+            key,
+            new ApiMessageAndVersion(value, (short) 0)
+        ));
+
+        verify(groupMetadataManager).replay(key, value);
+    }
+
+    @Test
+    public void testReplayStreamsGroupTopologyWithNullValue() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        StreamsGroupTopologyKey key = new StreamsGroupTopologyKey();
+
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.tombstone(
+            key
+        ));
+
+        verify(groupMetadataManager).replay(key, null);
+    }
+
+    @Test
+    public void testReplayStreamsGroupPartitionMetadata() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        StreamsGroupPartitionMetadataKey key = new StreamsGroupPartitionMetadataKey();
+        StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue();
+
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.record(
+            key,
+            new ApiMessageAndVersion(value, (short) 0)
+        ));
+
+        verify(groupMetadataManager).replay(key, value);
+    }
+
+    @Test
+    public void testReplayStreamsGroupPartitionMetadataWithNullValue() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        StreamsGroupPartitionMetadataKey key = new StreamsGroupPartitionMetadataKey();
+
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.tombstone(
+            key
+        ));
+
+        verify(groupMetadataManager).replay(key, null);
+    }
+
+    @Test
+    public void testReplayStreamsGroupMemberMetadata() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        StreamsGroupMemberMetadataKey key = new StreamsGroupMemberMetadataKey();
+        StreamsGroupMemberMetadataValue value = new StreamsGroupMemberMetadataValue();
+
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.record(
+            key,
+            new ApiMessageAndVersion(value, (short) 0)
+        ));
+
+        verify(groupMetadataManager).replay(key, value);
+    }
+
+    @Test
+    public void testReplayStreamsGroupMemberMetadataWithNullValue() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        StreamsGroupMemberMetadataKey key = new StreamsGroupMemberMetadataKey();
+
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.tombstone(
+            key
+        ));
+
+        verify(groupMetadataManager).replay(key, null);
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMetadata() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        StreamsGroupTargetAssignmentMetadataKey key = new StreamsGroupTargetAssignmentMetadataKey();
+        StreamsGroupTargetAssignmentMetadataValue value = new StreamsGroupTargetAssignmentMetadataValue();
+
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.record(
+            key,
+            new ApiMessageAndVersion(value, (short) 0)
+        ));
+
+        verify(groupMetadataManager).replay(key, value);
+    }
+
+    @Test
+    public void testReplayStreamsGroupTargetAssignmentMetadataWithNullValue() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        StreamsGroupTargetAssignmentMetadataKey key = new StreamsGroupTargetAssignmentMetadataKey();
|
||||
|
||||
coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.tombstone(
|
||||
key
|
||||
));
|
||||
|
||||
verify(groupMetadataManager).replay(key, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplayStreamsGroupTargetAssignmentMember() {
|
||||
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
|
||||
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
|
||||
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
|
||||
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
|
||||
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
|
||||
new LogContext(),
|
||||
groupMetadataManager,
|
||||
offsetMetadataManager,
|
||||
Time.SYSTEM,
|
||||
new MockCoordinatorTimer<>(Time.SYSTEM),
|
||||
mock(GroupCoordinatorConfig.class),
|
||||
coordinatorMetrics,
|
||||
metricsShard
|
||||
);
|
||||
|
||||
StreamsGroupTargetAssignmentMemberKey key = new StreamsGroupTargetAssignmentMemberKey();
|
||||
StreamsGroupTargetAssignmentMemberValue value = new StreamsGroupTargetAssignmentMemberValue();
|
||||
|
||||
coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.record(
|
||||
key,
|
||||
new ApiMessageAndVersion(value, (short) 0)
|
||||
));
|
||||
|
||||
verify(groupMetadataManager).replay(key, value);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplayStreamsGroupTargetAssignmentMemberKeyWithNullValue() {
|
||||
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
|
||||
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
|
||||
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
|
||||
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
|
||||
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
|
||||
new LogContext(),
|
||||
groupMetadataManager,
|
||||
offsetMetadataManager,
|
||||
Time.SYSTEM,
|
||||
new MockCoordinatorTimer<>(Time.SYSTEM),
|
||||
mock(GroupCoordinatorConfig.class),
|
||||
coordinatorMetrics,
|
||||
metricsShard
|
||||
);
|
||||
|
||||
StreamsGroupTargetAssignmentMemberKey key = new StreamsGroupTargetAssignmentMemberKey();
|
||||
|
||||
coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.tombstone(
|
||||
key
|
||||
));
|
||||
|
||||
verify(groupMetadataManager).replay(key, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplayStreamsGroupCurrentMemberAssignment() {
|
||||
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
|
||||
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
|
||||
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
|
||||
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
|
||||
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
|
||||
new LogContext(),
|
||||
groupMetadataManager,
|
||||
offsetMetadataManager,
|
||||
Time.SYSTEM,
|
||||
new MockCoordinatorTimer<>(Time.SYSTEM),
|
||||
mock(GroupCoordinatorConfig.class),
|
||||
coordinatorMetrics,
|
||||
metricsShard
|
||||
);
|
||||
|
||||
StreamsGroupCurrentMemberAssignmentKey key = new StreamsGroupCurrentMemberAssignmentKey();
|
||||
StreamsGroupCurrentMemberAssignmentValue value = new StreamsGroupCurrentMemberAssignmentValue();
|
||||
|
||||
coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.record(
|
||||
key,
|
||||
new ApiMessageAndVersion(value, (short) 0)
|
||||
));
|
||||
|
||||
verify(groupMetadataManager).replay(key, value);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplayStreamsGroupCurrentMemberAssignmentWithNullValue() {
|
||||
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
|
||||
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
|
||||
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
|
||||
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
|
||||
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
|
||||
new LogContext(),
|
||||
groupMetadataManager,
|
||||
offsetMetadataManager,
|
||||
Time.SYSTEM,
|
||||
new MockCoordinatorTimer<>(Time.SYSTEM),
|
||||
mock(GroupCoordinatorConfig.class),
|
||||
coordinatorMetrics,
|
||||
metricsShard
|
||||
);
|
||||
|
||||
StreamsGroupCurrentMemberAssignmentKey key = new StreamsGroupCurrentMemberAssignmentKey();
|
||||
|
||||
coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.tombstone(
|
||||
key
|
||||
));
|
||||
|
||||
verify(groupMetadataManager).replay(key, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplayKeyCannotBeNull() {
|
||||
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);

@@ -80,6 +80,8 @@ import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.MemberState;

@@ -91,6 +93,14 @@ import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpress
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil;
import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
import org.apache.kafka.coordinator.group.streams.TasksTuple;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;

@@ -104,6 +114,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@@ -136,7 +147,7 @@ import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGro
import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupJoinKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupSyncKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupJoinKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupRebalanceTimeoutKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.groupRebalanceTimeoutKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.groupSessionTimeoutKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManagerTestContext.DEFAULT_CLIENT_ADDRESS;
import static org.apache.kafka.coordinator.group.GroupMetadataManagerTestContext.DEFAULT_CLIENT_ID;

@@ -164,6 +175,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class GroupMetadataManagerTest {

    @Test
    public void testConsumerHeartbeatRequestValidation() {
        MockPartitionAssignor assignor = new MockPartitionAssignor("range");

@@ -3518,7 +3530,7 @@ public class GroupMetadataManagerTest {
        // Verify the expired timeout.
        assertEquals(
            List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
                consumerGroupRebalanceTimeoutKey(groupId, memberId1),
                groupRebalanceTimeoutKey(groupId, memberId1),
                new CoordinatorResult<>(
                    List.of(
                        GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),

@@ -3585,7 +3597,73 @@ public class GroupMetadataManagerTest {
        assertNotNull(context.timer.timeout(groupSessionTimeoutKey("foo", "foo-2")));

        // foo-1 should also have a revocation timeout in place.
        assertNotNull(context.timer.timeout(consumerGroupRebalanceTimeoutKey("foo", "foo-1")));
        assertNotNull(context.timer.timeout(groupRebalanceTimeoutKey("foo", "foo-1")));
    }

    @Test
    public void testOnLoadedWithStreamsGroup() {
        Uuid fooTopicId = Uuid.randomUuid();
        String fooTopicName = "foo";
        Uuid barTopicId = Uuid.randomUuid();
        String barTopicName = "bar";

        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .withMetadataImage(new MetadataImageBuilder()
                .addTopic(fooTopicId, fooTopicName, 6)
                .addTopic(barTopicId, barTopicName, 3)
                .build())
            .withStreamsGroup(new StreamsGroupBuilder("foo", 10)
                .withMember(new StreamsGroupMember.Builder("foo-1")
                    .setState(org.apache.kafka.coordinator.group.streams.MemberState.UNREVOKED_TASKS)
                    .setMemberEpoch(9)
                    .setPreviousMemberEpoch(9)
                    .setProcessId("process-id")
                    .setRackId(null)
                    .setInstanceId(null)
                    .setRebalanceTimeoutMs(100)
                    .setClientTags(new HashMap<>())
                    .setAssignedTasks(TasksTuple.EMPTY)
                    .setTasksPendingRevocation(TasksTuple.EMPTY)
                    .setTopologyEpoch(1)
                    .setUserEndpoint(new Endpoint().setHost("localhost").setPort(1500))
                    .setClientId(DEFAULT_CLIENT_ID)
                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                        TaskAssignmentTestUtil.mkTasks(fooTopicName, 0, 1, 2)))
                    .setTasksPendingRevocation(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                        TaskAssignmentTestUtil.mkTasks(fooTopicName, 3, 4, 5)))
                    .build())
                .withMember(new StreamsGroupMember.Builder("foo-2")
                    .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
                    .setMemberEpoch(10)
                    .setPreviousMemberEpoch(10)
                    .setProcessId("process-id")
                    .setRackId(null)
                    .setInstanceId(null)
                    .setAssignedTasks(TasksTuple.EMPTY)
                    .setTasksPendingRevocation(TasksTuple.EMPTY)
                    .setRebalanceTimeoutMs(100)
                    .setClientTags(new HashMap<>())
                    .setTopologyEpoch(1)
                    .setUserEndpoint(new Endpoint().setHost("localhost").setPort(1500))
                    .setClientId(DEFAULT_CLIENT_ID)
                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
                    .build())
                .withTargetAssignment("foo-1", TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                    TaskAssignmentTestUtil.mkTasks(fooTopicName, 3, 4, 5)))
                .withTargetAssignmentEpoch(10))
            .build();

        // Let's assume that all the records have been replayed and now
        // onLoaded is called to signal it.
        context.groupMetadataManager.onLoaded();

        // All members should have a session timeout in place.
        assertNotNull(context.timer.timeout(groupSessionTimeoutKey("foo", "foo-1")));
        assertNotNull(context.timer.timeout(groupSessionTimeoutKey("foo", "foo-2")));

        // foo-1 should also have a revocation timeout in place.
        assertNotNull(context.timer.timeout(groupRebalanceTimeoutKey("foo", "foo-1")));
    }
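
A natural follow-up (not part of this commit) would be to advance the mocked clock and watch the re-armed timers fire, mirroring the sleep-based pattern used elsewhere in this file. A minimal sketch, assuming a 45-second session timeout; the actual delay depends on the group coordinator configuration:

        // Sketch only: the delay is configuration-dependent, so the 45 seconds
        // used here is an assumption, not a value taken from this commit.
        List<ExpiredTimeout<Void, CoordinatorRecord>> expired = context.sleep(45000 + 1);
        assertFalse(expired.isEmpty());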

    @Test

@@ -3656,6 +3734,77 @@ public class GroupMetadataManagerTest {
        )));
    }

    @Test
    public void testUpdateStreamsGroupSizeCounter() {
        List<String> groupIds = new ArrayList<>();
        IntStream.range(0, 5).forEach(i -> groupIds.add("group-" + i));
        List<String> streamsMemberIds = List.of("streams-member-id-0", "streams-member-id-1", "streams-member-id-2", "streams-member-id-3");

        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .withStreamsGroup(new StreamsGroupBuilder(groupIds.get(0), 10)) // Empty group
            .withStreamsGroup(new StreamsGroupBuilder(groupIds.get(1), 10) // Stable group
                .withTargetAssignmentEpoch(10)
                .withTopology(new StreamsTopology(1, Map.of()))
                .withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(0))
                    .setMemberEpoch(10)
                    .build()))
            .withStreamsGroup(new StreamsGroupBuilder(groupIds.get(2), 10) // Assigning group
                .withTargetAssignmentEpoch(9)
                .withTopology(new StreamsTopology(1, Map.of()))
                .withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(1))
                    .setMemberEpoch(9)
                    .build()))
            .withStreamsGroup(new StreamsGroupBuilder(groupIds.get(3), 10) // Reconciling group
                .withTargetAssignmentEpoch(10)
                .withTopology(new StreamsTopology(1, Map.of()))
                .withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(2))
                    .setMemberEpoch(9)
                    .build()))
            .withStreamsGroup(new StreamsGroupBuilder(groupIds.get(4), 10) // NotReady group
                .withTargetAssignmentEpoch(10)
                .withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(3))
                    .build()))
            .build();

        context.groupMetadataManager.updateGroupSizeCounter();
        verify(context.metrics, times(1)).setStreamsGroupGauges(eq(Utils.mkMap(
            Utils.mkEntry(StreamsGroup.StreamsGroupState.EMPTY, 1L),
            Utils.mkEntry(StreamsGroup.StreamsGroupState.ASSIGNING, 1L),
            Utils.mkEntry(StreamsGroup.StreamsGroupState.RECONCILING, 1L),
            Utils.mkEntry(StreamsGroup.StreamsGroupState.NOT_READY, 1L),
            Utils.mkEntry(StreamsGroup.StreamsGroupState.STABLE, 1L)
        )));

        context.groupMetadataManager.getOrMaybeCreateStreamsGroup(groupIds.get(1), false)
            .removeMember(streamsMemberIds.get(0));
        context.groupMetadataManager.getOrMaybeCreateStreamsGroup(groupIds.get(3), false)
            .updateMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(2)).setMemberEpoch(10).build());

        context.groupMetadataManager.updateGroupSizeCounter();
        verify(context.metrics, times(1)).setStreamsGroupGauges(eq(Utils.mkMap(
            Utils.mkEntry(StreamsGroup.StreamsGroupState.EMPTY, 2L),
            Utils.mkEntry(StreamsGroup.StreamsGroupState.ASSIGNING, 1L),
            Utils.mkEntry(StreamsGroup.StreamsGroupState.NOT_READY, 1L),
            Utils.mkEntry(StreamsGroup.StreamsGroupState.STABLE, 1L)
        )));
    }
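
The five fixtures above each pin one state, which suggests the derivation the counter relies on. The helper below is a sketch of that inference under the stated assumptions, not the actual StreamsGroup logic:

    // Sketch (assumption inferred from the fixtures above, not StreamsGroup's code):
    // no members -> EMPTY; no topology -> NOT_READY; group epoch ahead of the target
    // assignment epoch -> ASSIGNING; a member lagging the group epoch -> RECONCILING;
    // otherwise -> STABLE.
    static StreamsGroup.StreamsGroupState inferredState(
        int groupEpoch,
        int targetAssignmentEpoch,
        boolean topologySet,
        List<Integer> memberEpochs
    ) {
        if (memberEpochs.isEmpty()) return StreamsGroup.StreamsGroupState.EMPTY;
        if (!topologySet) return StreamsGroup.StreamsGroupState.NOT_READY;
        if (groupEpoch > targetAssignmentEpoch) return StreamsGroup.StreamsGroupState.ASSIGNING;
        if (memberEpochs.stream().anyMatch(e -> e != groupEpoch)) return StreamsGroup.StreamsGroupState.RECONCILING;
        return StreamsGroup.StreamsGroupState.STABLE;
    }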

    private StreamsGroupMember.Builder streamsGroupMemberBuilderWithDefaults(String memberId) {
        return new StreamsGroupMember.Builder(memberId)
            .setMemberEpoch(1)
            .setPreviousMemberEpoch(0)
            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
            .setRackId(null)
            .setInstanceId(null)
            .setRebalanceTimeoutMs(1000)
            .setAssignedTasks(TasksTuple.EMPTY)
            .setTasksPendingRevocation(TasksTuple.EMPTY)
            .setTopologyEpoch(1)
            .setClientTags(Map.of())
            .setProcessId("process-id")
            .setUserEndpoint(new Endpoint().setHost("localhost").setPort(1500));
    }

    @Test
    public void testGenerateRecordsOnNewClassicGroup() throws Exception {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()

@@ -11443,7 +11592,7 @@ public class GroupMetadataManagerTest {
        // Advance time past the session timeout.
        // Member 2 should be fenced from the group, thus triggering the downgrade.
        ExpiredTimeout<Void, CoordinatorRecord> timeout = context.sleep(30000 + 1).get(0);
        assertEquals(consumerGroupRebalanceTimeoutKey(groupId, memberId2), timeout.key);
        assertEquals(groupRebalanceTimeoutKey(groupId, memberId2), timeout.key);

        byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(List.of(
            new TopicPartition(fooTopicName, 0),

@@ -15135,6 +15284,422 @@ public class GroupMetadataManagerTest {
        assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.consumerGroup("bar"));
    }

    @Test
    public void testReplayStreamsGroupMemberMetadata() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .build();

        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
            .setClientId("clientid")
            .setClientHost("clienthost")
            .setRackId("rackid")
            .setInstanceId("instanceid")
            .setRebalanceTimeoutMs(1000)
            .setTopologyEpoch(10)
            .setProcessId("processid")
            .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999))
            .setClientTags(Collections.singletonMap("key", "value"))
            .build();

        // The group and the member are created if they do not exist.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo", member));
        assertEquals(member, context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member", false));
    }

    @Test
    public void testReplayStreamsGroupMemberMetadataTombstoneNotExisting() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .build();

        // The group still exists but the member is already gone. Replaying the
        // StreamsGroupMemberMetadata tombstone should be a no-op.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10));
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1"));
        assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", false));

        // The group may not exist at all. Replaying the StreamsGroupMemberMetadata tombstone
        // should be a no-op.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar", "m1"));
        assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("bar"));
    }

    @Test
    public void testReplayStreamsGroupMemberMetadataTombstoneExisting() {
        final TasksTuple tasks =
            new TasksTuple(
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
            );
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .withStreamsGroup(
                new StreamsGroupBuilder("foo", 10)
                    .withMember(streamsGroupMemberBuilderWithDefaults("m1").build())
                    .withTargetAssignment("m1", tasks)
            )
            .build();

        IllegalStateException e = assertThrows(IllegalStateException.class,
            () -> context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1")));
        assertEquals("Received a tombstone record to delete member m1 but did not receive "
                + "StreamsGroupCurrentMemberAssignmentValue tombstone.",
            e.getMessage());

        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo", "m1"));

        IllegalStateException e2 = assertThrows(IllegalStateException.class,
            () -> context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1")));
        assertEquals("Received a tombstone record to delete member m1 but did not receive "
                + "StreamsGroupTargetAssignmentMetadataValue tombstone.",
            e2.getMessage());

        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo", "m1"));

        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1"));

        assertFalse(context.groupMetadataManager.streamsGroup("foo").hasMember("m1"));
    }
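
In other words, the replay path enforces a strict per-member deletion order; using the same helpers, the only sequence that deletes m1 cleanly is:

        // Valid per-member deletion order, as pinned down by the test above:
        // current assignment first, then target assignment, then the member itself.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo", "m1"));
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo", "m1"));
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1"));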

    @Test
    public void testReplayStreamsGroupMetadata() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .build();

        // The group is created if it does not exist.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10));
        assertEquals(10, context.groupMetadataManager.streamsGroup("foo").groupEpoch());
    }

    @Test
    public void testReplayStreamsGroupEpochTombstoneNotExisting() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .build();

        // The group may not exist at all. Replaying the StreamsGroupMetadata tombstone
        // should be a no-op.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo"));
        assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo"));
    }

    @Test
    public void testReplayStreamsGroupEpochTombstoneExisting() {
        final TasksTuple tasks =
            new TasksTuple(
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
            );
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .withStreamsGroup(
                new StreamsGroupBuilder("foo", 10)
                    .withTargetAssignmentEpoch(10)
                    .withMember(streamsGroupMemberBuilderWithDefaults("m1").build())
                    .withTargetAssignment("m1", tasks)
            )
            .build();

        IllegalStateException e = assertThrows(IllegalStateException.class,
            () -> context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo")));
        assertEquals("Received a tombstone record to delete group foo but the group still has 1 members.",
            e.getMessage());

        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo", "m1"));
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo", "m1"));
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1"));

        IllegalStateException e2 = assertThrows(IllegalStateException.class,
            () -> context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo")));
        assertEquals("Received a tombstone record to delete group foo but did not receive StreamsGroupTargetAssignmentMetadataValue tombstone.",
            e2.getMessage());

        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord("foo"));

        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo"));

        assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo"));
    }

    @Test
    public void testReplayStreamsGroupPartitionMetadata() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .build();

        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> metadata = Map.of(
            "bar",
            new org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), "bar", 10)
        );

        // The group is created if it does not exist.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo", metadata));
        assertEquals(metadata, context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
    }

    @Test
    public void testReplayStreamsGroupPartitionMetadataTombstoneNotExisting() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .build();

        // The group may not exist at all. Replaying the StreamsGroupPartitionMetadata tombstone
        // should be a no-op.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
        assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo"));
    }

    @Test
    public void testReplayStreamsGroupPartitionMetadataTombstoneExisting() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .withStreamsGroup(new StreamsGroupBuilder("foo", 10).withPartitionMetadata(
                Map.of("topic1", new org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), "topic1", 10))
            ))
            .build();

        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));

        assertTrue(context.groupMetadataManager.streamsGroup("foo").partitionMetadata().isEmpty());
    }

    @Test
    public void testReplayStreamsGroupTargetAssignmentMember() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .build();

        // The group is created if it does not exist.
        final TasksTuple tasks =
            new TasksTuple(
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
            );
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo", "m1", tasks));
        assertEquals(tasks.activeTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks());
        assertEquals(tasks.standbyTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks());
        assertEquals(tasks.warmupTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks());
    }

    @Test
    public void testReplayStreamsGroupTargetAssignmentMemberTombstoneNonExisting() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .build();

        // The group may not exist at all. Replaying the StreamsGroupTargetAssignmentMember tombstone
        // should be a no-op.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo", "m1"));
        assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo"));
    }

    @Test
    public void testReplayStreamsGroupTargetAssignmentMemberTombstoneExisting() {
        final TasksTuple tasks =
            new TasksTuple(
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
            );
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .withStreamsGroup(new StreamsGroupBuilder("foo", 10).withTargetAssignment("m1", tasks))
            .build();

        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo", "m1"));

        assertTrue(context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").isEmpty());
    }

    @Test
    public void testReplayStreamsGroupTargetAssignmentMetadata() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .build();

        // The group is created if it does not exist.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord("foo", 10));
        assertEquals(10, context.groupMetadataManager.streamsGroup("foo").assignmentEpoch());
    }

    @Test
    public void testReplayStreamsGroupTargetAssignmentMetadataTombstoneNotExisting() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .build();

        // The group may not exist at all. Replaying the StreamsGroupTargetAssignmentMetadata tombstone
        // should be a no-op.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord("foo"));
        assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo"));
    }

    @Test
    public void testReplayStreamsGroupTargetAssignmentMetadataTombstoneExisting() {
        final TasksTuple tasks =
            new TasksTuple(
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
            );

        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .withStreamsGroup(
                new StreamsGroupBuilder("foo", 10)
                    .withTargetAssignmentEpoch(10)
                    .withTargetAssignment("m1", tasks)
            )
            .build();

        IllegalStateException e = assertThrows(
            IllegalStateException.class,
            () -> context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord("foo"))
        );
        assertEquals("Received a tombstone record to delete target assignment of foo but the assignment still has 1 members.",
            e.getMessage());

        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo", "m1"));

        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord("foo"));

        assertEquals(-1, context.groupMetadataManager.streamsGroup("foo").assignmentEpoch());
    }

    @Test
    public void testReplayStreamsGroupCurrentMemberAssignment() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .build();

        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
            .setMemberEpoch(10)
            .setPreviousMemberEpoch(9)
            .setState(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS)
            .setAssignedTasks(new TasksTuple(
                TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
                TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
                TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
            ))
            .setTasksPendingRevocation(TasksTuple.EMPTY)
            .build();

        // The group and the member are created if they do not exist.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord("bar", member));
        assertEquals(member, context.groupMetadataManager.streamsGroup("bar").getOrMaybeCreateMember("member", false));
    }

    @Test
    public void testReplayStreamsGroupCurrentMemberAssignmentTombstoneNotExisting() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .build();

        // The group still exists, but the member is already gone. Replaying the
        // StreamsGroupCurrentMemberAssignment tombstone should be a no-op.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10));
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo", "m1"));
        assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", false));

        // The group may not exist at all. Replaying the StreamsGroupCurrentMemberAssignment tombstone
        // should be a no-op.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("bar", "m1"));
        assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("bar"));
    }

    @Test
    public void testReplayStreamsGroupCurrentMemberAssignmentTombstoneExisting() {
        final TasksTuple tasks =
            new TasksTuple(
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
                TaskAssignmentTestUtil.mkTasksPerSubtopology(
                    TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
            );
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .withStreamsGroup(
                new StreamsGroupBuilder("foo", 10)
                    .withMember(
                        streamsGroupMemberBuilderWithDefaults("m1")
                            .setAssignedTasks(tasks)
                            .build()
                    )
            )
            .build();

        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo", "m1"));

        final StreamsGroupMember member = context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1", false);
        assertEquals(LEAVE_GROUP_MEMBER_EPOCH, member.memberEpoch());
        assertEquals(LEAVE_GROUP_MEMBER_EPOCH, member.previousMemberEpoch());
        assertTrue(member.assignedTasks().isEmpty());
        assertTrue(member.tasksPendingRevocation().isEmpty());
    }

    @Test
    public void testReplayStreamsGroupTopology() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .build();

        StreamsGroupTopologyValue topology = new StreamsGroupTopologyValue()
            .setEpoch(12)
            .setSubtopologies(
                List.of(
                    new StreamsGroupTopologyValue.Subtopology()
                        .setSubtopologyId("subtopology-1")
                        .setSourceTopics(List.of("source-topic"))
                        .setRepartitionSinkTopics(List.of("sink-topic"))
                )
            );

        // The group and the topology are created if they do not exist.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("bar", topology));
        final Optional<StreamsTopology> actualTopology = context.groupMetadataManager.streamsGroup("bar").topology();
        assertTrue(actualTopology.isPresent(), "topology should be set");
        assertEquals(topology.epoch(), actualTopology.get().topologyEpoch());
        assertEquals(topology.subtopologies().size(), actualTopology.get().subtopologies().size());
        assertEquals(
            topology.subtopologies().iterator().next(),
            actualTopology.get().subtopologies().values().iterator().next()
        );
    }

    @Test
    public void testReplayStreamsGroupTopologyTombstoneNotExists() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .build();

        // The group still exists, but no topology is set. Replaying the
        // StreamsGroupTopology tombstone should be a no-op.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10));
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone("foo"));
        assertTrue(context.groupMetadataManager.streamsGroup("foo").topology().isEmpty());

        // The group may not exist at all. Replaying the StreamsGroupTopology tombstone
        // should be a no-op.
        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone("bar"));
        assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("bar"));
    }

    @Test
    public void testReplayStreamsGroupTopologyTombstoneExists() {
        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
            .withStreamsGroup(
                new StreamsGroupBuilder("foo", 10)
                    .withTopology(new StreamsTopology(10, Map.of()))
            )
            .build();

        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone("foo"));

        assertTrue(context.groupMetadataManager.streamsGroup("foo").topology().isEmpty());
    }

    @Test
    public void testConsumerGroupHeartbeatOnShareGroup() {
        String groupId = "group-foo";

@@ -87,12 +87,27 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;

@@ -118,8 +133,8 @@ import static org.apache.kafka.coordinator.group.GroupConfigManagerTest.createCo
import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupHeartbeatKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupJoinKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupRebalanceTimeoutKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupSyncKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.groupRebalanceTimeoutKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.groupSessionTimeoutKey;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD;

@@ -445,6 +460,7 @@ public class GroupMetadataManagerTestContext {
    private MetadataImage metadataImage;
    private GroupConfigManager groupConfigManager;
    private final List<ConsumerGroupBuilder> consumerGroupBuilders = new ArrayList<>();
    private final List<StreamsGroupBuilder> streamsGroupBuilders = new ArrayList<>();
    private final GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class);
    private ShareGroupPartitionAssignor shareGroupAssignor = new MockPartitionAssignor("share");
    private final List<ShareGroupBuilder> shareGroupBuilders = new ArrayList<>();

@@ -465,6 +481,11 @@ public class GroupMetadataManagerTestContext {
        return this;
    }

    public Builder withStreamsGroup(StreamsGroupBuilder builder) {
        this.streamsGroupBuilders.add(builder);
        return this;
    }

    public Builder withShareGroup(ShareGroupBuilder builder) {
        this.shareGroupBuilders.add(builder);
        return this;

@@ -510,6 +531,7 @@ public class GroupMetadataManagerTestContext {

        consumerGroupBuilders.forEach(builder -> builder.build(metadataImage.topics()).forEach(context::replay));
        shareGroupBuilders.forEach(builder -> builder.build(metadataImage.topics()).forEach(context::replay));
        streamsGroupBuilders.forEach(builder -> builder.build().forEach(context::replay));

        context.commit();

@@ -701,7 +723,7 @@ public class GroupMetadataManagerTestContext {
        long delayMs
    ) {
        MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout =
            timer.timeout(consumerGroupRebalanceTimeoutKey(groupId, memberId));
            timer.timeout(groupRebalanceTimeoutKey(groupId, memberId));
        assertNotNull(timeout);
        assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs);
        return timeout;

@@ -712,7 +734,7 @@ public class GroupMetadataManagerTestContext {
        String memberId
    ) {
        MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout =
            timer.timeout(consumerGroupRebalanceTimeoutKey(groupId, memberId));
            timer.timeout(groupRebalanceTimeoutKey(groupId, memberId));
        assertNull(timeout);
    }

@@ -1614,6 +1636,55 @@ public class GroupMetadataManagerTestContext {
                );
                break;

            case STREAMS_GROUP_CURRENT_MEMBER_ASSIGNMENT:
                groupMetadataManager.replay(
                    (StreamsGroupCurrentMemberAssignmentKey) key,
                    (StreamsGroupCurrentMemberAssignmentValue) messageOrNull(value)
                );
                break;

            case STREAMS_GROUP_MEMBER_METADATA:
                groupMetadataManager.replay(
                    (StreamsGroupMemberMetadataKey) key,
                    (StreamsGroupMemberMetadataValue) messageOrNull(value)
                );
                break;

            case STREAMS_GROUP_METADATA:
                groupMetadataManager.replay(
                    (StreamsGroupMetadataKey) key,
                    (StreamsGroupMetadataValue) messageOrNull(value)
                );
                break;

            case STREAMS_GROUP_PARTITION_METADATA:
                groupMetadataManager.replay(
                    (StreamsGroupPartitionMetadataKey) key,
                    (StreamsGroupPartitionMetadataValue) messageOrNull(value)
                );
                break;

            case STREAMS_GROUP_TARGET_ASSIGNMENT_MEMBER:
                groupMetadataManager.replay(
                    (StreamsGroupTargetAssignmentMemberKey) key,
                    (StreamsGroupTargetAssignmentMemberValue) messageOrNull(value)
                );
                break;

            case STREAMS_GROUP_TARGET_ASSIGNMENT_METADATA:
                groupMetadataManager.replay(
                    (StreamsGroupTargetAssignmentMetadataKey) key,
                    (StreamsGroupTargetAssignmentMetadataValue) messageOrNull(value)
                );
                break;

            case STREAMS_GROUP_TOPOLOGY:
                groupMetadataManager.replay(
                    (StreamsGroupTopologyKey) key,
                    (StreamsGroupTopologyValue) messageOrNull(value)
                );
                break;

            default:
                throw new IllegalStateException("Received an unknown record type " + record.key().apiKey()
                    + " in " + record);
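The messageOrNull helper referenced in these cases sits outside the hunk; presumably it unwraps the versioned value and passes null through for tombstones, along these lines (an assumption about the helper's shape, not code shown in this commit):

    // Assumed shape: tombstone records carry no value, so each cast above
    // must accept null and forward it to the replay overload.
    private static ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
        return apiMessageAndVersion == null ? null : apiMessageAndVersion.message();
    }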
@@ -220,6 +220,13 @@ public class GroupCoordinatorMetricsTest {
            ConsumerGroupState.DEAD, 1L
        ));

        shard0.setStreamsGroupGauges(Collections.singletonMap(StreamsGroupState.ASSIGNING, 2L));
        shard1.setStreamsGroupGauges(Map.of(
            StreamsGroupState.RECONCILING, 1L,
            StreamsGroupState.DEAD, 1L,
            StreamsGroupState.NOT_READY, 1L
        ));

        IntStream.range(0, 6).forEach(__ -> shard0.incrementNumOffsets());
        IntStream.range(0, 2).forEach(__ -> shard1.incrementNumOffsets());
        IntStream.range(0, 1).forEach(__ -> shard1.decrementNumOffsets());

@@ -227,10 +234,6 @@ public class GroupCoordinatorMetricsTest {
        IntStream.range(0, 5).forEach(__ -> shard0.incrementNumShareGroups(ShareGroup.ShareGroupState.STABLE));
        IntStream.range(0, 5).forEach(__ -> shard1.incrementNumShareGroups(ShareGroup.ShareGroupState.EMPTY));
        IntStream.range(0, 3).forEach(__ -> shard1.decrementNumShareGroups(ShareGroup.ShareGroupState.DEAD));

        IntStream.range(0, 5).forEach(__ -> shard0.incrementNumStreamsGroups(StreamsGroupState.STABLE));
        IntStream.range(0, 5).forEach(__ -> shard1.incrementNumStreamsGroups(StreamsGroupState.EMPTY));
        IntStream.range(0, 3).forEach(__ -> shard1.decrementNumStreamsGroups(StreamsGroupState.DEAD));

        assertEquals(4, shard0.numClassicGroups());
        assertEquals(5, shard1.numClassicGroups());

@@ -248,6 +251,7 @@ public class GroupCoordinatorMetricsTest {

        assertEquals(5, shard0.numConsumerGroups());
        assertEquals(2, shard1.numConsumerGroups());

        assertEquals(6, shard0.numOffsets());
        assertEquals(1, shard1.numOffsets());
        assertGaugeValue(

@@ -265,12 +269,12 @@
            7
        );

        assertEquals(5, shard0.numStreamsGroups());
        assertEquals(2, shard1.numStreamsGroups());
        assertEquals(2, shard0.numStreamsGroups());
        assertEquals(3, shard1.numStreamsGroups());
        assertGaugeValue(
            metrics,
            metrics.metricName("group-count", METRICS_GROUP, Collections.singletonMap("protocol", "streams")),
            7
            5
        );
    }

@@ -0,0 +1,116 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.coordinator.group.streams;

import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsGroupBuilder {

    private final String groupId;
    private final int groupEpoch;
    private int targetAssignmentEpoch;
    private StreamsTopology topology;
    private final Map<String, StreamsGroupMember> members = new HashMap<>();
    private final Map<String, TasksTuple> targetAssignments = new HashMap<>();
    private Map<String, TopicMetadata> partitionMetadata = new HashMap<>();

    public StreamsGroupBuilder(String groupId, int groupEpoch) {
        this.groupId = groupId;
        this.groupEpoch = groupEpoch;
        this.targetAssignmentEpoch = 0;
        this.topology = null;
    }

    public StreamsGroupBuilder withMember(StreamsGroupMember member) {
        this.members.put(member.memberId(), member);
        return this;
    }

    public StreamsGroupBuilder withPartitionMetadata(Map<String, TopicMetadata> partitionMetadata) {
        this.partitionMetadata = partitionMetadata;
        return this;
    }

    public StreamsGroupBuilder withTopology(StreamsTopology streamsTopology) {
        this.topology = streamsTopology;
        return this;
    }

    public StreamsGroupBuilder withTargetAssignment(String memberId, TasksTuple targetAssignment) {
        this.targetAssignments.put(memberId, targetAssignment);
        return this;
    }

    public StreamsGroupBuilder withTargetAssignmentEpoch(int targetAssignmentEpoch) {
        this.targetAssignmentEpoch = targetAssignmentEpoch;
        return this;
    }

    public List<CoordinatorRecord> build() {
        List<CoordinatorRecord> records = new ArrayList<>();

        // Add records for members.
        members.forEach((memberId, member) ->
            records.add(
                StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, member))
        );

        if (!partitionMetadata.isEmpty()) {
            records.add(
                StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
                    partitionMetadata));
        }

        // Add group epoch record.
        records.add(
            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch));

        // Add target assignment records.
        targetAssignments.forEach((memberId, assignment) ->
            records.add(
                StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, assignment))
        );

        // Add topology record.
        if (topology != null) {
            records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(
                groupId,
                new StreamsGroupTopologyValue()
                    .setEpoch(topology.topologyEpoch())
                    .setSubtopologies(topology.subtopologies().values().stream().sorted().toList()))
            );
        }

        // Add target assignment epoch.
        records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
            targetAssignmentEpoch));

        // Add current assignment records for members.
        members.forEach((memberId, member) ->
            records.add(
                StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, member))
        );

        return records;
    }
}
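
For orientation, this is roughly how the test context consumes the builder. The member-defaults helper is the one defined in GroupMetadataManagerTest, so this is a sketch rather than code from this commit:

    // Sketch: bootstrap records for a one-member stable group, replayed into
    // a test context; values are arbitrary.
    List<CoordinatorRecord> records = new StreamsGroupBuilder("foo", 10)
        .withTopology(new StreamsTopology(1, Map.of()))
        .withMember(streamsGroupMemberBuilderWithDefaults("m1").setMemberEpoch(10).build())
        .withTargetAssignmentEpoch(10)
        .build();
    records.forEach(context::replay);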
|
|
@ -84,8 +84,6 @@ import static org.mockito.ArgumentMatchers.eq;
|
|||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.mockStatic;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class StreamsGroupTest {
|
||||
|
@ -858,53 +856,6 @@ public class StreamsGroupTest {
|
|||
assertEquals(expected, actual);
|
||||
}
|
||||
|
||||
@Test
public void testStateTransitionMetrics() {
    // Confirm that the metric is not updated when a new StreamsGroup is created,
    // but only when the group transitions between states.
    GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class);
    StreamsGroup streamsGroup = new StreamsGroup(
        LOG_CONTEXT,
        new SnapshotRegistry(new LogContext()),
        "group-id",
        metrics
    );

    assertEquals(StreamsGroup.StreamsGroupState.EMPTY, streamsGroup.state());
    verify(metrics, times(0)).onStreamsGroupStateTransition(null, StreamsGroup.StreamsGroupState.EMPTY);

    StreamsGroupMember member = new StreamsGroupMember.Builder("member")
        .setMemberEpoch(1)
        .setPreviousMemberEpoch(0)
        .setState(MemberState.STABLE)
        .build();

    streamsGroup.updateMember(member);

    assertEquals(StreamsGroup.StreamsGroupState.NOT_READY, streamsGroup.state());
    verify(metrics, times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.EMPTY, StreamsGroup.StreamsGroupState.NOT_READY);

    streamsGroup.setTopology(new StreamsTopology(1, Collections.emptyMap()));

    assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, streamsGroup.state());
    verify(metrics, times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.NOT_READY, StreamsGroup.StreamsGroupState.RECONCILING);

    streamsGroup.setGroupEpoch(1);

    assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, streamsGroup.state());
    verify(metrics, times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.RECONCILING, StreamsGroup.StreamsGroupState.ASSIGNING);

    streamsGroup.setTargetAssignmentEpoch(1);

    assertEquals(StreamsGroup.StreamsGroupState.STABLE, streamsGroup.state());
    verify(metrics, times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.ASSIGNING, StreamsGroup.StreamsGroupState.STABLE);

    streamsGroup.removeMember("member");

    assertEquals(StreamsGroup.StreamsGroupState.EMPTY, streamsGroup.state());
    verify(metrics, times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.STABLE, StreamsGroup.StreamsGroupState.EMPTY);
}
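As the assertions above show, this test traces the streams group state machine end to end: EMPTY to NOT_READY when a member joins before a topology is known, NOT_READY to RECONCILING once the topology is set, RECONCILING to ASSIGNING once the group epoch advances past the target assignment epoch, ASSIGNING to STABLE once the target assignment epoch catches up, and back to EMPTY when the last member leaves. Each transition fires exactly one onStreamsGroupStateTransition callback on the metrics shard.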
@Test
public void testIsInStatesCaseInsensitiveAndUnderscored() {
    SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
@@ -954,6 +905,36 @@ public class StreamsGroupTest {
    assertEquals(StreamsGroupState.RECONCILING, streamsGroup.state());
}
@Test
public void testSetTopologyUpdatesStateAndConfiguredTopologyWithPreviousCallToSetMetadata() {
    Uuid topicUuid = Uuid.randomUuid();
    SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
    GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
    StreamsGroup streamsGroup = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "test-group", metricsShard);

    assertEquals(StreamsGroup.StreamsGroupState.EMPTY, streamsGroup.state());

    Map<String, TopicMetadata> partitionMetadata = new HashMap<>();
    partitionMetadata.put("topic1", new TopicMetadata(topicUuid, "topic1", 1));

    try (MockedStatic<InternalTopicManager> mocked = mockStatic(InternalTopicManager.class)) {
        streamsGroup.setPartitionMetadata(partitionMetadata);
        mocked.verify(() -> InternalTopicManager.configureTopics(any(), any(), any()), never());
    }

    assertTrue(streamsGroup.configuredTopology().isEmpty(), "Configured topology should not be present");
    assertEquals(partitionMetadata, streamsGroup.partitionMetadata());

    StreamsTopology topology = new StreamsTopology(1, Collections.emptyMap());
    ConfiguredTopology topo = mock(ConfiguredTopology.class);
    when(topo.isReady()).thenReturn(true);
    try (MockedStatic<InternalTopicManager> mocked = mockStatic(InternalTopicManager.class)) {
        mocked.when(() -> InternalTopicManager.configureTopics(any(), eq(topology), eq(partitionMetadata))).thenReturn(topo);
        streamsGroup.setTopology(topology);
        mocked.verify(() -> InternalTopicManager.configureTopics(any(), eq(topology), eq(partitionMetadata)));
    }
}
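The two static-mocking blocks above pin down the ordering contract: setPartitionMetadata on its own must not invoke InternalTopicManager.configureTopics, since no topology is known yet; only the subsequent setTopology call, with both the topology and the partition metadata available, triggers topic configuration.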
@Test
public void testSetPartitionMetadataUpdatesStateAndConfiguredTopology() {
    Uuid topicUuid = Uuid.randomUuid();