Merge branch 'trunk' into KAFKA-18376-chain-events-in-background-thread

This commit is contained in:
Kirk True 2025-10-06 08:53:26 -07:00
commit dcbe761869
14 changed files with 1429 additions and 127 deletions

View File

@ -233,13 +233,13 @@ public class MirrorMaker {
private void addHerder(SourceAndTarget sourceAndTarget) {
log.info("creating herder for {}", sourceAndTarget.toString());
Map<String, String> workerProps = config.workerConfig(sourceAndTarget);
DistributedConfig distributedConfig = new DistributedConfig(workerProps);
String encodedSource = encodePath(sourceAndTarget.source());
String encodedTarget = encodePath(sourceAndTarget.target());
List<String> restNamespace = List.of(encodedSource, encodedTarget);
String workerId = generateWorkerId(sourceAndTarget);
Plugins plugins = new Plugins(workerProps);
plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig distributedConfig = new DistributedConfig(workerProps);
String kafkaClusterId = distributedConfig.kafkaClusterId();
String clientIdBase = ConnectUtils.clientIdBase(distributedConfig);
// Create the admin client to be shared by all backing stores for this herder

View File

@ -114,14 +114,15 @@ public abstract class AbstractConnectCli<H extends Herder, T extends WorkerConfi
log.info("Kafka Connect worker initializing ...");
long initStart = time.hiResClockMs();
T config = createConfig(workerProps);
log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
WorkerInfo initInfo = new WorkerInfo();
initInfo.logAll();
log.info("Scanning for plugin classes. This might take a moment ...");
Plugins plugins = new Plugins(workerProps);
plugins.compareAndSwapWithDelegatingLoader();
T config = createConfig(workerProps);
log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
RestClient restClient = new RestClient(config);

View File

@ -20,7 +20,6 @@ import java.lang.{Long => JLong}
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.Optional
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, CopyOnWriteArrayList}
import kafka.controller.StateChangeLogger
import kafka.log._
import kafka.server._
import kafka.server.share.DelayedShareFetch
@ -37,6 +36,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.logger.StateChangeLogger
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache, PartitionRegistration}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.log.remote.TopicPartitionLog
@ -322,7 +322,7 @@ class Partition(val topicPartition: TopicPartition,
def topic: String = topicPartition.topic
def partitionId: Int = topicPartition.partition
private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
private val stateChangeLogger = new StateChangeLogger(localBrokerId)
private val remoteReplicasMap = new ConcurrentHashMap[Int, Replica]
// The read lock is only required when multiple reads are executed and needs to be in a consistent manner
private val leaderIsrUpdateLock = new ReentrantReadWriteLock

View File

@ -1,45 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.controller
import com.typesafe.scalalogging.Logger
import kafka.utils.Logging
object StateChangeLogger {
private val logger = Logger("state.change.logger")
}
/**
* Simple class that sets `logIdent` appropriately depending on whether the state change logger is being used in the
* context of the KafkaController or not (e.g. ReplicaManager and MetadataCache log to the state change logger
* irrespective of whether the broker is the Controller).
*/
class StateChangeLogger(brokerId: Int, inControllerContext: Boolean, controllerEpoch: Option[Int]) extends Logging {
if (controllerEpoch.isDefined && !inControllerContext)
throw new IllegalArgumentException("Controller epoch should only be defined if inControllerContext is true")
override lazy val logger: Logger = StateChangeLogger.logger
locally {
val prefix = if (inControllerContext) "Controller" else "Broker"
val epochEntry = controllerEpoch.fold("")(epoch => s" epoch=$epoch")
logIdent = s"[$prefix id=$brokerId$epochEntry] "
}
}

View File

@ -18,7 +18,6 @@ package kafka.server
import com.yammer.metrics.core.Meter
import kafka.cluster.{Partition, PartitionListener}
import kafka.controller.StateChangeLogger
import kafka.log.LogManager
import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
@ -48,6 +47,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.logger.StateChangeLogger
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition}
@ -272,7 +272,7 @@ class ReplicaManager(val config: KafkaConfig,
@volatile private var isInControlledShutdown = false
this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
protected val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
protected val stateChangeLogger = new StateChangeLogger(localBrokerId)
private var logDirFailureHandler: LogDirFailureHandler = _
@ -789,9 +789,9 @@ class ReplicaManager(val config: KafkaConfig,
hasCustomErrorMessage = customException.isDefined
)
}
// In non-transaction paths, errorResults is typically empty, so we can
// In non-transaction paths, errorResults is typically empty, so we can
// directly use entriesPerPartition instead of creating a new filtered collection
val entriesWithoutErrorsPerPartition =
val entriesWithoutErrorsPerPartition =
if (errorResults.nonEmpty) entriesPerPartition.filter { case (key, _) => !errorResults.contains(key) }
else entriesPerPartition
@ -1637,13 +1637,13 @@ class ReplicaManager(val config: KafkaConfig,
remoteFetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]
remoteFetchInfos.forEach { (topicIdPartition, remoteFetchInfo) =>
val (task, result) = processRemoteFetch(remoteFetchInfo)
remoteFetchTasks.put(topicIdPartition, task)
remoteFetchResults.put(topicIdPartition, result)
}
val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
remoteFetchPartitionStatus, params, logReadResults, this, responseCallback)

View File

@ -2249,18 +2249,13 @@ public class GroupMetadataManager {
.setClassicMemberMetadata(null)
.build();
// If the group is newly created, we must ensure that it moves away from
// epoch 0 and that it is fully initialized.
boolean bumpGroupEpoch = group.groupEpoch() == 0;
bumpGroupEpoch |= hasMemberSubscriptionChanged(
boolean subscribedTopicNamesChanged = hasMemberSubscriptionChanged(
groupId,
member,
updatedMember,
records
);
bumpGroupEpoch |= maybeUpdateRegularExpressions(
UpdateRegularExpressionsResult updateRegularExpressionsResult = maybeUpdateRegularExpressions(
context,
group,
member,
@ -2268,9 +2263,24 @@ public class GroupMetadataManager {
records
);
// The subscription has changed when either the subscribed topic names or subscribed topic
// regex has changed.
boolean hasSubscriptionChanged = subscribedTopicNamesChanged || updateRegularExpressionsResult.regexUpdated();
int groupEpoch = group.groupEpoch();
SubscriptionType subscriptionType = group.subscriptionType();
boolean bumpGroupEpoch =
// If the group is newly created, we must ensure that it moves away from
// epoch 0 and that it is fully initialized.
groupEpoch == 0 ||
// Bumping the group epoch signals that the target assignment should be updated. We bump
// the group epoch when the member has changed its subscribed topic names or the member
// has changed its subscribed topic regex to a regex that is already resolved. We avoid
// bumping the group epoch when the new subscribed topic regex has not been resolved
// yet, since we will have to update the target assignment again later.
subscribedTopicNamesChanged ||
updateRegularExpressionsResult == UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
// The subscription metadata is updated in two cases:
// 1) The member has updated its subscriptions;
@ -2315,6 +2325,9 @@ public class GroupMetadataManager {
group::currentPartitionEpoch,
targetAssignmentEpoch,
targetAssignment,
group.resolvedRegularExpressions(),
// Force consistency with the subscription when the subscription has changed.
hasSubscriptionChanged,
ownedTopicPartitions,
records
);
@ -2468,6 +2481,8 @@ public class GroupMetadataManager {
group::currentPartitionEpoch,
group.assignmentEpoch(),
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId()),
group.resolvedRegularExpressions(),
bumpGroupEpoch,
toTopicPartitions(subscription.ownedPartitions(), metadataImage),
records
);
@ -2511,6 +2526,9 @@ public class GroupMetadataManager {
group::currentPartitionEpoch,
targetAssignmentEpoch,
targetAssignment,
group.resolvedRegularExpressions(),
// Force consistency with the subscription when the subscription has changed.
bumpGroupEpoch,
toTopicPartitions(subscription.ownedPartitions(), metadataImage),
records
);
@ -2669,6 +2687,8 @@ public class GroupMetadataManager {
updatedMember,
targetAssignmentEpoch,
targetAssignment,
// Force consistency with the subscription when the subscription has changed.
bumpGroupEpoch,
records
);
@ -3108,6 +3128,16 @@ public class GroupMetadataManager {
return value != null && !value.isEmpty();
}
private enum UpdateRegularExpressionsResult {
NO_CHANGE,
REGEX_UPDATED,
REGEX_UPDATED_AND_RESOLVED;
public boolean regexUpdated() {
return this == REGEX_UPDATED || this == REGEX_UPDATED_AND_RESOLVED;
}
}
/**
* Check whether the member has updated its subscribed topic regular expression and
* may trigger the resolution/the refresh of all the regular expressions in the
@ -3119,9 +3149,9 @@ public class GroupMetadataManager {
* @param member The old member.
* @param updatedMember The new member.
* @param records The records accumulator.
* @return Whether a rebalance must be triggered.
* @return The result of the update.
*/
private boolean maybeUpdateRegularExpressions(
private UpdateRegularExpressionsResult maybeUpdateRegularExpressions(
AuthorizableRequestContext context,
ConsumerGroup group,
ConsumerGroupMember member,
@ -3134,14 +3164,17 @@ public class GroupMetadataManager {
String oldSubscribedTopicRegex = member.subscribedTopicRegex();
String newSubscribedTopicRegex = updatedMember.subscribedTopicRegex();
boolean bumpGroupEpoch = false;
boolean requireRefresh = false;
UpdateRegularExpressionsResult updateRegularExpressionsResult = UpdateRegularExpressionsResult.NO_CHANGE;
// Check whether the member has changed its subscribed regex.
if (!Objects.equals(oldSubscribedTopicRegex, newSubscribedTopicRegex)) {
boolean subscribedTopicRegexChanged = !Objects.equals(oldSubscribedTopicRegex, newSubscribedTopicRegex);
if (subscribedTopicRegexChanged) {
log.debug("[GroupId {}] Member {} updated its subscribed regex to: {}.",
groupId, memberId, newSubscribedTopicRegex);
updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED;
if (isNotEmpty(oldSubscribedTopicRegex) && group.numSubscribedMembers(oldSubscribedTopicRegex) == 1) {
// If the member was the last one subscribed to the regex, we delete the
// resolved regular expression.
@ -3160,7 +3193,9 @@ public class GroupMetadataManager {
} else {
// If the new regex is already resolved, we trigger a rebalance
// by bumping the group epoch.
bumpGroupEpoch = group.resolvedRegularExpression(newSubscribedTopicRegex).isPresent();
if (group.resolvedRegularExpression(newSubscribedTopicRegex).isPresent()) {
updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
}
}
}
}
@ -3176,20 +3211,20 @@ public class GroupMetadataManager {
// 0. The group is subscribed to regular expressions. We also take the one
// that the current may have just introduced.
if (!requireRefresh && group.subscribedRegularExpressions().isEmpty()) {
return bumpGroupEpoch;
return updateRegularExpressionsResult;
}
// 1. There is no ongoing refresh for the group.
String key = group.groupId() + "-regex";
if (executor.isScheduled(key)) {
return bumpGroupEpoch;
return updateRegularExpressionsResult;
}
// 2. The last refresh is older than 10s. If the group does not have any regular
// expressions but the current member just brought a new one, we should continue.
long lastRefreshTimeMs = group.lastResolvedRegularExpressionRefreshTimeMs();
if (currentTimeMs <= lastRefreshTimeMs + REGEX_BATCH_REFRESH_MIN_INTERVAL_MS) {
return bumpGroupEpoch;
return updateRegularExpressionsResult;
}
// 3.1 The group has unresolved regular expressions.
@ -3218,7 +3253,7 @@ public class GroupMetadataManager {
);
}
return bumpGroupEpoch;
return updateRegularExpressionsResult;
}
/**
@ -3492,16 +3527,18 @@ public class GroupMetadataManager {
/**
* Reconciles the current assignment of the member towards the target assignment if needed.
*
* @param groupId The group id.
* @param member The member to reconcile.
* @param currentPartitionEpoch The function returning the current epoch of
* a given partition.
* @param targetAssignmentEpoch The target assignment epoch.
* @param targetAssignment The target assignment.
* @param ownedTopicPartitions The list of partitions owned by the member. This
* is reported in the ConsumerGroupHeartbeat API and
* it could be null if not provided.
* @param records The list to accumulate any new records.
* @param groupId The group id.
* @param member The member to reconcile.
* @param currentPartitionEpoch The function returning the current epoch of
* a given partition.
* @param targetAssignmentEpoch The target assignment epoch.
* @param targetAssignment The target assignment.
* @param resolvedRegularExpressions The resolved regular expressions.
* @param hasSubscriptionChanged Whether the member has changed its subscription on the current heartbeat.
* @param ownedTopicPartitions The list of partitions owned by the member. This
* is reported in the ConsumerGroupHeartbeat API and
* it could be null if not provided.
* @param records The list to accumulate any new records.
* @return The received member if no changes have been made; or a new
* member containing the new assignment.
*/
@ -3511,15 +3548,20 @@ public class GroupMetadataManager {
BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
int targetAssignmentEpoch,
Assignment targetAssignment,
Map<String, ResolvedRegularExpression> resolvedRegularExpressions,
boolean hasSubscriptionChanged,
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
List<CoordinatorRecord> records
) {
if (member.isReconciledTo(targetAssignmentEpoch)) {
if (!hasSubscriptionChanged && member.isReconciledTo(targetAssignmentEpoch)) {
return member;
}
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
.withHasSubscriptionChanged(hasSubscriptionChanged)
.withResolvedRegularExpressions(resolvedRegularExpressions)
.withCurrentPartitionEpoch(currentPartitionEpoch)
.withOwnedTopicPartitions(ownedTopicPartitions)
.build();
@ -3556,11 +3598,12 @@ public class GroupMetadataManager {
/**
* Reconciles the current assignment of the member towards the target assignment if needed.
*
* @param groupId The group id.
* @param member The member to reconcile.
* @param targetAssignmentEpoch The target assignment epoch.
* @param targetAssignment The target assignment.
* @param records The list to accumulate any new records.
* @param groupId The group id.
* @param member The member to reconcile.
* @param targetAssignmentEpoch The target assignment epoch.
* @param targetAssignment The target assignment.
* @param hasSubscriptionChanged Whether the member has changed its subscription on the current heartbeat.
* @param records The list to accumulate any new records.
* @return The received member if no changes have been made; or a new
* member containing the new assignment.
*/
@ -3569,14 +3612,17 @@ public class GroupMetadataManager {
ShareGroupMember member,
int targetAssignmentEpoch,
Assignment targetAssignment,
boolean hasSubscriptionChanged,
List<CoordinatorRecord> records
) {
if (member.isReconciledTo(targetAssignmentEpoch)) {
if (!hasSubscriptionChanged && member.isReconciledTo(targetAssignmentEpoch)) {
return member;
}
ShareGroupMember updatedMember = new ShareGroupAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
.withHasSubscriptionChanged(hasSubscriptionChanged)
.build();
if (!updatedMember.equals(member)) {

View File

@ -19,8 +19,11 @@ package org.apache.kafka.coordinator.group.modern.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.FencedMemberEpochException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.TopicIds;
import org.apache.kafka.coordinator.group.modern.UnionSet;
import java.util.HashMap;
import java.util.HashSet;
@ -41,6 +44,11 @@ public class CurrentAssignmentBuilder {
*/
private final ConsumerGroupMember member;
/**
* The metadata image.
*/
private CoordinatorMetadataImage metadataImage = CoordinatorMetadataImage.EMPTY;
/**
* The target assignment epoch.
*/
@ -51,6 +59,16 @@ public class CurrentAssignmentBuilder {
*/
private Assignment targetAssignment;
/**
* Whether the member has changed its subscription on the current heartbeat.
*/
private boolean hasSubscriptionChanged;
/**
* The resolved regular expressions.
*/
private Map<String, ResolvedRegularExpression> resolvedRegularExpressions = Map.of();
/**
* A function which returns the current epoch of a topic-partition or -1 if the
* topic-partition is not assigned. The current epoch is the epoch of the current owner.
@ -73,6 +91,19 @@ public class CurrentAssignmentBuilder {
this.member = Objects.requireNonNull(member);
}
/**
* Sets the metadata image.
*
* @param metadataImage The metadata image.
* @return This object.
*/
public CurrentAssignmentBuilder withMetadataImage(
CoordinatorMetadataImage metadataImage
) {
this.metadataImage = metadataImage;
return this;
}
/**
* Sets the target assignment epoch and the target assignment that the
* consumer group member must be reconciled to.
@ -90,6 +121,32 @@ public class CurrentAssignmentBuilder {
return this;
}
/**
* Sets whether the member has changed its subscription on the current heartbeat.
*
* @param hasSubscriptionChanged If true, always removes unsubscribed topics from the current assignment.
* @return This object.
*/
public CurrentAssignmentBuilder withHasSubscriptionChanged(
boolean hasSubscriptionChanged
) {
this.hasSubscriptionChanged = hasSubscriptionChanged;
return this;
}
/**
* Sets the resolved regular expressions.
*
* @param resolvedRegularExpressions The resolved regular expressions.
* @return This object.
*/
public CurrentAssignmentBuilder withResolvedRegularExpressions(
Map<String, ResolvedRegularExpression> resolvedRegularExpressions
) {
this.resolvedRegularExpressions = resolvedRegularExpressions;
return this;
}
/**
* Sets a BiFunction which allows to retrieve the current epoch of a
* partition. This is used by the state machine to determine if a
@ -132,12 +189,15 @@ public class CurrentAssignmentBuilder {
case STABLE:
// When the member is in the STABLE state, we verify if a newer
// epoch (or target assignment) is available. If it is, we can
// reconcile the member towards it. Otherwise, we return.
// reconcile the member towards it. Otherwise, we ensure the
// assignment is consistent with the subscribed topics, if changed.
if (member.memberEpoch() != targetAssignmentEpoch) {
return computeNextAssignment(
member.memberEpoch(),
member.assignedPartitions()
);
} else if (hasSubscriptionChanged) {
return updateCurrentAssignment(member.assignedPartitions());
} else {
return member;
}
@ -147,18 +207,27 @@ public class CurrentAssignmentBuilder {
// until the member has revoked the necessary partitions. They are
// considered revoked when they are not anymore reported in the
// owned partitions set in the ConsumerGroupHeartbeat API.
// Additional partitions may need revoking when the member's
// subscription changes.
// If the member provides its owned partitions. We verify if it still
// owns any of the revoked partitions. If it does, we cannot progress.
if (ownsRevokedPartitions(member.partitionsPendingRevocation())) {
return member;
if (hasSubscriptionChanged) {
return updateCurrentAssignment(member.assignedPartitions());
} else {
return member;
}
}
// When the member has revoked all the pending partitions, it can
// transition to the next epoch (current + 1) and we can reconcile
// its state towards the latest target assignment.
return computeNextAssignment(
member.memberEpoch() + 1,
// When we enter UNREVOKED_PARTITIONS due to a subscription change,
// we must not advance the member epoch when the new target
// assignment is not available yet.
Math.min(member.memberEpoch() + 1, targetAssignmentEpoch),
member.assignedPartitions()
);
@ -215,6 +284,71 @@ public class CurrentAssignmentBuilder {
return false;
}
/**
* Updates the current assignment, removing any partitions that are not part of the subscribed topics.
* This method is a lot faster than running the full reconciliation logic in computeNextAssignment.
*
* @param memberAssignedPartitions The assigned partitions of the member to use.
* @return A new ConsumerGroupMember.
*/
private ConsumerGroupMember updateCurrentAssignment(
Map<Uuid, Set<Integer>> memberAssignedPartitions
) {
Set<Uuid> subscribedTopicIds = subscribedTopicIds();
// Reuse the original map if no topics need to be removed.
Map<Uuid, Set<Integer>> newAssignedPartitions;
Map<Uuid, Set<Integer>> newPartitionsPendingRevocation;
if (subscribedTopicIds.isEmpty() && member.partitionsPendingRevocation().isEmpty()) {
newAssignedPartitions = Map.of();
newPartitionsPendingRevocation = memberAssignedPartitions;
} else {
newAssignedPartitions = memberAssignedPartitions;
newPartitionsPendingRevocation = new HashMap<>(member.partitionsPendingRevocation());
for (Map.Entry<Uuid, Set<Integer>> entry : memberAssignedPartitions.entrySet()) {
if (!subscribedTopicIds.contains(entry.getKey())) {
if (newAssignedPartitions == memberAssignedPartitions) {
newAssignedPartitions = new HashMap<>(memberAssignedPartitions);
newPartitionsPendingRevocation = new HashMap<>(member.partitionsPendingRevocation());
}
newAssignedPartitions.remove(entry.getKey());
newPartitionsPendingRevocation.merge(
entry.getKey(),
entry.getValue(),
(existing, additional) -> {
existing = new HashSet<>(existing);
existing.addAll(additional);
return existing;
}
);
}
}
}
if (newAssignedPartitions == memberAssignedPartitions) {
// If no partitions were removed, we can return the member as is.
return member;
}
if (!newPartitionsPendingRevocation.isEmpty() && ownsRevokedPartitions(newPartitionsPendingRevocation)) {
return new ConsumerGroupMember.Builder(member)
.setState(MemberState.UNREVOKED_PARTITIONS)
.setAssignedPartitions(newAssignedPartitions)
.setPartitionsPendingRevocation(newPartitionsPendingRevocation)
.build();
} else {
// There were partitions removed, but they were already revoked.
// Keep the member in the current state and shrink the assigned partitions.
// We do not expect to be in the UNREVOKED_PARTITIONS state here. The full
// reconciliation logic should handle the case where the member has revoked all its
// partitions pending revocation.
return new ConsumerGroupMember.Builder(member)
.setAssignedPartitions(newAssignedPartitions)
.build();
}
}
/**
* Computes the next assignment.
*
@ -227,6 +361,8 @@ public class CurrentAssignmentBuilder {
int memberEpoch,
Map<Uuid, Set<Integer>> memberAssignedPartitions
) {
Set<Uuid> subscribedTopicIds = subscribedTopicIds();
boolean hasUnreleasedPartitions = false;
Map<Uuid, Set<Integer>> newAssignedPartitions = new HashMap<>();
Map<Uuid, Set<Integer>> newPartitionsPendingRevocation = new HashMap<>();
@ -241,6 +377,11 @@ public class CurrentAssignmentBuilder {
Set<Integer> currentAssignedPartitions = memberAssignedPartitions
.getOrDefault(topicId, Set.of());
// If the member is no longer subscribed to the topic, treat its target assignment as empty.
if (!subscribedTopicIds.contains(topicId)) {
target = Set.of();
}
// New Assigned Partitions = Previous Assigned Partitions Target
Set<Integer> assignedPartitions = new HashSet<>(currentAssignedPartitions);
assignedPartitions.retainAll(target);
@ -317,4 +458,28 @@ public class CurrentAssignmentBuilder {
.build();
}
}
/**
* Gets the set of topic IDs that the member is subscribed to.
*
* @return The set of topic IDs that the member is subscribed to.
*/
private Set<Uuid> subscribedTopicIds() {
Set<String> subscriptions = member.subscribedTopicNames();
String subscribedTopicRegex = member.subscribedTopicRegex();
if (subscribedTopicRegex != null && !subscribedTopicRegex.isEmpty()) {
ResolvedRegularExpression resolvedRegularExpression = resolvedRegularExpressions.get(subscribedTopicRegex);
if (resolvedRegularExpression != null) {
if (subscriptions.isEmpty()) {
subscriptions = resolvedRegularExpression.topics();
} else if (!resolvedRegularExpression.topics().isEmpty()) {
subscriptions = new UnionSet<>(subscriptions, resolvedRegularExpression.topics());
}
} else {
// Treat an unresolved regex as matching no topics, to be conservative.
}
}
return new TopicIds(subscriptions, metadataImage);
}
}

View File

@ -16,10 +16,16 @@
*/
package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.TopicIds;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* The ShareGroupAssignmentBuilder class encapsulates the reconciliation engine of the
@ -32,6 +38,11 @@ public class ShareGroupAssignmentBuilder {
*/
private final ShareGroupMember member;
/**
* The metadata image.
*/
private CoordinatorMetadataImage metadataImage = CoordinatorMetadataImage.EMPTY;
/**
* The target assignment epoch.
*/
@ -42,6 +53,11 @@ public class ShareGroupAssignmentBuilder {
*/
private Assignment targetAssignment;
/**
* Whether the member has changed its subscription on the current heartbeat.
*/
private boolean hasSubscriptionChanged;
/**
* Constructs the ShareGroupAssignmentBuilder based on the current state of the
* provided share group member.
@ -52,6 +68,19 @@ public class ShareGroupAssignmentBuilder {
this.member = Objects.requireNonNull(member);
}
/**
* Sets the metadata image.
*
* @param metadataImage The metadata image.
* @return This object.
*/
public ShareGroupAssignmentBuilder withMetadataImage(
CoordinatorMetadataImage metadataImage
) {
this.metadataImage = metadataImage;
return this;
}
/**
* Sets the target assignment epoch and the target assignment that the
* share group member must be reconciled to.
@ -69,6 +98,19 @@ public class ShareGroupAssignmentBuilder {
return this;
}
/**
* Sets whether the member has changed its subscription on the current heartbeat.
*
* @param hasSubscriptionChanged If true, always removes unsubscribed topics from the current assignment.
* @return This object.
*/
public ShareGroupAssignmentBuilder withHasSubscriptionChanged(
boolean hasSubscriptionChanged
) {
this.hasSubscriptionChanged = hasSubscriptionChanged;
return this;
}
/**
* Builds the next state for the member or keep the current one if it
* is not possible to move forward with the current state.
@ -83,11 +125,38 @@ public class ShareGroupAssignmentBuilder {
// when the member is updated.
return new ShareGroupMember.Builder(member)
.setState(MemberState.STABLE)
.setAssignedPartitions(targetAssignment.partitions())
// If we have client-side assignors, the latest target assignment may not
// be consistent with the latest subscribed topics, so we must always
// filter the assigned partitions to ensure they are consistent with the
// subscribed topics.
.setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(), member.subscribedTopicNames()))
.updateMemberEpoch(targetAssignmentEpoch)
.build();
} else if (hasSubscriptionChanged) {
return new ShareGroupMember.Builder(member)
.setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(), member.subscribedTopicNames()))
.build();
} else {
return member;
}
}
return member;
private Map<Uuid, Set<Integer>> filterAssignedPartitions(
Map<Uuid, Set<Integer>> partitions,
Set<String> subscribedTopicNames
) {
TopicIds subscribedTopicIds = new TopicIds(member.subscribedTopicNames(), metadataImage);
// Reuse the original map if no topics need to be removed.
Map<Uuid, Set<Integer>> filteredPartitions = partitions;
for (Map.Entry<Uuid, Set<Integer>> entry : partitions.entrySet()) {
if (!subscribedTopicIds.contains(entry.getKey())) {
if (filteredPartitions == partitions) {
filteredPartitions = new HashMap<>(partitions);
}
filteredPartitions.remove(entry.getKey());
}
}
return filteredPartitions;
}
}

View File

@ -74,6 +74,7 @@ public class ShareGroupMember extends ModernGroupMember {
this.memberId = Objects.requireNonNull(newMemberId);
this.memberEpoch = member.memberEpoch;
this.previousMemberEpoch = member.previousMemberEpoch;
this.state = member.state;
this.rackId = member.rackId;
this.clientId = member.clientId;
this.clientHost = member.clientHost;

View File

@ -20604,7 +20604,7 @@ public class GroupMetadataManagerTest {
.build();
// Member 1 updates its new regular expression.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result1 = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
@ -20620,19 +20620,15 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(10)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(List.of(0, 1, 2, 3, 4, 5))
))
.setTopicPartitions(List.of())
),
result.response()
result1.response()
);
ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(0)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
@ -20644,10 +20640,12 @@ public class GroupMetadataManagerTest {
// The member subscription is updated.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
// The previous regular expression is deleted.
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"),
// The member assignment is updated.
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1)
);
assertRecordsEquals(expectedRecords, result.records());
assertRecordsEquals(expectedRecords, result1.records());
// Execute pending tasks.
List<MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord>> tasks = context.processTasks();
@ -20675,6 +20673,65 @@ public class GroupMetadataManagerTest {
),
task.result().records()
);
assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
))
)));
// Member heartbeats again with the same regex.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result2 = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(10)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignor("range")
.setTopicPartitions(List.of()));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(List.of(0, 1, 2, 3, 4, 5)),
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(barTopicId)
.setPartitions(List.of(0, 1, 2))))),
result2.response()
);
ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)))
.build();
expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)
);
assertRecordsEquals(expectedRecords, result2.records());
}
@Test
@ -21077,10 +21134,7 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(10)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(List.of(3, 4, 5))))),
.setTopicPartitions(List.of())),
result1.response()
);
@ -21098,7 +21152,8 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(
List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)
),
result1.records()
);
@ -21164,8 +21219,7 @@ public class GroupMetadataManagerTest {
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo|bar*")
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)))
.setAssignedPartitions(mkAssignment())
.build();
assertResponseEquals(
@ -21174,10 +21228,7 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(List.of(3, 4, 5))))),
.setTopicPartitions(List.of())),
result2.response()
);
@ -21306,10 +21357,7 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(10)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(List.of(3, 4, 5))))),
.setTopicPartitions(List.of())),
result1.response()
);
@ -21327,7 +21375,8 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(
List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)
),
result1.records()
);
@ -21440,6 +21489,219 @@ public class GroupMetadataManagerTest {
);
}
@Test
public void testStaticConsumerGroupMemberJoinsWithUpdatedRegex() {
String groupId = "fooup";
String memberId1 = Uuid.randomUuid().toString();
String memberId2 = Uuid.randomUuid().toString();
String instanceId = "instance-id";
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.buildCoordinatorMetadataImage(12345L);
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
.withMetadataImage(metadataImage)
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId1)
.setInstanceId(instanceId)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)))
.build())
.withAssignment(memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)))
.withAssignmentEpoch(10))
.build();
// Static member temporarily leaves the group.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result1 = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setInstanceId(instanceId)
.setMemberId(memberId1)
.setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
);
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH),
result1.response()
);
// Static member joins the group with an updated regular expression.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result2 = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setInstanceId(instanceId)
.setMemberId(memberId2)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*")
.setServerAssignor("range")
.setTopicPartitions(List.of()));
// The returned assignment does not contain topics not in the current regular expression.
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId2)
.setMemberEpoch(10)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of())
),
result2.response()
);
ConsumerGroupMember expectedCopiedMember = new ConsumerGroupMember.Builder(memberId2)
.setState(MemberState.STABLE)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)))
.build();
ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId2)
.setState(MemberState.STABLE)
.setInstanceId(instanceId)
.setMemberEpoch(10)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*")
.setServerAssignorName("range")
.build();
List<CoordinatorRecord> expectedRecords = List.of(
// The previous member is deleted.
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId1),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId1),
// The previous member is replaced by the new one.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedCopiedMember),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedCopiedMember),
// The member subscription is updated.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
// The previous regular expression is deleted.
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*|bar*"),
// The member assignment is updated.
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1)
);
assertRecordsEquals(expectedRecords, result2.records());
// Execute pending tasks.
List<MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord>> tasks = context.processTasks();
assertEquals(1, tasks.size());
MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord> task = tasks.get(0);
assertEquals(groupId + "-regex", task.key());
assertRecordsEquals(
List.of(
// The resolution of the new regex is persisted.
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
groupId,
"foo*",
new ResolvedRegularExpression(
Set.of("foo"),
12345L,
context.time.milliseconds()
)
),
// The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
)))
),
task.result().records()
);
assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
memberId2, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
))
)));
// Member heartbeats again with the same regex.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result3 = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setInstanceId(instanceId)
.setMemberId(memberId2)
.setMemberEpoch(10)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*")
.setServerAssignor("range")
.setTopicPartitions(List.of()));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId2)
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(List.of(0, 1, 2, 3, 4, 5))))),
result3.response()
);
ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2)
.setState(MemberState.STABLE)
.setInstanceId(instanceId)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
.build();
expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)
);
assertRecordsEquals(expectedRecords, result3.records());
}
@Test
public void testResolvedRegularExpressionsRemovedWhenMembersLeaveOrFenced() {
String groupId = "fooup";

View File

@ -19,13 +19,19 @@ package org.apache.kafka.coordinator.group.modern.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.FencedMemberEpochException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
@ -36,19 +42,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testStableToStable() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6))))
@ -60,6 +75,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
@ -70,19 +86,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testStableToStableWithNewPartitions() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3, 4),
mkTopicAssignment(topicId2, 4, 5, 6, 7))))
@ -94,6 +119,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3, 4),
mkTopicAssignment(topicId2, 4, 5, 6, 7)))
@ -104,19 +130,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testStableToUnrevokedPartitions() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 2, 3, 4),
mkTopicAssignment(topicId2, 5, 6, 7))))
@ -128,6 +163,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
@ -141,19 +177,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testStableToUnreleasedPartitions() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3, 4),
mkTopicAssignment(topicId2, 4, 5, 6, 7))))
@ -165,6 +210,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.UNRELEASED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
@ -175,19 +221,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testStableToUnreleasedPartitionsWithOwnedPartitionsNotHavingRevokedPartitions() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 7))))
@ -202,6 +257,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.UNRELEASED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5)))
@ -212,13 +268,21 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testUnrevokedPartitionsToStable() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
@ -228,6 +292,7 @@ public class CurrentAssignmentBuilderTest {
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6))))
@ -246,6 +311,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
@ -256,13 +322,21 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testRemainsInUnrevokedPartitions() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
@ -272,6 +346,7 @@ public class CurrentAssignmentBuilderTest {
.build();
CurrentAssignmentBuilder currentAssignmentBuilder = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(12, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6))))
@ -311,15 +386,27 @@ public class CurrentAssignmentBuilderTest {
);
}
@Test
public void testUnrevokedPartitionsToUnrevokedPartitions() {
@ParameterizedTest
@CsvSource({
"10, 12, 11",
"10, 10, 10", // The member epoch must not advance past the target assignment epoch.
})
public void testUnrevokedPartitionsToUnrevokedPartitions(int memberEpoch, int targetAssignmentEpoch, int expectedMemberEpoch) {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
@ -329,7 +416,8 @@ public class CurrentAssignmentBuilderTest {
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(12, new Assignment(mkAssignment(
.withMetadataImage(metadataImage)
.withTargetAssignment(targetAssignmentEpoch, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6))))
.withCurrentPartitionEpoch((topicId, partitionId) -> -1)
@ -345,8 +433,9 @@ public class CurrentAssignmentBuilderTest {
assertEquals(
new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setMemberEpoch(expectedMemberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6)))
@ -360,19 +449,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testUnrevokedPartitionsToUnreleasedPartitions() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 2, 3, 4),
mkTopicAssignment(topicId2, 5, 6, 7))))
@ -391,6 +489,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.UNRELEASED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
@ -401,19 +500,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testUnreleasedPartitionsToStable() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNRELEASED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(12, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6))))
@ -425,6 +533,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.STABLE)
.setMemberEpoch(12)
.setPreviousMemberEpoch(11)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
@ -435,19 +544,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testUnreleasedPartitionsToStableWithNewPartitions() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNRELEASED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 2, 3, 4),
mkTopicAssignment(topicId2, 5, 6, 7))))
@ -459,6 +577,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3, 4),
mkTopicAssignment(topicId2, 5, 6, 7)))
@ -469,19 +588,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testUnreleasedPartitionsToUnreleasedPartitions() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNRELEASED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 2, 3, 4),
mkTopicAssignment(topicId2, 5, 6, 7))))
@ -493,19 +621,28 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testUnreleasedPartitionsToUnrevokedPartitions() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNRELEASED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(12, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6))))
@ -517,6 +654,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6)))
@ -530,13 +668,21 @@ public class CurrentAssignmentBuilderTest {
@Test
public void testUnknownState() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNKNOWN)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6)))
@ -548,6 +694,7 @@ public class CurrentAssignmentBuilderTest {
// When the member is in an unknown state, the member is first to force
// a reset of the client side member state.
assertThrows(FencedMemberEpochException.class, () -> new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(12, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6))))
@ -556,6 +703,7 @@ public class CurrentAssignmentBuilderTest {
// Then the member rejoins with no owned partitions.
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(12, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6))))
@ -568,6 +716,7 @@ public class CurrentAssignmentBuilderTest {
.setState(MemberState.STABLE)
.setMemberEpoch(12)
.setPreviousMemberEpoch(11)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 3),
mkTopicAssignment(topicId2, 6)))
@ -575,4 +724,355 @@ public class CurrentAssignmentBuilderTest {
updatedMember
);
}
@ParameterizedTest
@CsvSource({
"10, 11, 11, false", // When advancing to a new target assignment, the assignment should
"10, 11, 11, true", // always take the subscription into account.
"10, 10, 10, true",
})
public void testStableToStableWithAssignmentTopicsNoLongerInSubscription(
int memberEpoch,
int targetAssignmentEpoch,
int expectedMemberEpoch,
boolean hasSubscriptionChanged
) {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setSubscribedTopicNames(List.of(topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(targetAssignmentEpoch, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6))))
.withHasSubscriptionChanged(hasSubscriptionChanged)
.withCurrentPartitionEpoch((topicId, partitionId) -> -1)
.withOwnedTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(topicId2)
.setPartitions(Arrays.asList(4, 5, 6))))
.build();
assertEquals(
new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(expectedMemberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setSubscribedTopicNames(List.of(topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId2, 4, 5, 6)))
.build(),
updatedMember
);
}
@ParameterizedTest
@CsvSource({
"10, 11, 10, false", // When advancing to a new target assignment, the assignment should always
"10, 11, 10, true", // take the subscription into account.
"10, 10, 10, true"
})
public void testStableToUnrevokedPartitionsWithAssignmentTopicsNoLongerInSubscription(
int memberEpoch,
int targetAssignmentEpoch,
int expectedMemberEpoch,
boolean hasSubscriptionChanged
) {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setSubscribedTopicNames(List.of(topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(targetAssignmentEpoch, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6))))
.withHasSubscriptionChanged(hasSubscriptionChanged)
.withCurrentPartitionEpoch((topicId, partitionId) -> -1)
.withOwnedTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(topicId1)
.setPartitions(Arrays.asList(1, 2, 3)),
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(topicId2)
.setPartitions(Arrays.asList(4, 5, 6))))
.build();
assertEquals(
new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(expectedMemberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setSubscribedTopicNames(List.of(topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId2, 4, 5, 6)))
.setPartitionsPendingRevocation(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3)))
.build(),
updatedMember
);
}
@Test
public void testRemainsInUnrevokedPartitionsWithAssignmentTopicsNoLongerInSubscription() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 2, 3),
mkTopicAssignment(topicId2, 5, 6)))
.setPartitionsPendingRevocation(mkAssignment(
mkTopicAssignment(topicId1, 1),
mkTopicAssignment(topicId2, 4)))
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(12, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 3, 4),
mkTopicAssignment(topicId2, 6, 7))))
.withHasSubscriptionChanged(true)
.withCurrentPartitionEpoch((topicId, partitionId) -> -1)
.withOwnedTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(topicId1)
.setPartitions(Arrays.asList(1, 2, 3)),
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(topicId2)
.setPartitions(Arrays.asList(4, 5, 6))))
.build();
assertEquals(
new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId2, 5, 6)))
.setPartitionsPendingRevocation(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4)))
.build(),
updatedMember
);
}
@Test
public void testSubscribedTopicNameAndUnresolvedRegularExpression() {
String fooTopic = "foo";
String barTopic = "bar";
Uuid fooTopicId = Uuid.randomUuid();
Uuid barTopicId = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopic, 10)
.addTopic(barTopicId, barTopic, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(fooTopic))
.setSubscribedTopicRegex("bar*")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 4, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(10, new Assignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 4, 5, 6))))
.withHasSubscriptionChanged(true)
.withResolvedRegularExpressions(Map.of())
.withCurrentPartitionEpoch((topicId, partitionId) -> -1)
.withOwnedTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(1, 2, 3)),
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(barTopicId)
.setPartitions(Arrays.asList(4, 5, 6))))
.build();
assertEquals(
new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(fooTopic))
.setSubscribedTopicRegex("bar*")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3)))
.setPartitionsPendingRevocation(mkAssignment(
mkTopicAssignment(barTopicId, 4, 5, 6)))
.build(),
updatedMember
);
}
@Test
public void testUnresolvedRegularExpression() {
String fooTopic = "foo";
String barTopic = "bar";
Uuid fooTopicId = Uuid.randomUuid();
Uuid barTopicId = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopic, 10)
.addTopic(barTopicId, barTopic, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of())
.setSubscribedTopicRegex("bar*")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 4, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(10, new Assignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 4, 5, 6))))
.withHasSubscriptionChanged(true)
.withResolvedRegularExpressions(Map.of())
.withCurrentPartitionEpoch((topicId, partitionId) -> -1)
.withOwnedTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(1, 2, 3)),
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(barTopicId)
.setPartitions(Arrays.asList(4, 5, 6))))
.build();
assertEquals(
new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of())
.setSubscribedTopicRegex("bar*")
.setAssignedPartitions(mkAssignment())
.setPartitionsPendingRevocation(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 4, 5, 6)))
.build(),
updatedMember
);
}
@Test
public void testSubscribedTopicNameAndResolvedRegularExpression() {
String fooTopic = "foo";
String barTopic = "bar";
Uuid fooTopicId = Uuid.randomUuid();
Uuid barTopicId = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopic, 10)
.addTopic(barTopicId, barTopic, 10)
.buildCoordinatorMetadataImage();
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(fooTopic))
.setSubscribedTopicRegex("bar*")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 4, 5, 6)))
.build();
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(10, new Assignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 4, 5, 6))))
.withHasSubscriptionChanged(true)
.withResolvedRegularExpressions(Map.of(
"bar*", new ResolvedRegularExpression(
Set.of("bar"),
12345L,
0L
)
))
.withCurrentPartitionEpoch((topicId, partitionId) -> -1)
.withOwnedTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(1, 2, 3)),
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(barTopicId)
.setPartitions(Arrays.asList(4, 5, 6))))
.build();
assertEquals(
new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(fooTopic))
.setSubscribedTopicRegex("bar*")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 4, 5, 6)))
.build(),
updatedMember
);
}
}

View File

@ -17,10 +17,16 @@
package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import java.util.List;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
@ -30,19 +36,28 @@ public class ShareGroupAssignmentBuilderTest {
@Test
public void testStableToStable() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ShareGroupMember member = new ShareGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ShareGroupMember updatedMember = new ShareGroupAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6))))
@ -53,6 +68,7 @@ public class ShareGroupAssignmentBuilderTest {
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
@ -63,19 +79,28 @@ public class ShareGroupAssignmentBuilderTest {
@Test
public void testStableToStableWithNewPartitions() {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ShareGroupMember member = new ShareGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ShareGroupMember updatedMember = new ShareGroupAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(11, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3, 4),
mkTopicAssignment(topicId2, 4, 5, 6, 7))))
@ -86,6 +111,7 @@ public class ShareGroupAssignmentBuilderTest {
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3, 4),
mkTopicAssignment(topicId2, 4, 5, 6, 7)))
@ -93,4 +119,56 @@ public class ShareGroupAssignmentBuilderTest {
updatedMember
);
}
@ParameterizedTest
@CsvSource({
"10, 11, false", // When advancing to a new target assignment, the assignment should always
"10, 11, true", // take the subscription into account.
"10, 10, true"
})
public void testStableToStableWithAssignmentTopicsNoLongerInSubscription(
int memberEpoch,
int targetAssignmentEpoch,
boolean hasSubscriptionChanged
) {
String topic1 = "topic1";
String topic2 = "topic2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, topic1, 10)
.addTopic(topicId2, topic2, 10)
.buildCoordinatorMetadataImage();
ShareGroupMember member = new ShareGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setSubscribedTopicNames(List.of(topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
ShareGroupMember updatedMember = new ShareGroupAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(targetAssignmentEpoch, new Assignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6))))
.withHasSubscriptionChanged(hasSubscriptionChanged)
.build();
assertEquals(
new ShareGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(targetAssignmentEpoch)
.setPreviousMemberEpoch(memberEpoch)
.setSubscribedTopicNames(List.of(topic2))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId2, 4, 5, 6)))
.build(),
updatedMember
);
}
}

View File

@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.modern.consumer.CurrentAssignmentBuilder;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class CurrentAssignmentBuilderBenchmark {
@Param({"5", "50"})
private int partitionsPerTopic;
@Param({"10", "100", "1000"})
private int topicCount;
private List<String> topicNames;
private List<Uuid> topicIds;
private CoordinatorMetadataImage metadataImage;
private ConsumerGroupMember member;
private ConsumerGroupMember memberWithUnsubscribedTopics;
private Assignment targetAssignment;
@Setup(Level.Trial)
public void setup() {
setupTopics();
setupMember();
setupTargetAssignment();
}
private void setupTopics() {
topicNames = AssignorBenchmarkUtils.createTopicNames(topicCount);
topicIds = new ArrayList<>(topicCount);
metadataImage = AssignorBenchmarkUtils.createMetadataImage(topicNames, partitionsPerTopic);
for (String topicName : topicNames) {
Uuid topicId = metadataImage.topicMetadata(topicName).get().id();
topicIds.add(topicId);
}
}
private void setupMember() {
Map<Uuid, Set<Integer>> assignedPartitions = new HashMap<>();
for (Uuid topicId : topicIds) {
Set<Integer> partitions = IntStream.range(0, partitionsPerTopic)
.boxed()
.collect(Collectors.toSet());
assignedPartitions.put(topicId, partitions);
}
ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(topicNames)
.setAssignedPartitions(assignedPartitions);
member = memberBuilder.build();
memberWithUnsubscribedTopics = memberBuilder
.setSubscribedTopicNames(topicNames.subList(0, topicNames.size() - 1))
.build();
}
private void setupTargetAssignment() {
Map<Uuid, Set<Integer>> assignedPartitions = new HashMap<>();
for (Uuid topicId : topicIds) {
Set<Integer> partitions = IntStream.range(0, partitionsPerTopic)
.boxed()
.collect(Collectors.toSet());
assignedPartitions.put(topicId, partitions);
}
targetAssignment = new Assignment(assignedPartitions);
}
@Benchmark
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public ConsumerGroupMember stableToStableWithNoChange() {
return new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(member.memberEpoch(), targetAssignment)
.withCurrentPartitionEpoch((topicId, partitionId) -> -1)
.build();
}
@Benchmark
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public ConsumerGroupMember stableToStableWithNewTargetAssignment() {
return new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(member.memberEpoch() + 1, targetAssignment)
.withCurrentPartitionEpoch((topicId, partitionId) -> -1)
.build();
}
@Benchmark
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public ConsumerGroupMember stableToStableWithSubscriptionChange() {
return new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(member.memberEpoch(), targetAssignment)
.withHasSubscriptionChanged(true)
.withCurrentPartitionEpoch((topicId, partitionId) -> -1)
.build();
}
@Benchmark
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public ConsumerGroupMember stableToUnrevokedPartitionsWithSubscriptionChange() {
return new CurrentAssignmentBuilder(memberWithUnsubscribedTopics)
.withMetadataImage(metadataImage)
.withTargetAssignment(memberWithUnsubscribedTopics.memberEpoch(), targetAssignment)
.withHasSubscriptionChanged(true)
.withCurrentPartitionEpoch((topicId, partitionId) -> -1)
.build();
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.logger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Simple class that sets logIdent appropriately depending on whether the state change logger is being used in the
* context of the broker (e.g. ReplicaManager and Partition).
*/
public class StateChangeLogger {
private static final Logger LOGGER = LoggerFactory.getLogger("state.change.logger");
private final String logIdent;
public StateChangeLogger(int brokerId) {
this.logIdent = String.format("[Broker id=%d] ", brokerId);
}
public void trace(String message) {
LOGGER.info("{}{}", logIdent, message);
}
public void info(String message) {
LOGGER.info("{}{}", logIdent, message);
}
public void warn(String message) {
LOGGER.warn("{}{}", logIdent, message);
}
public void error(String message) {
LOGGER.error("{}{}", logIdent, message);
}
public void error(String message, Throwable e) {
LOGGER.error("{}{}", logIdent, message, e);
}
}