KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in stream group (#19802)

* Use metadata hash to replace subscription metadata.
* Remove `StreamsGroupPartitionMetadataKey` and
`StreamsGroupPartitionMetadataValue`.
* Check whether `configuredTopology` is empty. If it's, call
`InternalTopicManager.configureTopics` and set the result to the group.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
This commit is contained in:
PoAn Yang 2025-06-03 19:21:34 +08:00 committed by GitHub
parent df93571f50
commit 425f028556
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 379 additions and 786 deletions

View File

@ -109,8 +109,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKe
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@ -1299,13 +1297,6 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
);
break;
case STREAMS_GROUP_PARTITION_METADATA:
groupMetadataManager.replay(
(StreamsGroupPartitionMetadataKey) key,
(StreamsGroupPartitionMetadataValue) Utils.messageOrNull(value)
);
break;
case STREAMS_GROUP_MEMBER_METADATA:
groupMetadataManager.replay(
(StreamsGroupMemberMetadataKey) key,

View File

@ -129,8 +129,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKe
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@ -260,7 +258,6 @@ import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecor
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedTasksChanged;
@ -1894,43 +1891,42 @@ public class GroupMetadataManager {
StreamsTopology updatedTopology = maybeUpdateTopology(groupId, memberId, topology, group, records);
maybeSetTopologyStaleStatus(group, updatedMember, returnedStatus);
// 3. Determine the partition metadata and any internal topics if needed.
// 3. Determine any internal topics if needed.
ConfiguredTopology updatedConfiguredTopology;
Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> updatedPartitionMetadata;
boolean reconfigureTopology = group.topology().isEmpty();
if (reconfigureTopology || group.hasMetadataExpired(currentTimeMs)) {
long metadataHash = group.metadataHash();
if (reconfigureTopology || group.configuredTopology().isEmpty() || group.hasMetadataExpired(currentTimeMs)) {
updatedPartitionMetadata = group.computePartitionMetadata(
metadataImage.topics(),
metadataHash = group.computeMetadataHash(
metadataImage,
topicHashCache,
updatedTopology
);
if (!updatedPartitionMetadata.equals(group.partitionMetadata())) {
log.info("[GroupId {}][MemberId {}] Computed new partition metadata: {}.",
groupId, memberId, updatedPartitionMetadata);
if (metadataHash != group.metadataHash()) {
log.info("[GroupId {}][MemberId {}] Computed new metadata hash: {}.",
groupId, memberId, metadataHash);
bumpGroupEpoch = true;
reconfigureTopology = true;
records.add(newStreamsGroupPartitionMetadataRecord(groupId, updatedPartitionMetadata));
group.setPartitionMetadata(updatedPartitionMetadata);
}
if (reconfigureTopology || group.configuredTopology().isEmpty()) {
log.info("[GroupId {}][MemberId {}] Configuring the topology {}", groupId, memberId, updatedTopology);
updatedConfiguredTopology = InternalTopicManager.configureTopics(logContext, updatedTopology, updatedPartitionMetadata);
updatedConfiguredTopology = InternalTopicManager.configureTopics(logContext, metadataHash, updatedTopology, metadataImage.topics());
group.setConfiguredTopology(updatedConfiguredTopology);
} else {
updatedConfiguredTopology = group.configuredTopology().get();
}
} else {
updatedConfiguredTopology = group.configuredTopology().get();
updatedPartitionMetadata = group.partitionMetadata();
}
// Actually bump the group epoch
int groupEpoch = group.groupEpoch();
if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newStreamsGroupEpochRecord(groupId, groupEpoch, 0));
log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {}.", groupId, memberId, groupEpoch);
records.add(newStreamsGroupEpochRecord(groupId, groupEpoch, metadataHash));
log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {}.", groupId, memberId, groupEpoch, metadataHash);
metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
}
@ -1946,7 +1942,7 @@ public class GroupMetadataManager {
groupEpoch,
updatedMember,
updatedConfiguredTopology,
updatedPartitionMetadata,
metadataImage,
records
);
targetAssignmentEpoch = groupEpoch;
@ -2111,7 +2107,7 @@ public class GroupMetadataManager {
final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
responseEndpoint.setHost(endpoint.host());
responseEndpoint.setPort(endpoint.port());
StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions = EndpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint, group);
StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions = EndpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint, group, metadataImage);
endpointToPartitionsList.add(endpointToPartitions);
}
}
@ -3795,12 +3791,12 @@ public class GroupMetadataManager {
}
/**
* Updates the target assignment according to the updated member and subscription metadata.
* Updates the target assignment according to the updated member and metadata image.
*
* @param group The StreamsGroup.
* @param groupEpoch The group epoch.
* @param updatedMember The updated member.
* @param subscriptionMetadata The subscription metadata.
* @param metadataImage The metadata image.
* @param records The list to accumulate any new records.
* @return The new target assignment.
*/
@ -3809,7 +3805,7 @@ public class GroupMetadataManager {
int groupEpoch,
StreamsGroupMember updatedMember,
ConfiguredTopology configuredTopology,
Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> subscriptionMetadata,
MetadataImage metadataImage,
List<CoordinatorRecord> records
) {
TaskAssignor assignor = streamsGroupAssignor(group.groupId());
@ -3825,7 +3821,7 @@ public class GroupMetadataManager {
.withMembers(group.members())
.withTopology(configuredTopology)
.withStaticMembers(group.staticMembers())
.withPartitionMetadata(subscriptionMetadata)
.withMetadataImage(metadataImage)
.withTargetAssignment(group.targetAssignment())
.addOrUpdateMember(updatedMember.memberId(), updatedMember);
@ -5282,6 +5278,7 @@ public class GroupMetadataManager {
if (value != null) {
StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, true);
streamsGroup.setGroupEpoch(value.epoch());
streamsGroup.setMetadataHash(value.metadataHash());
} else {
StreamsGroup streamsGroup;
try {
@ -5304,38 +5301,6 @@ public class GroupMetadataManager {
}
/**
* Replays StreamsGroupPartitionMetadataKey/Value to update the hard state of
* the streams group. It updates the subscription metadata of the streams
* group.
*
* @param key A StreamsGroupPartitionMetadataKey key.
* @param value A StreamsGroupPartitionMetadataValue record.
*/
public void replay(
StreamsGroupPartitionMetadataKey key,
StreamsGroupPartitionMetadataValue value
) {
String groupId = key.groupId();
StreamsGroup streamsGroup;
try {
streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, value != null);
} catch (GroupIdNotFoundException ex) {
// If the group does not exist, we can ignore the tombstone.
return;
}
if (value != null) {
Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> partitionMetadata = new HashMap<>();
value.topics().forEach(topicMetadata -> {
partitionMetadata.put(topicMetadata.topicName(), org.apache.kafka.coordinator.group.streams.TopicMetadata.fromRecord(topicMetadata));
});
streamsGroup.setPartitionMetadata(partitionMetadata);
} else {
streamsGroup.setPartitionMetadata(Map.of());
}
}
/**
* Replays ShareGroupMemberMetadataKey/Value to update the hard state of
* the share group. It updates the subscription part of the member or

View File

@ -24,8 +24,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKe
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@ -98,58 +96,6 @@ public class StreamsCoordinatorRecordHelpers {
);
}
/**
* Creates a StreamsGroupPartitionMetadata record.
*
* @param groupId The streams group id.
* @param newPartitionMetadata The partition metadata.
* @return The record.
*/
public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord(
String groupId,
Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
Objects.requireNonNull(newPartitionMetadata, "newPartitionMetadata should not be null here");
StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue();
newPartitionMetadata.forEach((topicName, topicMetadata) -> {
value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicMetadata.id())
.setTopicName(topicMetadata.name())
.setNumPartitions(topicMetadata.numPartitions())
);
});
value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName));
return CoordinatorRecord.record(
new StreamsGroupPartitionMetadataKey()
.setGroupId(groupId),
new ApiMessageAndVersion(
value,
(short) 0
)
);
}
/**
* Creates a StreamsGroupPartitionMetadata tombstone.
*
* @param groupId The streams group id.
* @return The record.
*/
public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord(
String groupId
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
return CoordinatorRecord.tombstone(
new StreamsGroupPartitionMetadataKey()
.setGroupId(groupId)
);
}
public static CoordinatorRecord newStreamsGroupEpochRecord(
String groupId,
int newGroupEpoch,

View File

@ -29,15 +29,16 @@ import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.Utils;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineInteger;
import org.apache.kafka.timeline.TimelineLong;
import org.apache.kafka.timeline.TimelineObject;
import org.slf4j.Logger;
@ -152,6 +153,11 @@ public class StreamsGroup implements Group {
*/
private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
/**
* The metadata hash which is computed based on the all subscribed topics.
*/
protected final TimelineLong metadataHash;
/**
* The target assignment epoch. An assignment epoch smaller than the group epoch means that a new assignment is required. The assignment
* epoch is updated when a new assignment is installed.
@ -226,6 +232,7 @@ public class StreamsGroup implements Group {
this.members = new TimelineHashMap<>(snapshotRegistry, 0);
this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
this.partitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
this.metadataHash = new TimelineLong(snapshotRegistry);
this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
this.currentActiveTaskToProcessId = new TimelineHashMap<>(snapshotRegistry, 0);
@ -280,7 +287,11 @@ public class StreamsGroup implements Group {
public void setTopology(StreamsTopology topology) {
this.topology.set(Optional.ofNullable(topology));
maybeUpdateConfiguredTopology();
maybeUpdateGroupState();
}
public void setConfiguredTopology(ConfiguredTopology configuredTopology) {
this.configuredTopology.set(Optional.ofNullable(configuredTopology));
maybeUpdateGroupState();
}
@ -582,54 +593,47 @@ public class StreamsGroup implements Group {
}
/**
* @return An immutable map of partition metadata for each topic that are inputs for this streams group.
* @return The metadata hash.
*/
public Map<String, TopicMetadata> partitionMetadata() {
return Collections.unmodifiableMap(partitionMetadata);
public long metadataHash() {
return metadataHash.get();
}
/**
* Updates the partition metadata. This replaces the previous one.
* Updates the metadata hash.
*
* @param partitionMetadata The new partition metadata.
* @param metadataHash The new metadata hash.
*/
public void setPartitionMetadata(
Map<String, TopicMetadata> partitionMetadata
) {
this.partitionMetadata.clear();
this.partitionMetadata.putAll(partitionMetadata);
maybeUpdateConfiguredTopology();
maybeUpdateGroupState();
public void setMetadataHash(long metadataHash) {
this.metadataHash.set(metadataHash);
}
/**
* Computes the partition metadata based on the current topology and the current topics image.
* Computes the metadata hash based on the current topology and the current metadata image.
*
* @param topicsImage The current metadata for all available topics.
* @param topology The current metadata for the Streams topology
* @return An immutable map of partition metadata for each topic that the Streams topology is using (besides non-repartition sink topics)
* @param metadataImage The current metadata image.
* @param topicHashCache The cache for the topic hashes.
* @param topology The current metadata for the Streams topology
* @return The metadata hash.
*/
public Map<String, TopicMetadata> computePartitionMetadata(
TopicsImage topicsImage,
public long computeMetadataHash(
MetadataImage metadataImage,
Map<String, Long> topicHashCache,
StreamsTopology topology
) {
Set<String> requiredTopicNames = topology.requiredTopics();
// Create the topic metadata for each subscribed topic.
Map<String, TopicMetadata> newPartitionMetadata = new HashMap<>(requiredTopicNames.size());
Map<String, Long> topicHash = new HashMap<>(requiredTopicNames.size());
requiredTopicNames.forEach(topicName -> {
TopicImage topicImage = topicsImage.getTopic(topicName);
TopicImage topicImage = metadataImage.topics().getTopic(topicName);
if (topicImage != null) {
newPartitionMetadata.put(topicName, new TopicMetadata(
topicImage.id(),
topicImage.name(),
topicImage.partitions().size())
topicHash.put(
topicName,
topicHashCache.computeIfAbsent(topicName, k -> Utils.computeTopicHash(topicName, metadataImage))
);
}
});
return Collections.unmodifiableMap(newPartitionMetadata);
return Utils.computeGroupHash(topicHash);
}
/**
@ -793,7 +797,6 @@ public class StreamsGroup implements Group {
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId(), memberId))
);
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(groupId()));
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(groupId()));
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(groupId()));
}
@ -855,18 +858,6 @@ public class StreamsGroup implements Group {
state.set(newState);
}
private void maybeUpdateConfiguredTopology() {
if (topology.get().isPresent()) {
final StreamsTopology streamsTopology = topology.get().get();
log.info("[GroupId {}] Configuring the topology {}", groupId, streamsTopology);
this.configuredTopology.set(Optional.of(InternalTopicManager.configureTopics(logContext, streamsTopology, partitionMetadata)));
} else {
configuredTopology.set(Optional.empty());
}
}
/**
* Updates the tasks process IDs based on the old and the new member.
*

View File

@ -24,6 +24,7 @@ import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import org.apache.kafka.image.MetadataImage;
import java.util.ArrayList;
import java.util.Collections;
@ -75,9 +76,9 @@ public class TargetAssignmentBuilder {
private Map<String, StreamsGroupMember> members = Map.of();
/**
* The partition metadata.
* The metadata image.
*/
private Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> partitionMetadata = Map.of();
private MetadataImage metadataImage = MetadataImage.EMPTY;
/**
* The existing target assignment.
@ -157,15 +158,15 @@ public class TargetAssignmentBuilder {
}
/**
* Adds the partition metadata to use.
* Adds the metadata image to use.
*
* @param partitionMetadata The partition metadata.
* @param metadataImage The metadata image.
* @return This object.
*/
public TargetAssignmentBuilder withPartitionMetadata(
Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> partitionMetadata
public TargetAssignmentBuilder withMetadataImage(
MetadataImage metadataImage
) {
this.partitionMetadata = partitionMetadata;
this.metadataImage = metadataImage;
return this;
}
@ -273,7 +274,7 @@ public class TargetAssignmentBuilder {
Collections.unmodifiableMap(memberSpecs),
assignmentConfigs
),
new TopologyMetadata(partitionMetadata, topology.subtopologies().get())
new TopologyMetadata(metadataImage, topology.subtopologies().get())
);
} else {
newGroupAssignment = new GroupAssignment(

View File

@ -17,7 +17,6 @@
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import java.util.Objects;
@ -46,11 +45,4 @@ public record TopicMetadata(Uuid id, String name, int numPartitions) {
throw new IllegalArgumentException("Number of partitions must be positive.");
}
}
public static TopicMetadata fromRecord(StreamsGroupPartitionMetadataValue.TopicMetadata record) {
return new TopicMetadata(
record.topicId(),
record.topicName(),
record.numPartitions());
}
}

View File

@ -18,10 +18,11 @@ package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.SortedMap;
@ -31,25 +32,22 @@ import java.util.stream.Stream;
* The topology metadata class is used by the {@link org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor} to get topic and
* partition metadata for the topology that the streams group using.
*
* @param topicMetadata The topic Ids mapped to their corresponding {@link TopicMetadata} object, which contains topic and partition
* metadata.
* @param metadataImage The metadata image
* @param subtopologyMap The configured subtopologies
*/
public record TopologyMetadata(Map<String, TopicMetadata> topicMetadata, SortedMap<String, ConfiguredSubtopology> subtopologyMap) implements TopologyDescriber {
public record TopologyMetadata(MetadataImage metadataImage, SortedMap<String, ConfiguredSubtopology> subtopologyMap) implements TopologyDescriber {
public TopologyMetadata {
topicMetadata = Objects.requireNonNull(Collections.unmodifiableMap(topicMetadata));
metadataImage = Objects.requireNonNull(metadataImage);
subtopologyMap = Objects.requireNonNull(Collections.unmodifiableSortedMap(subtopologyMap));
}
/**
* Map of topic names to topic metadata.
*
* @return The map of topic Ids to topic metadata.
* @return The metadata image in topology metadata.
*/
@Override
public Map<String, TopicMetadata> topicMetadata() {
return this.topicMetadata;
public MetadataImage metadataImage() {
return this.metadataImage;
}
/**
@ -90,7 +88,13 @@ public record TopologyMetadata(Map<String, TopicMetadata> topicMetadata, SortedM
return Stream.concat(
subtopology.sourceTopics().stream(),
subtopology.repartitionSourceTopics().keySet().stream()
).map(topic -> this.topicMetadata.get(topic).numPartitions()).max(Integer::compareTo).orElseThrow(
).map(topic -> {
TopicImage topicImage = metadataImage.topics().getTopic(topic);
if (topicImage == null) {
throw new IllegalStateException("Topic " + topic + " not found in metadata image");
}
return topicImage.partitions().size();
}).max(Integer::compareTo).orElseThrow(
() -> new IllegalStateException("Subtopology does not contain any source topics")
);
}

View File

@ -31,6 +31,7 @@ import java.util.SortedMap;
*
* @param topologyEpoch The epoch of the topology. Same as the topology epoch in the heartbeat request that last initialized
* the topology.
* @param metadataHash The metadata hash of the group.
* @param subtopologies Contains the subtopologies that have been configured. This can be used by the task assignors, since it
* specifies the number of tasks available for every subtopology. Undefined if topology configuration
* failed.
@ -41,6 +42,7 @@ import java.util.SortedMap;
* reported back to the client.
*/
public record ConfiguredTopology(int topologyEpoch,
long metadataHash,
Optional<SortedMap<String, ConfiguredSubtopology>> subtopologies,
Map<String, CreatableTopic> internalTopicsToBeCreated,
Optional<TopicConfigurationException> topicConfigurationException) {

View File

@ -20,7 +20,8 @@ package org.apache.kafka.coordinator.group.streams.topics;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.TopicMetadata;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import java.util.ArrayList;
import java.util.Collections;
@ -36,14 +37,15 @@ public class EndpointToPartitionsManager {
public static StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions(final StreamsGroupMember streamsGroupMember,
final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint,
final StreamsGroup streamsGroup) {
final StreamsGroup streamsGroup,
final MetadataImage metadataImage) {
StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions = new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
Map<String, Set<Integer>> activeTasks = streamsGroupMember.assignedTasks().activeTasks();
Map<String, Set<Integer>> standbyTasks = streamsGroupMember.assignedTasks().standbyTasks();
endpointToPartitions.setUserEndpoint(responseEndpoint);
Map<String, ConfiguredSubtopology> configuredSubtopologies = streamsGroup.configuredTopology().flatMap(ConfiguredTopology::subtopologies).get();
List<StreamsGroupHeartbeatResponseData.TopicPartition> activeTopicPartitions = topicPartitions(activeTasks, configuredSubtopologies, streamsGroup.partitionMetadata());
List<StreamsGroupHeartbeatResponseData.TopicPartition> standbyTopicPartitions = topicPartitions(standbyTasks, configuredSubtopologies, streamsGroup.partitionMetadata());
List<StreamsGroupHeartbeatResponseData.TopicPartition> activeTopicPartitions = topicPartitions(activeTasks, configuredSubtopologies, metadataImage);
List<StreamsGroupHeartbeatResponseData.TopicPartition> standbyTopicPartitions = topicPartitions(standbyTasks, configuredSubtopologies, metadataImage);
endpointToPartitions.setActivePartitions(activeTopicPartitions);
endpointToPartitions.setStandbyPartitions(standbyTopicPartitions);
return endpointToPartitions;
@ -51,7 +53,7 @@ public class EndpointToPartitionsManager {
private static List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitions(final Map<String, Set<Integer>> tasks,
final Map<String, ConfiguredSubtopology> configuredSubtopologies,
final Map<String, TopicMetadata> groupTopicMetadata) {
final MetadataImage metadataImage) {
List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitionsForTasks = new ArrayList<>();
for (Map.Entry<String, Set<Integer>> taskEntry : tasks.entrySet()) {
String subtopologyId = taskEntry.getKey();
@ -60,7 +62,7 @@ public class EndpointToPartitionsManager {
Set<String> repartitionSourceTopics = configuredSubtopology.repartitionSourceTopics().keySet();
Set<String> allSourceTopic = new HashSet<>(sourceTopics);
allSourceTopic.addAll(repartitionSourceTopics);
List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitionList = topicPartitionListForTask(taskEntry.getValue(), allSourceTopic, groupTopicMetadata);
List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitionList = topicPartitionListForTask(taskEntry.getValue(), allSourceTopic, metadataImage);
topicPartitionsForTasks.addAll(topicPartitionList);
}
return topicPartitionsForTasks;
@ -68,9 +70,13 @@ public class EndpointToPartitionsManager {
private static List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitionListForTask(final Set<Integer> taskSet,
final Set<String> topicNames,
final Map<String, TopicMetadata> groupTopicMetadata) {
final MetadataImage metadataImage) {
return topicNames.stream().map(topic -> {
int numPartitionsForTopic = groupTopicMetadata.get(topic).numPartitions();
TopicImage topicImage = metadataImage.topics().getTopic(topic);
if (topicImage == null) {
throw new IllegalStateException("Topic " + topic + " not found in metadata image");
}
int numPartitionsForTopic = topicImage.partitions().size();
StreamsGroupHeartbeatResponseData.TopicPartition tp = new StreamsGroupHeartbeatResponseData.TopicPartition();
tp.setTopic(topic);
List<Integer> tpPartitions = new ArrayList<>(taskSet);

View File

@ -22,7 +22,8 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCon
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
import org.apache.kafka.coordinator.group.streams.TopicMetadata;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import org.slf4j.Logger;
@ -46,17 +47,19 @@ import java.util.stream.Stream;
public class InternalTopicManager {
/**
* Configures the internal topics for the given topology. Given a topology and the topic metadata, this method determines the number of
* Configures the internal topics for the given topology. Given a topology and the topics image, this method determines the number of
* partitions for all internal topics and returns a {@link ConfiguredTopology} object.
*
* @param logContext The log context.
* @param topology The topology.
* @param topicMetadata The topic metadata.
* @param logContext The log context.
* @param metadataHash The metadata hash of the group.
* @param topology The topology.
* @param topicsImage The topics image.
* @return The configured topology.
*/
public static ConfiguredTopology configureTopics(LogContext logContext,
long metadataHash,
StreamsTopology topology,
Map<String, TopicMetadata> topicMetadata) {
TopicsImage topicsImage) {
final Logger log = logContext.logger(InternalTopicManager.class);
final Collection<StreamsGroupTopologyValue.Subtopology> subtopologies = topology.subtopologies().values();
@ -70,10 +73,10 @@ public class InternalTopicManager {
try {
Optional<TopicConfigurationException> topicConfigurationException = Optional.empty();
throwOnMissingSourceTopics(topology, topicMetadata);
throwOnMissingSourceTopics(topology, topicsImage);
Map<String, Integer> decidedPartitionCountsForInternalTopics =
decidePartitionCounts(logContext, topology, topicMetadata, copartitionGroupsBySubtopology, log);
decidePartitionCounts(logContext, topology, topicsImage, copartitionGroupsBySubtopology, log);
final SortedMap<String, ConfiguredSubtopology> configuredSubtopologies =
subtopologies.stream()
@ -86,7 +89,7 @@ public class InternalTopicManager {
TreeMap::new
));
Map<String, CreatableTopic> internalTopicsToCreate = missingInternalTopics(configuredSubtopologies, topicMetadata);
Map<String, CreatableTopic> internalTopicsToCreate = missingInternalTopics(configuredSubtopologies, topology, topicsImage);
if (!internalTopicsToCreate.isEmpty()) {
topicConfigurationException = Optional.of(TopicConfigurationException.missingInternalTopics(
"Internal topics are missing: " + internalTopicsToCreate.keySet()
@ -99,6 +102,7 @@ public class InternalTopicManager {
return new ConfiguredTopology(
topology.topologyEpoch(),
metadataHash,
Optional.of(configuredSubtopologies),
internalTopicsToCreate,
topicConfigurationException
@ -109,6 +113,7 @@ public class InternalTopicManager {
topology.topologyEpoch(), e.toString());
return new ConfiguredTopology(
topology.topologyEpoch(),
metadataHash,
Optional.empty(),
Map.of(),
Optional.of(e)
@ -117,11 +122,11 @@ public class InternalTopicManager {
}
private static void throwOnMissingSourceTopics(final StreamsTopology topology,
final Map<String, TopicMetadata> topicMetadata) {
final TopicsImage topicsImage) {
TreeSet<String> sortedMissingTopics = new TreeSet<>();
for (StreamsGroupTopologyValue.Subtopology subtopology : topology.subtopologies().values()) {
for (String sourceTopic : subtopology.sourceTopics()) {
if (!topicMetadata.containsKey(sourceTopic)) {
if (topicsImage.getTopic(sourceTopic) == null) {
sortedMissingTopics.add(sourceTopic);
}
}
@ -134,12 +139,12 @@ public class InternalTopicManager {
private static Map<String, Integer> decidePartitionCounts(final LogContext logContext,
final StreamsTopology topology,
final Map<String, TopicMetadata> topicMetadata,
final TopicsImage topicsImage,
final Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology,
final Logger log) {
final Map<String, Integer> decidedPartitionCountsForInternalTopics = new HashMap<>();
final Function<String, OptionalInt> topicPartitionCountProvider =
topic -> getPartitionCount(topicMetadata, topic, decidedPartitionCountsForInternalTopics);
topic -> getPartitionCount(topicsImage, topic, decidedPartitionCountsForInternalTopics);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
logContext,
topology.subtopologies().values(),
@ -190,7 +195,8 @@ public class InternalTopicManager {
}
private static Map<String, CreatableTopic> missingInternalTopics(Map<String, ConfiguredSubtopology> subtopologyMap,
Map<String, TopicMetadata> topicMetadata) {
StreamsTopology topology,
TopicsImage topicsImage) {
final Map<String, CreatableTopic> topicsToCreate = new HashMap<>();
for (ConfiguredSubtopology subtopology : subtopologyMap.values()) {
@ -199,31 +205,34 @@ public class InternalTopicManager {
subtopology.stateChangelogTopics().values()
.forEach(x -> topicsToCreate.put(x.name(), toCreatableTopic(x)));
}
for (Map.Entry<String, TopicMetadata> topic : topicMetadata.entrySet()) {
final TopicMetadata existingTopic = topic.getValue();
final CreatableTopic expectedTopic = topicsToCreate.remove(topic.getKey());
for (String topic : topology.requiredTopics()) {
TopicImage topicImage = topicsImage.getTopic(topic);
if (topicImage == null) {
continue;
}
final CreatableTopic expectedTopic = topicsToCreate.remove(topic);
if (expectedTopic != null) {
if (existingTopic.numPartitions() != expectedTopic.numPartitions()) {
throw TopicConfigurationException.incorrectlyPartitionedTopics("Existing topic " + topic.getKey() + " has different"
+ " number of partitions: expected " + expectedTopic.numPartitions() + ", found " + existingTopic.numPartitions());
if (topicImage.partitions().size() != expectedTopic.numPartitions()) {
throw TopicConfigurationException.incorrectlyPartitionedTopics("Existing topic " + topic + " has different"
+ " number of partitions: expected " + expectedTopic.numPartitions() + ", found " + topicImage.partitions().size());
}
}
}
return topicsToCreate;
}
private static OptionalInt getPartitionCount(Map<String, TopicMetadata> topicMetadata,
private static OptionalInt getPartitionCount(TopicsImage topicsImage,
String topic,
Map<String, Integer> decidedPartitionCountsForInternalTopics) {
final TopicMetadata metadata = topicMetadata.get(topic);
if (metadata == null) {
final TopicImage topicImage = topicsImage.getTopic(topic);
if (topicImage == null) {
if (decidedPartitionCountsForInternalTopics.containsKey(topic)) {
return OptionalInt.of(decidedPartitionCountsForInternalTopics.get(topic));
} else {
return OptionalInt.empty();
}
} else {
return OptionalInt.of(metadata.numPartitions());
return OptionalInt.of(topicImage.partitions().size());
}
}

View File

@ -1,27 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes.
{
"apiKey": 18,
"type": "coordinator-key",
"name": "StreamsGroupPartitionMetadataKey",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0",
"about": "The group ID." }
]
}

View File

@ -1,34 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes.
{
"apiKey": 18,
"type": "coordinator-value",
"name": "StreamsGroupPartitionMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Topics", "versions": "0+", "type": "[]TopicMetadata",
"about": "The list of topic metadata.", "fields": [
{ "name": "TopicId", "versions": "0+", "type": "uuid",
"about": "The topic ID." },
{ "name": "TopicName", "versions": "0+", "type": "string",
"about": "The topic name." },
{ "name": "NumPartitions", "versions": "0+", "type": "int32",
"about": "The number of partitions of the topic." }
]}
]
}

View File

@ -77,8 +77,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKe
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@ -1006,60 +1004,6 @@ public class GroupCoordinatorShardTest {
verify(groupMetadataManager).replay(key, null);
}
@Test
public void testReplayStreamsGroupPartitionMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
Time.SYSTEM,
new MockCoordinatorTimer<>(Time.SYSTEM),
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
StreamsGroupPartitionMetadataKey key = new StreamsGroupPartitionMetadataKey();
StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue();
coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.record(
key,
new ApiMessageAndVersion(value, (short) 0)
));
verify(groupMetadataManager).replay(key, value);
}
@Test
public void testReplayStreamsGroupPartitionMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
Time.SYSTEM,
new MockCoordinatorTimer<>(Time.SYSTEM),
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
StreamsGroupPartitionMetadataKey key = new StreamsGroupPartitionMetadataKey();
coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.tombstone(
key
));
verify(groupMetadataManager).replay(key, null);
}
@Test
public void testReplayStreamsGroupMemberMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);

View File

@ -125,6 +125,7 @@ import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRol
import org.apache.kafka.coordinator.group.streams.TasksTuple;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
@ -16030,7 +16031,6 @@ public class GroupMetadataManagerTest {
.build())
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
.withPartitionMetadata(Map.of())
)
.build();
@ -16066,12 +16066,17 @@ public class GroupMetadataManagerTest {
));
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build();
long groupMetadataHash = computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage),
barTopicName, computeTopicHash(barTopicName, metadataImage)
));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
.withMetadataImage(metadataImage)
.build();
assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
@ -16127,11 +16132,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
)),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, groupMetadataHash),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
@ -16158,12 +16159,14 @@ public class GroupMetadataManagerTest {
new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build();
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build())
.withMetadataImage(metadataImage)
.build();
// Member joins the streams group.
@ -16210,12 +16213,9 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
)
),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
))),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
@ -16240,12 +16240,14 @@ public class GroupMetadataManagerTest {
)
);
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build();
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build())
.withMetadataImage(metadataImage)
.build();
// Member joins the streams group.
@ -16297,10 +16299,9 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
)),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
))),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
@ -16326,13 +16327,14 @@ public class GroupMetadataManagerTest {
)
);
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build();
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
.withMetadataImage(metadataImage)
.build();
// Member joins the streams group.
@ -16379,11 +16381,10 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
)),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage),
barTopicName, computeTopicHash(barTopicName, metadataImage)
))),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
@ -16414,13 +16415,14 @@ public class GroupMetadataManagerTest {
)
);
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build();
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
.withMetadataImage(metadataImage)
.withStreamsGroup(
new StreamsGroupBuilder(groupId, 10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology1))
@ -16474,11 +16476,10 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
)),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage),
barTopicName, computeTopicHash(barTopicName, metadataImage)
))),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
@ -16499,12 +16500,15 @@ public class GroupMetadataManagerTest {
new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build();
long groupMetadataHash = computeGroupHash(Map.of(fooTopicName, computeTopicHash(fooTopicName, metadataImage)));
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build())
.withMetadataImage(metadataImage)
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
.withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
@ -16526,9 +16530,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5)))
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
.withPartitionMetadata(Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
))
.withMetadataHash(groupMetadataHash)
)
.build();
@ -16593,12 +16595,14 @@ public class GroupMetadataManagerTest {
new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 2)
.build();
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 2)
.build())
.withMetadataImage(metadataImage)
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
.withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
@ -16612,9 +16616,7 @@ public class GroupMetadataManagerTest {
.build())
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
.withPartitionMetadata(Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
))
.withMetadataHash(computeGroupHash(Map.of(fooTopicName, computeTopicHash(fooTopicName, metadataImage))))
)
.build();
@ -16688,13 +16690,19 @@ public class GroupMetadataManagerTest {
new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build();
long groupMetadataHash = computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage),
barTopicName, computeTopicHash(barTopicName, metadataImage)
));
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
.withMetadataImage(metadataImage)
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
.withMember(streamsGroupMemberBuilderWithDefaults(memberId)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
@ -16707,10 +16715,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
.withPartitionMetadata(Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
))
.withMetadataHash(groupMetadataHash)
)
.build();
@ -16759,7 +16764,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, groupMetadataHash),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
@ -16788,13 +16793,24 @@ public class GroupMetadataManagerTest {
new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
));
MetadataImage newMetadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, changedPartitionCount)
.build();
MetadataImage oldMetadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build();
long oldGroupMetadataHash = computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, oldMetadataImage),
barTopicName, computeTopicHash(barTopicName, oldMetadataImage)
));
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, changedPartitionCount)
.build())
.withMetadataImage(newMetadataImage)
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
.withMember(streamsGroupMemberBuilderWithDefaults(memberId)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
@ -16807,10 +16823,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
.withPartitionMetadata(Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
))
.withMetadataHash(oldGroupMetadataHash)
)
.build();
@ -16857,11 +16870,10 @@ public class GroupMetadataManagerTest {
.build();
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, changedPartitionCount)
)),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, newMetadataImage),
barTopicName, computeTopicHash(barTopicName, newMetadataImage)
))),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
@ -16891,6 +16903,11 @@ public class GroupMetadataManagerTest {
new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build();
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
@ -16922,10 +16939,10 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
.withPartitionMetadata(Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
))
.withMetadataHash(computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage),
barTopicName, computeTopicHash(barTopicName, metadataImage)
)))
)
.build();
@ -17128,13 +17145,19 @@ public class GroupMetadataManagerTest {
new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build();
long groupMetadataHash = computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage),
barTopicName, computeTopicHash(barTopicName, metadataImage)
));
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
.withMetadataImage(metadataImage)
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
.withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
.setMemberEpoch(10)
@ -17158,10 +17181,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
.withTargetAssignmentEpoch(10)
.withPartitionMetadata(Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
))
.withMetadataHash(groupMetadataHash)
)
.build();
@ -17582,9 +17602,19 @@ public class GroupMetadataManagerTest {
new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build();
long groupMetadataHash = computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage),
barTopicName, computeTopicHash(barTopicName, metadataImage)
));
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(metadataImage)
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10))
.build();
@ -17593,16 +17623,16 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId1)
.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, groupMetadataHash));
assertEquals(StreamsGroupState.NOT_READY, context.streamsGroupState(groupId));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
)
));
context.groupMetadataManager.getStreamsGroupOrThrow(groupId)
.setConfiguredTopology(InternalTopicManager.configureTopics(
new LogContext(),
groupMetadataHash,
StreamsTopology.fromRecord(StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord(topology)),
metadataImage.topics()));
assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, context.streamsGroupState(groupId));
@ -17688,12 +17718,14 @@ public class GroupMetadataManagerTest {
new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build();
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build())
.withMetadataImage(metadataImage)
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
.withMember(streamsGroupMemberBuilderWithDefaults(memberId)
.setMemberEpoch(10)
@ -17705,11 +17737,11 @@ public class GroupMetadataManagerTest {
.withTargetAssignment(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
.withTargetAssignmentEpoch(10)
.withPartitionMetadata(
.withMetadataHash(computeGroupHash(Map.of(
// foo only has 3 tasks stored in the metadata but foo has
// 6 partitions the metadata image.
Map.of(fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 3))
))
fooTopicName, computeTopicHash(fooTopicName, new MetadataImageBuilder().addTopic(fooTopicId, fooTopicName, 3).build())
))))
.build();
// The metadata refresh flag should be true.
@ -17753,10 +17785,9 @@ public class GroupMetadataManagerTest {
.build();
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
Map.of(fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6))
),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
))),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
@ -17784,12 +17815,14 @@ public class GroupMetadataManagerTest {
new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build();
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build())
.withMetadataImage(metadataImage)
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
.withMember(streamsGroupMemberBuilderWithDefaults(memberId)
.setMemberEpoch(10)
@ -17801,11 +17834,11 @@ public class GroupMetadataManagerTest {
.withTargetAssignment(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
.withTargetAssignmentEpoch(10)
.withPartitionMetadata(
.withMetadataHash(computeGroupHash(Map.of(
// foo only has 3 partitions stored in the metadata but foo has
// 6 partitions the metadata image.
Map.of(fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 3))
))
fooTopicName, computeTopicHash(fooTopicName, new MetadataImageBuilder().addTopic(fooTopicId, fooTopicName, 3).build())
))))
.build();
// The metadata refresh flag should be true.
@ -17870,10 +17903,9 @@ public class GroupMetadataManagerTest {
.build();
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
Map.of(fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6))
),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
))),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
@ -19156,45 +19188,6 @@ public class GroupMetadataManagerTest {
assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo"));
}
@Test
public void testReplayStreamsGroupPartitionMetadata() {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> metadata = Map.of(
"bar",
new org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), "bar", 10)
);
// The group is created if it does not exist.
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo", metadata));
assertEquals(metadata, context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
}
@Test
public void testReplayStreamsGroupPartitionMetadataTombstoneNotExisting() {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
// The group may not exist at all. Replaying the StreamsGroupPartitionMetadata tombstone
// should be a no-op.
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.streamsGroup("foo"));
}
@Test
public void testReplayStreamsGroupPartitionMetadataTombstoneExisting() {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroup(new StreamsGroupBuilder("foo", 10).withPartitionMetadata(
Map.of("topic1", new org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), "topic1", 10))
))
.build();
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
assertTrue(context.groupMetadataManager.streamsGroup("foo").partitionMetadata().isEmpty());
}
@Test
public void testReplayStreamsGroupTargetAssignmentMember() {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()

View File

@ -97,8 +97,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKe
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@ -118,6 +116,7 @@ import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.TasksTuple;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@ -558,7 +557,18 @@ public class GroupMetadataManagerTestContext {
consumerGroupBuilders.forEach(builder -> builder.build().forEach(context::replay));
shareGroupBuilders.forEach(builder -> builder.build(metadataImage.topics()).forEach(context::replay));
streamsGroupBuilders.forEach(builder -> builder.build().forEach(context::replay));
streamsGroupBuilders.forEach(builder -> {
builder.build().forEach(context::replay);
StreamsGroup group = context.groupMetadataManager.getStreamsGroupOrThrow(builder.groupId());
if (group.topology().isPresent()) {
group.setConfiguredTopology(InternalTopicManager.configureTopics(
new LogContext(),
0,
group.topology().get(),
metadataImage.topics())
);
}
});
context.commit();
@ -1744,13 +1754,6 @@ public class GroupMetadataManagerTestContext {
);
break;
case STREAMS_GROUP_PARTITION_METADATA:
groupMetadataManager.replay(
(StreamsGroupPartitionMetadataKey) key,
(StreamsGroupPartitionMetadataValue) messageOrNull(value)
);
break;
case STREAMS_GROUP_TARGET_ASSIGNMENT_MEMBER:
groupMetadataManager.replay(
(StreamsGroupTargetAssignmentMemberKey) key,

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
@ -26,8 +25,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataVa
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue.TaskIds;
@ -252,47 +249,6 @@ class StreamsCoordinatorRecordHelpersTest {
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(GROUP_ID, MEMBER_ID));
}
@Test
public void testNewStreamsGroupPartitionMetadataRecord() {
Uuid uuid1 = Uuid.randomUuid();
Uuid uuid2 = Uuid.randomUuid();
Map<String, TopicMetadata> newPartitionMetadata = Map.of(
TOPIC_1, new TopicMetadata(uuid1, TOPIC_1, 1),
TOPIC_2, new TopicMetadata(uuid2, TOPIC_2, 2)
);
StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue();
value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(uuid1)
.setTopicName(TOPIC_1)
.setNumPartitions(1)
);
value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(uuid2)
.setTopicName(TOPIC_2)
.setNumPartitions(2)
);
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupPartitionMetadataKey()
.setGroupId(GROUP_ID),
new ApiMessageAndVersion(value, (short) 0)
);
assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(GROUP_ID, newPartitionMetadata));
}
@Test
public void testNewStreamsGroupPartitionMetadataTombstoneRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
new StreamsGroupPartitionMetadataKey()
.setGroupId(GROUP_ID)
);
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(GROUP_ID));
}
@Test
public void testNewStreamsGroupEpochRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
@ -717,27 +673,6 @@ class StreamsCoordinatorRecordHelpersTest {
assertEquals("memberId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupPartitionMetadataRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(null, Map.of()));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupPartitionMetadataRecordNullNewPartitionMetadata() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("groupId", null));
assertEquals("newPartitionMetadata should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupPartitionMetadataTombstoneRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(null));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupEpochRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->

View File

@ -34,7 +34,7 @@ public class StreamsGroupBuilder {
private StreamsTopology topology;
private final Map<String, StreamsGroupMember> members = new HashMap<>();
private final Map<String, TasksTuple> targetAssignments = new HashMap<>();
private Map<String, TopicMetadata> partitionMetadata = new HashMap<>();
private long metadataHash = 0L;
public StreamsGroupBuilder(String groupId, int groupEpoch) {
this.groupId = groupId;
@ -48,8 +48,8 @@ public class StreamsGroupBuilder {
return this;
}
public StreamsGroupBuilder withPartitionMetadata(Map<String, TopicMetadata> partitionMetadata) {
this.partitionMetadata = partitionMetadata;
public StreamsGroupBuilder withMetadataHash(long metadataHash) {
this.metadataHash = metadataHash;
return this;
}
@ -77,15 +77,9 @@ public class StreamsGroupBuilder {
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, member))
);
if (!partitionMetadata.isEmpty()) {
records.add(
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
partitionMetadata));
}
// Add group epoch record.
records.add(
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch, 0));
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch, metadataHash));
// Add target assignment records.
targetAssignments.forEach((memberId, assignment) ->
@ -115,4 +109,8 @@ public class StreamsGroupBuilder {
return records;
}
public String groupId() {
return groupId;
}
}

View File

@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.OffsetAndMetadata;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
@ -37,7 +38,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAss
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
@ -47,18 +47,15 @@ import org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState
import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.MockedStatic;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -66,6 +63,7 @@ import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
@ -79,11 +77,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.when;
public class StreamsGroupTest {
@ -505,6 +499,7 @@ public class StreamsGroupTest {
assertEquals(StreamsGroup.StreamsGroupState.NOT_READY, streamsGroup.state());
streamsGroup.setTopology(new StreamsTopology(1, Map.of()));
streamsGroup.setConfiguredTopology(new ConfiguredTopology(1, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
assertEquals(MemberState.STABLE, member1.state());
assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, streamsGroup.state());
@ -702,6 +697,7 @@ public class StreamsGroupTest {
);
group.setGroupEpoch(1);
group.setTopology(new StreamsTopology(1, Map.of()));
group.setConfiguredTopology(new ConfiguredTopology(1, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
group.setTargetAssignmentEpoch(1);
group.updateMember(new StreamsGroupMember.Builder("member1")
.setMemberEpoch(1)
@ -767,6 +763,7 @@ public class StreamsGroupTest {
assertThrows(GroupNotEmptyException.class, streamsGroup::validateDeleteGroup);
streamsGroup.setTopology(new StreamsTopology(1, Map.of()));
streamsGroup.setConfiguredTopology(new ConfiguredTopology(1, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, streamsGroup.state());
assertThrows(GroupNotEmptyException.class, streamsGroup::validateDeleteGroup);
@ -811,6 +808,7 @@ public class StreamsGroupTest {
group.setGroupEpoch(1);
group.setTopology(new StreamsTopology(1, Map.of()));
group.setConfiguredTopology(new ConfiguredTopology(1, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
group.setTargetAssignmentEpoch(1);
group.updateMember(new StreamsGroupMember.Builder("member1")
.setMemberEpoch(1)
@ -907,109 +905,7 @@ public class StreamsGroupTest {
}
@Test
public void testSetTopologyUpdatesStateAndConfiguredTopology() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
StreamsGroup streamsGroup = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "test-group", metricsShard);
StreamsTopology topology = new StreamsTopology(1, Map.of());
ConfiguredTopology topo = mock(ConfiguredTopology.class);
when(topo.isReady()).thenReturn(true);
try (MockedStatic<InternalTopicManager> mocked = mockStatic(InternalTopicManager.class)) {
mocked.when(() -> InternalTopicManager.configureTopics(any(), eq(topology), eq(Map.of()))).thenReturn(topo);
streamsGroup.setTopology(topology);
mocked.verify(() -> InternalTopicManager.configureTopics(any(), eq(topology), eq(Map.of())));
}
Optional<ConfiguredTopology> configuredTopology = streamsGroup.configuredTopology();
assertTrue(configuredTopology.isPresent(), "Configured topology should be present");
assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
.setMemberEpoch(1)
.build());
assertEquals(StreamsGroupState.RECONCILING, streamsGroup.state());
}
@Test
public void testSetTopologyUpdatesStateAndConfiguredTopologyWithPreviousCallToSetMetadata() {
Uuid topicUuid = Uuid.randomUuid();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
StreamsGroup streamsGroup = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "test-group", metricsShard);
assertEquals(StreamsGroup.StreamsGroupState.EMPTY, streamsGroup.state());
Map<String, TopicMetadata> partitionMetadata = new HashMap<>();
partitionMetadata.put("topic1", new TopicMetadata(topicUuid, "topic1", 1));
try (MockedStatic<InternalTopicManager> mocked = mockStatic(InternalTopicManager.class)) {
streamsGroup.setPartitionMetadata(partitionMetadata);
mocked.verify(() -> InternalTopicManager.configureTopics(any(), any(), any()), never());
}
assertTrue(streamsGroup.configuredTopology().isEmpty(), "Configured topology should not be present");
assertEquals(partitionMetadata, streamsGroup.partitionMetadata());
StreamsTopology topology = new StreamsTopology(1, Map.of());
ConfiguredTopology topo = mock(ConfiguredTopology.class);
when(topo.isReady()).thenReturn(true);
try (MockedStatic<InternalTopicManager> mocked = mockStatic(InternalTopicManager.class)) {
mocked.when(() -> InternalTopicManager.configureTopics(any(), eq(topology), eq(partitionMetadata))).thenReturn(topo);
streamsGroup.setTopology(topology);
mocked.verify(() -> InternalTopicManager.configureTopics(any(), eq(topology), eq(partitionMetadata)));
}
}
@Test
public void testSetPartitionMetadataUpdatesStateAndConfiguredTopology() {
Uuid topicUuid = Uuid.randomUuid();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
StreamsGroup streamsGroup = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "test-group", metricsShard);
assertEquals(StreamsGroup.StreamsGroupState.EMPTY, streamsGroup.state());
Map<String, TopicMetadata> partitionMetadata = new HashMap<>();
partitionMetadata.put("topic1", new TopicMetadata(topicUuid, "topic1", 1));
try (MockedStatic<InternalTopicManager> mocked = mockStatic(InternalTopicManager.class)) {
streamsGroup.setPartitionMetadata(partitionMetadata);
mocked.verify(() -> InternalTopicManager.configureTopics(any(), any(), any()), never());
}
assertTrue(streamsGroup.configuredTopology().isEmpty(), "Configured topology should not be present");
assertEquals(partitionMetadata, streamsGroup.partitionMetadata());
StreamsTopology topology = new StreamsTopology(1, Map.of());
streamsGroup.setTopology(topology);
ConfiguredTopology topo = mock(ConfiguredTopology.class);
when(topo.isReady()).thenReturn(true);
try (MockedStatic<InternalTopicManager> mocked = mockStatic(InternalTopicManager.class)) {
mocked.when(() -> InternalTopicManager.configureTopics(any(), eq(topology), eq(partitionMetadata))).thenReturn(topo);
streamsGroup.setPartitionMetadata(partitionMetadata);
mocked.verify(() -> InternalTopicManager.configureTopics(any(), eq(topology), eq(partitionMetadata)));
}
Optional<ConfiguredTopology> configuredTopology = streamsGroup.configuredTopology();
assertTrue(configuredTopology.isPresent(), "Configured topology should be present");
assertEquals(topo, configuredTopology.get());
assertEquals(partitionMetadata, streamsGroup.partitionMetadata());
assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
.setMemberEpoch(1)
.build());
assertEquals(StreamsGroupState.RECONCILING, streamsGroup.state());
}
@Test
public void testComputePartitionMetadata() {
public void testComputeMetadataHash() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
StreamsGroup streamsGroup = new StreamsGroup(
LOG_CONTEXT,
@ -1017,24 +913,17 @@ public class StreamsGroupTest {
"group-foo",
mock(GroupCoordinatorMetricsShard.class)
);
TopicsImage topicsImage = mock(TopicsImage.class);
TopicImage topicImage = mock(TopicImage.class);
when(topicImage.id()).thenReturn(Uuid.randomUuid());
when(topicImage.name()).thenReturn("topic1");
when(topicImage.partitions()).thenReturn(Collections.singletonMap(0, null));
when(topicsImage.getTopic("topic1")).thenReturn(topicImage);
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(Uuid.randomUuid(), "topic1", 1)
.build();
StreamsTopology topology = mock(StreamsTopology.class);
when(topology.requiredTopics()).thenReturn(Set.of("topic1"));
Map<String, TopicMetadata> partitionMetadata = streamsGroup.computePartitionMetadata(topicsImage, topology);
assertEquals(1, partitionMetadata.size());
assertTrue(partitionMetadata.containsKey("topic1"));
TopicMetadata topicMetadata = partitionMetadata.get("topic1");
assertNotNull(topicMetadata);
assertEquals(topicImage.id(), topicMetadata.id());
assertEquals("topic1", topicMetadata.name());
assertEquals(1, topicMetadata.numPartitions());
long metadataHash = streamsGroup.computeMetadataHash(metadataImage, new HashMap<>(), topology);
// The metadata hash means no topic.
assertNotEquals(0, metadataHash);
}
@Test
@ -1053,7 +942,7 @@ public class StreamsGroupTest {
streamsGroup.createGroupTombstoneRecords(records);
assertEquals(7, records.size());
assertEquals(6, records.size());
for (CoordinatorRecord record : records) {
assertNotNull(record.key());
assertNull(record.value());
@ -1061,7 +950,6 @@ public class StreamsGroupTest {
final Set<ApiMessage> keys = records.stream().map(CoordinatorRecord::key).collect(Collectors.toSet());
assertTrue(keys.contains(new StreamsGroupMetadataKey().setGroupId("test-group")));
assertTrue(keys.contains(new StreamsGroupTargetAssignmentMetadataKey().setGroupId("test-group")));
assertTrue(keys.contains(new StreamsGroupPartitionMetadataKey().setGroupId("test-group")));
assertTrue(keys.contains(new StreamsGroupTopologyKey().setGroupId("test-group")));
assertTrue(keys.contains(new StreamsGroupMemberMetadataKey().setGroupId("test-group").setMemberId("member1")));
assertTrue(keys.contains(new StreamsGroupTargetAssignmentMemberKey().setGroupId("test-group").setMemberId("member1")));
@ -1079,28 +967,26 @@ public class StreamsGroupTest {
assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
streamsGroup.setTopology(
new StreamsTopology(1,
Map.of("test-subtopology",
new StreamsGroupTopologyValue.Subtopology()
.setSubtopologyId("test-subtopology")
.setSourceTopics(List.of("test-topic1"))
.setRepartitionSourceTopics(List.of(new StreamsGroupTopologyValue.TopicInfo().setName("test-topic2")))
.setRepartitionSinkTopics(List.of("test-topic2"))
)
)
);
StreamsTopology topology = new StreamsTopology(1,
Map.of("test-subtopology",
new StreamsGroupTopologyValue.Subtopology()
.setSubtopologyId("test-subtopology")
.setSourceTopics(List.of("test-topic1"))
.setRepartitionSourceTopics(List.of(new StreamsGroupTopologyValue.TopicInfo().setName("test-topic2")))
.setRepartitionSinkTopics(List.of("test-topic2"))
));
streamsGroup.setTopology(topology);
assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
streamsGroup.setPartitionMetadata(
Map.of(
"test-topic1", new TopicMetadata(Uuid.randomUuid(), "test-topic1", 1),
"test-topic2", new TopicMetadata(Uuid.randomUuid(), "test-topic2", 1)
)
);
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(Uuid.randomUuid(), "test-topic1", 1)
.addTopic(Uuid.randomUuid(), "test-topic2", 1)
.build();
streamsGroup.setConfiguredTopology(InternalTopicManager.configureTopics(logContext, 0, topology, metadataImage.topics()));
assertTrue(streamsGroup.isSubscribedToTopic("test-topic1"));
assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));

View File

@ -28,6 +28,7 @@ import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@ -659,7 +660,7 @@ public class TargetAssignmentBuilderTest {
private final int groupEpoch;
private final TaskAssignor assignor = mock(TaskAssignor.class);
private final SortedMap<String, ConfiguredSubtopology> subtopologies = new TreeMap<>();
private final ConfiguredTopology topology = new ConfiguredTopology(0, Optional.of(subtopologies), new HashMap<>(),
private final ConfiguredTopology topology = new ConfiguredTopology(0, 0, Optional.of(subtopologies), new HashMap<>(),
Optional.empty());
private final Map<String, StreamsGroupMember> members = new HashMap<>();
private final Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> subscriptionMetadata = new HashMap<>();
@ -711,11 +712,6 @@ public class TargetAssignmentBuilderTest {
) {
String subtopologyId = Uuid.randomUuid().toString();
Uuid topicId = Uuid.randomUuid();
subscriptionMetadata.put(topicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(
topicId,
topicName,
numTasks
));
topicsImageBuilder = topicsImageBuilder.addTopic(topicId, topicName, numTasks);
subtopologies.put(subtopologyId, new ConfiguredSubtopology(Set.of(topicId.toString()), Map.of(), Set.of(), Map.of()));
@ -805,8 +801,10 @@ public class TargetAssignmentBuilderTest {
}
});
MetadataImage metadataImage = topicsImageBuilder.build();
// Prepare the expected topology metadata.
TopologyMetadata topologyMetadata = new TopologyMetadata(subscriptionMetadata, subtopologies);
TopologyMetadata topologyMetadata = new TopologyMetadata(metadataImage, subtopologies);
// Prepare the expected assignment spec.
GroupSpecImpl groupSpec = new GroupSpecImpl(memberSpecs, new HashMap<>());
@ -822,7 +820,7 @@ public class TargetAssignmentBuilderTest {
.withMembers(members)
.withTopology(topology)
.withStaticMembers(staticMembers)
.withPartitionMetadata(subscriptionMetadata)
.withMetadataImage(metadataImage)
.withTargetAssignment(targetAssignment);
// Add the updated members or delete the deleted members.

View File

@ -17,7 +17,6 @@
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import org.junit.jupiter.api.Test;
@ -72,18 +71,4 @@ public class TopicMetadataTest {
new TopicMetadata(Uuid.randomUuid(), "valid-topic", -1));
assertEquals("Number of partitions must be positive.", exception.getMessage());
}
@Test
public void testFromRecord() {
StreamsGroupPartitionMetadataValue.TopicMetadata record = new StreamsGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(Uuid.randomUuid())
.setTopicName("test-topic")
.setNumPartitions(3);
TopicMetadata topicMetadata = TopicMetadata.fromRecord(record);
assertEquals(record.topicId(), topicMetadata.id());
assertEquals(record.topicName(), topicMetadata.name());
assertEquals(record.numPartitions(), topicMetadata.numPartitions());
}
}

View File

@ -16,13 +16,15 @@
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredInternalTopic;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@ -40,20 +42,23 @@ import static org.mockito.Mockito.when;
class TopologyMetadataTest {
private Map<String, TopicMetadata> topicMetadata;
private MetadataImage metadataImage;
private SortedMap<String, ConfiguredSubtopology> subtopologyMap;
private TopologyMetadata topologyMetadata;
@BeforeEach
void setUp() {
topicMetadata = new HashMap<>();
metadataImage = new MetadataImageBuilder()
.addTopic(Uuid.randomUuid(), "source_topic", 3)
.addTopic(Uuid.randomUuid(), "repartition_source_topic", 4)
.build();
subtopologyMap = new TreeMap<>();
topologyMetadata = new TopologyMetadata(topicMetadata, subtopologyMap);
topologyMetadata = new TopologyMetadata(metadataImage, subtopologyMap);
}
@Test
void testTopicMetadata() {
assertEquals(topicMetadata, topologyMetadata.topicMetadata());
void testMetadataImage() {
assertEquals(metadataImage, topologyMetadata.metadataImage());
}
@Test
@ -83,13 +88,6 @@ class TopologyMetadataTest {
when(subtopology.sourceTopics()).thenReturn(Set.of("source_topic"));
when(subtopology.repartitionSourceTopics()).thenReturn(Map.of("repartition_source_topic", internalTopic));
TopicMetadata topicMeta1 = mock(TopicMetadata.class);
TopicMetadata topicMeta2 = mock(TopicMetadata.class);
topicMetadata.put("source_topic", topicMeta1);
topicMetadata.put("repartition_source_topic", topicMeta2);
when(topicMeta1.numPartitions()).thenReturn(3);
when(topicMeta2.numPartitions()).thenReturn(4);
assertEquals(4, topologyMetadata.maxNumInputPartitions("subtopology1"));
}

View File

@ -41,6 +41,7 @@ public class ConfiguredTopologyTest {
public void testConstructorWithNullSubtopologies() {
assertThrows(NullPointerException.class,
() -> new ConfiguredTopology(
0,
0,
null,
Map.of(),
@ -53,6 +54,7 @@ public class ConfiguredTopologyTest {
public void testConstructorWithNullInternalTopicsToBeCreated() {
assertThrows(NullPointerException.class,
() -> new ConfiguredTopology(
0,
0,
Optional.of(new TreeMap<>()),
null,
@ -65,6 +67,7 @@ public class ConfiguredTopologyTest {
public void testConstructorWithNullTopicConfigurationException() {
assertThrows(NullPointerException.class,
() -> new ConfiguredTopology(
0,
0,
Optional.empty(),
Map.of(),
@ -78,6 +81,7 @@ public class ConfiguredTopologyTest {
assertThrows(IllegalArgumentException.class,
() -> new ConfiguredTopology(
-1,
0,
Optional.of(new TreeMap<>()),
Map.of(),
Optional.empty()
@ -90,6 +94,7 @@ public class ConfiguredTopologyTest {
final IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
() -> new ConfiguredTopology(
1,
0,
Optional.empty(),
Map.of(),
Optional.empty()
@ -101,11 +106,11 @@ public class ConfiguredTopologyTest {
@Test
public void testIsReady() {
ConfiguredTopology readyTopology = new ConfiguredTopology(
1, Optional.of(new TreeMap<>()), new HashMap<>(), Optional.empty());
1, 0, Optional.of(new TreeMap<>()), new HashMap<>(), Optional.empty());
assertTrue(readyTopology.isReady());
ConfiguredTopology notReadyTopology = new ConfiguredTopology(
1, Optional.empty(), new HashMap<>(), Optional.of(TopicConfigurationException.missingSourceTopics("missing")));
1, 0, Optional.empty(), new HashMap<>(), Optional.of(TopicConfigurationException.missingSourceTopics("missing")));
assertFalse(notReadyTopology.isReady());
}
@ -120,7 +125,7 @@ public class ConfiguredTopologyTest {
Map<String, CreatableTopic> internalTopicsToBeCreated = new HashMap<>();
Optional<TopicConfigurationException> topicConfigurationException = Optional.empty();
ConfiguredTopology configuredTopology = new ConfiguredTopology(
topologyEpoch, Optional.of(subtopologies), internalTopicsToBeCreated, topicConfigurationException);
topologyEpoch, 0, Optional.of(subtopologies), internalTopicsToBeCreated, topicConfigurationException);
StreamsGroupDescribeResponseData.Topology topology = configuredTopology.asStreamsGroupDescribeTopology();

View File

@ -19,10 +19,11 @@ package org.apache.kafka.coordinator.group.streams.topics;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.TasksTuple;
import org.apache.kafka.coordinator.group.streams.TopicMetadata;
import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -79,16 +80,16 @@ class EndpointToPartitionsManagerTest {
@Test
void testEndpointToPartitionsWithStandbyTaskAssignments() {
Map<String, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put("Topic-A", new TopicMetadata(Uuid.randomUuid(), "Topic-A", 3));
topicMetadata.put("Topic-B", new TopicMetadata(Uuid.randomUuid(), "Topic-B", 3));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(Uuid.randomUuid(), "Topic-A", 3)
.addTopic(Uuid.randomUuid(), "Topic-B", 3)
.build();
activeTasks.put("0", Set.of(0, 1, 2));
standbyTasks.put("1", Set.of(0, 1, 2));
tasksTuple = new TasksTuple(activeTasks, standbyTasks, Collections.emptyMap());
when(streamsGroupMember.assignedTasks()).thenReturn(tasksTuple);
//when(streamsGroupMember.assignedTasks().standbyTasks()).thenReturn(tasksTuple.standbyTasks());
when((streamsGroup.partitionMetadata())).thenReturn(topicMetadata);
when(streamsGroup.configuredTopology()).thenReturn(Optional.of(configuredTopology));
SortedMap<String, ConfiguredSubtopology> configuredSubtopologyMap = new TreeMap<>();
configuredSubtopologyMap.put("0", configuredSubtopologyOne);
@ -96,7 +97,7 @@ class EndpointToPartitionsManagerTest {
when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyMap));
StreamsGroupHeartbeatResponseData.EndpointToPartitions result =
EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember, responseEndpoint, streamsGroup);
EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember, responseEndpoint, streamsGroup, metadataImage);
assertEquals(responseEndpoint, result.userEndpoint());
assertEquals(1, result.activePartitions().size());
@ -123,20 +124,20 @@ class EndpointToPartitionsManagerTest {
List<Integer> topicBExpectedPartitions,
String testName
) {
Map<String, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put("Topic-A", new TopicMetadata(Uuid.randomUuid(), "Topic-A", topicAPartitions));
topicMetadata.put("Topic-B", new TopicMetadata(Uuid.randomUuid(), "Topic-B", topicBPartitions));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(Uuid.randomUuid(), "Topic-A", topicAPartitions)
.addTopic(Uuid.randomUuid(), "Topic-B", topicBPartitions)
.build();
configuredSubtopologyOne = new ConfiguredSubtopology(Set.of("Topic-A", "Topic-B"), new HashMap<>(), new HashSet<>(), new HashMap<>());
activeTasks.put("0", Set.of(0, 1, 2, 3, 4));
when(streamsGroupMember.assignedTasks()).thenReturn(new TasksTuple(activeTasks, Collections.emptyMap(), Collections.emptyMap()));
when(streamsGroup.partitionMetadata()).thenReturn(topicMetadata);
when(streamsGroup.configuredTopology()).thenReturn(Optional.of(configuredTopology));
SortedMap<String, ConfiguredSubtopology> configuredSubtopologyOneMap = new TreeMap<>();
configuredSubtopologyOneMap.put("0", configuredSubtopologyOne);
when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyOneMap));
StreamsGroupHeartbeatResponseData.EndpointToPartitions result = EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember, responseEndpoint, streamsGroup);
StreamsGroupHeartbeatResponseData.EndpointToPartitions result = EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember, responseEndpoint, streamsGroup, metadataImage);
assertEquals(responseEndpoint, result.userEndpoint());
assertEquals(2, result.activePartitions().size());

View File

@ -22,14 +22,14 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCon
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
import org.apache.kafka.coordinator.group.streams.TopicMetadata;
import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -54,12 +54,13 @@ class InternalTopicManagerTest {
@Test
void testConfigureTopicsSetsConfigurationExceptionWhenSourceTopicIsMissing() {
Map<String, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_1, 2));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(Uuid.randomUuid(), SOURCE_TOPIC_1, 2)
.build();
// SOURCE_TOPIC_2 is missing from topicMetadata
StreamsTopology topology = makeTestTopology();
final ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), topology, topicMetadata);
final ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), 0, topology, metadataImage.topics());
assertEquals(Optional.empty(), configuredTopology.subtopologies());
assertTrue(configuredTopology.topicConfigurationException().isPresent());
@ -69,14 +70,14 @@ class InternalTopicManagerTest {
@Test
void testConfigureTopics() {
Map<String, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_1, 2));
topicMetadata.put(SOURCE_TOPIC_2, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_2, 2));
topicMetadata.put(STATE_CHANGELOG_TOPIC_2,
new TopicMetadata(Uuid.randomUuid(), STATE_CHANGELOG_TOPIC_2, 2));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(Uuid.randomUuid(), SOURCE_TOPIC_1, 2)
.addTopic(Uuid.randomUuid(), SOURCE_TOPIC_2, 2)
.addTopic(Uuid.randomUuid(), STATE_CHANGELOG_TOPIC_2, 2)
.build();
StreamsTopology topology = makeTestTopology();
ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), topology, topicMetadata);
ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), 0, topology, metadataImage.topics());
final Map<String, CreatableTopic> internalTopicsToBeCreated = configuredTopology.internalTopicsToBeCreated();
assertEquals(2, internalTopicsToBeCreated.size());