MINOR: Cleanups in coordinator-common/group-coordinator (#20097)

- Use `record` where possible
- Use enhanced `switch`
- Tweak a bunch of assertions

Reviewers: Yung <yungyung7654321@gmail.com>, TengYao Chi
 <frankvicky@apache.org>, Ken Huang <s7133700@gmail.com>, Dongnuo Lyu
 <dlyu@confluent.io>, PoAn Yang <payang@apache.org>
This commit is contained in:
Mickael Maison 2025-07-31 10:34:08 +02:00 committed by GitHub
parent 44c6e956ed
commit fd9b5514ad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
39 changed files with 191 additions and 421 deletions

View File

@ -48,36 +48,25 @@ public class CoordinatorOperationExceptionHelper {
) {
ApiError apiError = ApiError.fromThrowable(exception);
switch (apiError.error()) {
case UNKNOWN_SERVER_ERROR:
return switch (apiError.error()) {
case UNKNOWN_SERVER_ERROR -> {
log.error("Operation {} with {} hit an unexpected exception: {}.",
operationName, operationInput, exception.getMessage(), exception);
return handler.apply(Errors.UNKNOWN_SERVER_ERROR, null);
case NETWORK_EXCEPTION:
yield handler.apply(Errors.UNKNOWN_SERVER_ERROR, null);
}
case NETWORK_EXCEPTION ->
// When committing offsets transactionally, we now verify the transaction with the
// transaction coordinator. Verification can fail with `NETWORK_EXCEPTION`, a
// retriable error which older clients may not expect and retry correctly. We
// translate the error to `COORDINATOR_LOAD_IN_PROGRESS` because it causes clients
// to retry the request without an unnecessary coordinator lookup.
return handler.apply(Errors.COORDINATOR_LOAD_IN_PROGRESS, null);
case UNKNOWN_TOPIC_OR_PARTITION:
case NOT_ENOUGH_REPLICAS:
case REQUEST_TIMED_OUT:
return handler.apply(Errors.COORDINATOR_NOT_AVAILABLE, null);
case NOT_LEADER_OR_FOLLOWER:
case KAFKA_STORAGE_ERROR:
return handler.apply(Errors.NOT_COORDINATOR, null);
case MESSAGE_TOO_LARGE:
case RECORD_LIST_TOO_LARGE:
case INVALID_FETCH_SIZE:
return handler.apply(Errors.UNKNOWN_SERVER_ERROR, null);
default:
return handler.apply(apiError.error(), apiError.message());
}
handler.apply(Errors.COORDINATOR_LOAD_IN_PROGRESS, null);
case UNKNOWN_TOPIC_OR_PARTITION, NOT_ENOUGH_REPLICAS, REQUEST_TIMED_OUT ->
handler.apply(Errors.COORDINATOR_NOT_AVAILABLE, null);
case NOT_LEADER_OR_FOLLOWER, KAFKA_STORAGE_ERROR -> handler.apply(Errors.NOT_COORDINATOR, null);
case MESSAGE_TOO_LARGE, RECORD_LIST_TOO_LARGE, INVALID_FETCH_SIZE ->
handler.apply(Errors.UNKNOWN_SERVER_ERROR, null);
default -> handler.apply(apiError.error(), apiError.message());
};
}
}

View File

@ -49,10 +49,7 @@ public class MockCoordinatorExecutor<T> implements CoordinatorExecutor<T> {
}
}
public static class ExecutorResult<T> {
public final String key;
public final CoordinatorResult<Void, T> result;
public record ExecutorResult<T>(String key, CoordinatorResult<Void, T> result) {
public ExecutorResult(
String key,
CoordinatorResult<Void, T> result
@ -61,24 +58,6 @@ public class MockCoordinatorExecutor<T> implements CoordinatorExecutor<T> {
this.result = Objects.requireNonNull(result);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExecutorResult<?> that = (ExecutorResult<?>) o;
if (!Objects.equals(key, that.key)) return false;
return Objects.equals(result, that.result);
}
@Override
public int hashCode() {
int result = key.hashCode();
result = 31 * result + this.result.hashCode();
return result;
}
@Override
public String toString() {
return "ExecutorResult(" +

View File

@ -31,7 +31,7 @@ import java.util.stream.Collectors;
* A simple Coordinator implementation that stores the records into a set.
*/
public class MockCoordinatorShard implements CoordinatorShard<String> {
static record RecordAndMetadata(
record RecordAndMetadata(
long offset,
long producerId,
short producerEpoch,

View File

@ -23,7 +23,6 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
@ -36,54 +35,13 @@ public class MockCoordinatorTimer<T, U> implements CoordinatorTimer<T, U> {
/**
* Represents a scheduled timeout.
*/
public static class ScheduledTimeout<T, U> {
public final String key;
public final long deadlineMs;
public final TimeoutOperation<T, U> operation;
public ScheduledTimeout(
String key,
long deadlineMs,
TimeoutOperation<T, U> operation
) {
this.key = key;
this.deadlineMs = deadlineMs;
this.operation = operation;
}
public record ScheduledTimeout<T, U>(String key, long deadlineMs, TimeoutOperation<T, U> operation) {
}
/**
* Represents an expired timeout.
*/
public static class ExpiredTimeout<T, U> {
public final String key;
public final CoordinatorResult<T, U> result;
public ExpiredTimeout(
String key,
CoordinatorResult<T, U> result
) {
this.key = key;
this.result = result;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExpiredTimeout<?, ?> that = (ExpiredTimeout<?, ?>) o;
if (!Objects.equals(key, that.key)) return false;
return Objects.equals(result, that.result);
}
@Override
public int hashCode() {
int result1 = key != null ? key.hashCode() : 0;
result1 = 31 * result1 + (result != null ? result.hashCode() : 0);
return result1;
}
public record ExpiredTimeout<T, U>(String key, CoordinatorResult<T, U> result) {
}
private final Time time;

View File

@ -560,7 +560,7 @@ public class GroupCoordinatorConfig {
}
} else if (object instanceof Class<?> klass) {
Object o = Utils.newInstance((Class<?>) klass);
if (!ConsumerGroupPartitionAssignor.class.isInstance(o)) {
if (!(o instanceof ConsumerGroupPartitionAssignor)) {
throw new KafkaException(klass + " is not an instance of " + ConsumerGroupPartitionAssignor.class.getName());
}
assignor = (ConsumerGroupPartitionAssignor) o;

View File

@ -327,7 +327,7 @@ public class GroupCoordinatorRecordHelpers {
String regex,
ResolvedRegularExpression resolvedRegularExpression
) {
List<String> topics = new ArrayList<>(resolvedRegularExpression.topics);
List<String> topics = new ArrayList<>(resolvedRegularExpression.topics());
Collections.sort(topics);
return CoordinatorRecord.record(
@ -337,8 +337,8 @@ public class GroupCoordinatorRecordHelpers {
new ApiMessageAndVersion(
new ConsumerGroupRegularExpressionValue()
.setTopics(topics)
.setVersion(resolvedRegularExpression.version)
.setTimestamp(resolvedRegularExpression.timestamp),
.setVersion(resolvedRegularExpression.version())
.setTimestamp(resolvedRegularExpression.timestamp()),
(short) 0
)
);

View File

@ -1301,7 +1301,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
"share-group-offsets-alter",
request,
exception,
(error, message) -> AlterShareGroupOffsetsRequest.getErrorResponseData(error, message),
AlterShareGroupOffsetsRequest::getErrorResponseData,
log
));
}
@ -1891,7 +1891,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
"initiate-delete-share-group-offsets",
groupId,
exception,
(error, message) -> DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(error, message),
DeleteShareGroupOffsetsRequest::getErrorDeleteResponseData,
log
));
}
@ -2332,10 +2332,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
) {
ApiError apiError = ApiError.fromThrowable(exception);
switch (apiError.error()) {
case UNKNOWN_TOPIC_OR_PARTITION:
case NOT_ENOUGH_REPLICAS:
case REQUEST_TIMED_OUT:
return switch (apiError.error()) {
case UNKNOWN_TOPIC_OR_PARTITION, NOT_ENOUGH_REPLICAS, REQUEST_TIMED_OUT ->
// Remap REQUEST_TIMED_OUT to NOT_COORDINATOR, since consumers on versions prior
// to 3.9 do not expect the error and won't retry the request. NOT_COORDINATOR
// additionally triggers coordinator re-lookup, which is necessary if the client is
@ -2345,14 +2343,12 @@ public class GroupCoordinatorService implements GroupCoordinator {
// NOT_ENOUGH_REPLICAS and REQUEST_TIMED_OUT to COORDINATOR_NOT_AVAILABLE,
// COORDINATOR_NOT_AVAILABLE is also not handled by consumers on versions prior to
// 3.9.
return OffsetFetchResponse.groupError(
OffsetFetchResponse.groupError(
request,
Errors.NOT_COORDINATOR,
context.requestVersion()
);
default:
return handleOperationException(
default -> handleOperationException(
operationName,
request,
exception,
@ -2363,7 +2359,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
),
log
);
}
};
}
private static void requireNonNull(Object obj, String msg) {

View File

@ -3347,14 +3347,14 @@ public class GroupMetadataManager {
.resolvedRegularExpression(regex)
.orElse(ResolvedRegularExpression.EMPTY);
if (!oldResolvedRegularExpression.topics.equals(newResolvedRegularExpression.topics)) {
if (!oldResolvedRegularExpression.topics().equals(newResolvedRegularExpression.topics())) {
bumpGroupEpoch = true;
oldResolvedRegularExpression.topics.forEach(topicName ->
oldResolvedRegularExpression.topics().forEach(topicName ->
subscribedTopicNames.compute(topicName, SubscriptionCount::decRegexCount)
);
newResolvedRegularExpression.topics.forEach(topicName ->
newResolvedRegularExpression.topics().forEach(topicName ->
subscribedTopicNames.compute(topicName, SubscriptionCount::incRegexCount)
);
}

View File

@ -18,17 +18,12 @@ package org.apache.kafka.coordinator.group;
import java.util.function.Function;
public class OffsetExpirationConditionImpl implements OffsetExpirationCondition {
/**
* Given an offset and metadata, obtain the base timestamp that should be used
/**
* @param baseTimestamp Given an offset and metadata, obtain the base timestamp that should be used
* as the start of the offsets retention period.
*/
private final Function<OffsetAndMetadata, Long> baseTimestamp;
public OffsetExpirationConditionImpl(Function<OffsetAndMetadata, Long> baseTimestamp) {
this.baseTimestamp = baseTimestamp;
}
public record OffsetExpirationConditionImpl(
Function<OffsetAndMetadata, Long> baseTimestamp) implements OffsetExpirationCondition {
/**
* Determine whether an offset is expired. Older versions have an expire timestamp per partition. If this
@ -39,7 +34,6 @@ public class OffsetExpirationConditionImpl implements OffsetExpirationCondition
* @param offset The offset and metadata.
* @param currentTimestampMs The current timestamp.
* @param offsetsRetentionMs The offsets retention in milliseconds.
*
* @return Whether the given offset is expired or not.
*/
@Override
@ -52,11 +46,4 @@ public class OffsetExpirationConditionImpl implements OffsetExpirationCondition
return currentTimestampMs - baseTimestamp.apply(offset) >= offsetsRetentionMs;
}
}
/**
* @return The base timestamp.
*/
public Function<OffsetAndMetadata, Long> baseTimestamp() {
return this.baseTimestamp;
}
}

View File

@ -201,7 +201,7 @@ public class OffsetMetadataManager {
/**
* Tracks open transactions (producer ids) by group id, topic name and partition id.
* It is the responsiblity of the caller to update {@link #pendingTransactionalOffsets}.
* It is the responsibility of the caller to update {@link #pendingTransactionalOffsets}.
*/
private class OpenTransactions {
/**

View File

@ -65,20 +65,12 @@ public class ShareGroupAutoOffsetResetStrategy {
AutoOffsetResetStrategy baseStrategy = AutoOffsetResetStrategy.fromString(offsetStrategy);
AutoOffsetResetStrategy.StrategyType baseType = baseStrategy.type();
StrategyType shareGroupType;
switch (baseType) {
case EARLIEST:
shareGroupType = StrategyType.EARLIEST;
break;
case LATEST:
shareGroupType = StrategyType.LATEST;
break;
case BY_DURATION:
shareGroupType = StrategyType.BY_DURATION;
break;
default:
throw new IllegalArgumentException("Unsupported strategy for ShareGroup: " + baseType);
}
StrategyType shareGroupType = switch (baseType) {
case EARLIEST -> StrategyType.EARLIEST;
case LATEST -> StrategyType.LATEST;
case BY_DURATION -> StrategyType.BY_DURATION;
default -> throw new IllegalArgumentException("Unsupported strategy for ShareGroup: " + baseType);
};
return new ShareGroupAutoOffsetResetStrategy(baseStrategy, shareGroupType);
}

View File

@ -754,7 +754,7 @@ public class UniformHeterogeneousAssignmentBuilder {
// First, choose a member from the most loaded range to reassign a partition from.
// Loop until we find a member that has partitions to give up.
int mostLoadedMemberIndex = -1;
int mostLoadedMemberIndex;
while (true) {
mostLoadedMemberIndex = memberAssignmentBalancer.nextMostLoadedMember();

View File

@ -272,16 +272,6 @@ public class UniformHomogeneousAssignmentBuilder {
}
}
private static class MemberWithRemainingQuota {
final String memberId;
final int remainingQuota;
MemberWithRemainingQuota(
String memberId,
int remainingQuota
) {
this.memberId = memberId;
this.remainingQuota = remainingQuota;
}
private record MemberWithRemainingQuota(String memberId, int remainingQuota) {
}
}

View File

@ -1175,9 +1175,8 @@ public class ClassicGroup implements Group {
});
return Optional.of(allSubscribedTopics);
} catch (SchemaException e) {
log.warn("Failed to parse Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ":" +
protocolName.get() + " of group " + groupId + ". " +
"Consumer group coordinator is not aware of the subscribed topics.", e);
log.warn("Failed to parse Consumer Protocol {}:{} of group {}. Consumer group coordinator is not aware of the subscribed topics.",
ConsumerProtocol.PROTOCOL_TYPE, protocolName.get(), groupId, e);
}
}

View File

@ -26,9 +26,6 @@ import org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
@ -43,24 +40,14 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
private static final Logger log = LoggerFactory.getLogger(GroupCoordinatorMetricsShard.class);
/**
* This class represents a gauge counter for this shard. The TimelineLong object represents a gauge backed by
* the snapshot registry. Once we commit to a certain offset in the snapshot registry, we write the given
* TimelineLong's value to the AtomicLong. This AtomicLong represents the actual gauge counter that is queried
* when reporting the value to {@link GroupCoordinatorMetrics}.
*/
private static class TimelineGaugeCounter {
private record TimelineGaugeCounter(TimelineLong timelineLong, AtomicLong atomicLong) {
final TimelineLong timelineLong;
final AtomicLong atomicLong;
public TimelineGaugeCounter(TimelineLong timelineLong, AtomicLong atomicLong) {
this.timelineLong = timelineLong;
this.atomicLong = atomicLong;
}
}
/**
* Classic group size gauge counters keyed by the metric name.

View File

@ -19,21 +19,19 @@ package org.apache.kafka.coordinator.group.modern;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* The partition assignment for a modern group member.
*
* @param partitions The partitions assigned to this member keyed by topicId.
*/
public class MemberAssignmentImpl implements MemberAssignment {
/**
* The partitions assigned to this member keyed by topicId.
*/
private final Map<Uuid, Set<Integer>> partitions;
public MemberAssignmentImpl(Map<Uuid, Set<Integer>> partitions) {
this.partitions = Objects.requireNonNull(partitions);
public record MemberAssignmentImpl(Map<Uuid, Set<Integer>> partitions) implements MemberAssignment {
public MemberAssignmentImpl {
partitions = Collections.unmodifiableMap(Objects.requireNonNull(partitions));
}
/**
@ -44,19 +42,6 @@ public class MemberAssignmentImpl implements MemberAssignment {
return this.partitions;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MemberAssignmentImpl that = (MemberAssignmentImpl) o;
return partitions.equals(that.partitions);
}
@Override
public int hashCode() {
return partitions.hashCode();
}
@Override
public String toString() {
return "MemberAssignment(partitions=" + partitions + ')';

View File

@ -525,7 +525,7 @@ public abstract class ModernGroup<T extends ModernGroupMember> implements Group
}
for (SubscriptionCount subscriberCount : subscribedTopicNames.values()) {
if (subscriberCount.byNameCount != numberOfMembers) {
if (subscriberCount.byNameCount() != numberOfMembers) {
return HETEROGENEOUS;
}
}

View File

@ -20,32 +20,7 @@ package org.apache.kafka.coordinator.group.modern;
* A class which holds two counters. One to count subscription by name and
* another one to count subscription by regex.
*/
public class SubscriptionCount {
public final int byNameCount;
public final int byRegexCount;
public SubscriptionCount(int byNameCount, int byRegexCount) {
this.byNameCount = byNameCount;
this.byRegexCount = byRegexCount;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SubscriptionCount that = (SubscriptionCount) o;
if (byNameCount != that.byNameCount) return false;
return byRegexCount == that.byRegexCount;
}
@Override
public int hashCode() {
int result = byNameCount;
result = 31 * result + byRegexCount;
return result;
}
public record SubscriptionCount(int byNameCount, int byRegexCount) {
@Override
public String toString() {

View File

@ -161,12 +161,12 @@ public abstract class TargetAssignmentBuilder<T extends ModernGroupMember, U ext
ResolvedRegularExpression resolvedRegularExpression = resolvedRegularExpressions.get(subscribedTopicRegex);
if (resolvedRegularExpression != null) {
if (subscriptions.isEmpty()) {
subscriptions = resolvedRegularExpression.topics;
} else if (!resolvedRegularExpression.topics.isEmpty()) {
subscriptions = resolvedRegularExpression.topics();
} else if (!resolvedRegularExpression.topics().isEmpty()) {
// We only use a UnionSet when the member uses both type of subscriptions. The
// protocol allows it. However, the Apache Kafka Consumer does not support it.
// Other clients such as librdkafka may support it.
subscriptions = new UnionSet<>(subscriptions, resolvedRegularExpression.topics);
subscriptions = new UnionSet<>(subscriptions, resolvedRegularExpression.topics());
}
}
}

View File

@ -68,20 +68,13 @@ public class TopicIds implements Set<Uuid> {
/**
* A TopicResolver without any caching.
*/
public static class DefaultTopicResolver implements TopicResolver {
private final CoordinatorMetadataImage image;
public record DefaultTopicResolver(CoordinatorMetadataImage image) implements TopicResolver {
public DefaultTopicResolver(
CoordinatorMetadataImage image
) {
this.image = Objects.requireNonNull(image);
}
@Override
public final CoordinatorMetadataImage image() {
return image;
}
@Override
public String name(Uuid id) {
return image.topicMetadata(id).map(CoordinatorMetadataImage.TopicMetadata::name).orElse(null);
@ -93,7 +86,8 @@ public class TopicIds implements Set<Uuid> {
}
@Override
public void clear() {}
public void clear() {
}
@Override
public String toString() {

View File

@ -386,7 +386,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
// is not subscribed to it, we must remove it from the subscribed topic names.
if (!oldSubscribedTopicRegex.equals(newSubscribedTopicRegex) && numSubscribedMembers(oldSubscribedTopicRegex) == 1) {
resolvedRegularExpression(oldSubscribedTopicRegex).ifPresent(resolvedRegularExpression ->
resolvedRegularExpression.topics.forEach(topic -> subscribedTopicsNames.compute(topic, SubscriptionCount::decRegexCount))
resolvedRegularExpression.topics().forEach(topic -> subscribedTopicsNames.compute(topic, SubscriptionCount::decRegexCount))
);
}
}
@ -440,7 +440,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
removedRegexes.forEach(regex ->
resolvedRegularExpression(regex).ifPresent(resolvedRegularExpression ->
resolvedRegularExpression.topics.forEach(topic ->
resolvedRegularExpression.topics().forEach(topic ->
subscribedTopicsNames.compute(topic, SubscriptionCount::decRegexCount)
)
)
@ -462,7 +462,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
removeResolvedRegularExpression(regex);
if (newResolvedRegularExpression != null) {
resolvedRegularExpressions.put(regex, newResolvedRegularExpression);
newResolvedRegularExpression.topics.forEach(topicName -> subscribedTopicNames.compute(topicName, SubscriptionCount::incRegexCount));
newResolvedRegularExpression.topics().forEach(topicName -> subscribedTopicNames.compute(topicName, SubscriptionCount::incRegexCount));
}
}
@ -474,7 +474,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
public void removeResolvedRegularExpression(String regex) {
ResolvedRegularExpression oldResolvedRegularExpression = resolvedRegularExpressions.remove(regex);
if (oldResolvedRegularExpression != null) {
oldResolvedRegularExpression.topics.forEach(topicName -> subscribedTopicNames.compute(topicName, SubscriptionCount::decRegexCount));
oldResolvedRegularExpression.topics().forEach(topicName -> subscribedTopicNames.compute(topicName, SubscriptionCount::decRegexCount));
}
}
@ -486,7 +486,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
public long lastResolvedRegularExpressionRefreshTimeMs() {
Iterator<ResolvedRegularExpression> iterator = resolvedRegularExpressions.values().iterator();
if (iterator.hasNext()) {
return iterator.next().timestamp;
return iterator.next().timestamp();
} else {
return Long.MIN_VALUE;
}
@ -498,7 +498,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
public long lastResolvedRegularExpressionVersion() {
Iterator<ResolvedRegularExpression> iterator = resolvedRegularExpressions.values().iterator();
if (iterator.hasNext()) {
return iterator.next().version;
return iterator.next().version();
} else {
return 0L;
}
@ -851,7 +851,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
// considered as homogeneous if all the members are subscribed to the
// same topics. Otherwise, it is considered as heterogeneous.
for (SubscriptionCount subscriberCount : subscribedTopicNames.values()) {
if (subscriberCount.byNameCount != numberOfMembers) {
if (subscriberCount.byNameCount() != numberOfMembers) {
return HETEROGENEOUS;
}
}
@ -864,7 +864,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
// is considered as homogeneous. If some members are subscribed to
// topic names too, the subscription is considered as heterogeneous.
for (SubscriptionCount subscriberCount : subscribedTopicNames.values()) {
if (subscriberCount.byRegexCount != 1 || subscriberCount.byNameCount > 0) {
if (subscriberCount.byRegexCount() != 1 || subscriberCount.byNameCount() > 0) {
return HETEROGENEOUS;
}
}

View File

@ -22,53 +22,16 @@ import java.util.Set;
/**
* The metadata associated with a regular expression in a Consumer Group.
*
* @param topics The set of resolved topics.
* @param version The version of the metadata image used to resolve the topics.
* @param timestamp The timestamp at the time of the resolution.
*/
public class ResolvedRegularExpression {
public record ResolvedRegularExpression(Set<String> topics, long version, long timestamp) {
public static final ResolvedRegularExpression EMPTY = new ResolvedRegularExpression(Set.of(), -1L, -1L);
/**
* The set of resolved topics.
*/
public final Set<String> topics;
/**
* The version of the metadata image used to resolve the topics.
*/
public final long version;
/**
* The timestamp at the time of the resolution.
*/
public final long timestamp;
public ResolvedRegularExpression(
Set<String> topics,
long version,
long timestamp
) {
this.topics = Collections.unmodifiableSet(Objects.requireNonNull(topics));
this.version = version;
this.timestamp = timestamp;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ResolvedRegularExpression that = (ResolvedRegularExpression) o;
if (version != that.version) return false;
if (timestamp != that.timestamp) return false;
return topics.equals(that.topics);
}
@Override
public int hashCode() {
int result = topics.hashCode();
result = 31 * result + (int) (version ^ (version >>> 32));
result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
return result;
public ResolvedRegularExpression {
topics = Collections.unmodifiableSet(Objects.requireNonNull(topics));
}
@Override

View File

@ -33,7 +33,7 @@ public record StreamsGroupHeartbeatResult(StreamsGroupHeartbeatResponseData data
public StreamsGroupHeartbeatResult {
Objects.requireNonNull(data);
creatableTopics = Objects.requireNonNull(Collections.unmodifiableMap(creatableTopics));
creatableTopics = Collections.unmodifiableMap(Objects.requireNonNull(creatableTopics));
}
}

View File

@ -36,8 +36,8 @@ import java.util.SortedMap;
public record TopologyMetadata(CoordinatorMetadataImage metadataImage, SortedMap<String, ConfiguredSubtopology> subtopologyMap) implements TopologyDescriber {
public TopologyMetadata {
metadataImage = Objects.requireNonNull(metadataImage);
subtopologyMap = Objects.requireNonNull(Collections.unmodifiableSortedMap(subtopologyMap));
Objects.requireNonNull(metadataImage);
subtopologyMap = Collections.unmodifiableSortedMap(Objects.requireNonNull(subtopologyMap));
}
/**

View File

@ -271,14 +271,14 @@ public class Assertions {
Consumer<ShareGroupStatePartitionMetadataValue> normalize = message -> {
message.initializedTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo::topicId));
message.initializedTopics().forEach(topic -> {
topic.partitions().sort(Comparator.naturalOrder());
});
message.initializedTopics().forEach(topic ->
topic.partitions().sort(Comparator.naturalOrder())
);
message.initializingTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo::topicId));
message.initializingTopics().forEach(topic -> {
topic.partitions().sort(Comparator.naturalOrder());
});
message.initializingTopics().forEach(topic ->
topic.partitions().sort(Comparator.naturalOrder())
);
message.deletingTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicInfo::topicId));
};

View File

@ -42,7 +42,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GroupCoordinatorConfigTest {
@ -93,15 +92,15 @@ public class GroupCoordinatorConfigTest {
config = createConfig(configs);
assignors = config.consumerGroupAssignors();
assertEquals(2, assignors.size());
assertTrue(assignors.get(0) instanceof RangeAssignor);
assertTrue(assignors.get(1) instanceof UniformAssignor);
assertInstanceOf(RangeAssignor.class, assignors.get(0));
assertInstanceOf(UniformAssignor.class, assignors.get(1));
// Test custom assignor.
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, CustomAssignor.class.getName());
config = createConfig(configs);
assignors = config.consumerGroupAssignors();
assertEquals(1, assignors.size());
assertTrue(assignors.get(0) instanceof CustomAssignor);
assertInstanceOf(CustomAssignor.class, assignors.get(0));
assertNotNull(((CustomAssignor) assignors.get(0)).configs);
// Test with classes.
@ -109,24 +108,24 @@ public class GroupCoordinatorConfigTest {
config = createConfig(configs);
assignors = config.consumerGroupAssignors();
assertEquals(2, assignors.size());
assertTrue(assignors.get(0) instanceof RangeAssignor);
assertTrue(assignors.get(1) instanceof CustomAssignor);
assertInstanceOf(RangeAssignor.class, assignors.get(0));
assertInstanceOf(CustomAssignor.class, assignors.get(1));
// Test combination of short name and class.
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, "uniform, " + CustomAssignor.class.getName());
config = createConfig(configs);
assignors = config.consumerGroupAssignors();
assertEquals(2, assignors.size());
assertTrue(assignors.get(0) instanceof UniformAssignor);
assertTrue(assignors.get(1) instanceof CustomAssignor);
assertInstanceOf(UniformAssignor.class, assignors.get(0));
assertInstanceOf(CustomAssignor.class, assignors.get(1));
// Test combination of short name and class.
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of("uniform", CustomAssignor.class.getName()));
config = createConfig(configs);
assignors = config.consumerGroupAssignors();
assertEquals(2, assignors.size());
assertTrue(assignors.get(0) instanceof UniformAssignor);
assertTrue(assignors.get(1) instanceof CustomAssignor);
assertInstanceOf(UniformAssignor.class, assignors.get(0));
assertInstanceOf(CustomAssignor.class, assignors.get(1));
}
@Test

View File

@ -1484,7 +1484,7 @@ public class GroupCoordinatorShardTest {
// The counter is scheduled.
assertEquals(
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs() - time.milliseconds()
);
// Advance the timer to trigger the update.
@ -1495,7 +1495,7 @@ public class GroupCoordinatorShardTest {
// The counter is scheduled.
assertEquals(
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs() - time.milliseconds()
);
}

View File

@ -4050,7 +4050,7 @@ public class GroupMetadataManagerTest {
// Execute the scheduled revocation timeout captured earlier to simulate a
// stale timeout. This should be a no-op.
assertEquals(List.of(), scheduledTimeout.operation.generateRecords().records());
assertEquals(List.of(), scheduledTimeout.operation().generateRecords().records());
}
@Test
@ -4693,7 +4693,7 @@ public class GroupMetadataManagerTest {
classicGroupHeartbeatKey("group-id", "member-1"));
assertNotNull(timeout);
assertEquals(context.time.milliseconds() + 4000, timeout.deadlineMs);
assertEquals(context.time.milliseconds() + 4000, timeout.deadlineMs());
});
}
@ -5579,10 +5579,10 @@ public class GroupMetadataManagerTest {
assertEquals(1, timeouts.size());
timeouts.forEach(timeout -> {
assertEquals(classicGroupHeartbeatKey("group-id", memberId), timeout.key);
assertEquals(classicGroupHeartbeatKey("group-id", memberId), timeout.key());
assertEquals(
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, group.groupAssignment())),
timeout.result.records()
timeout.result().records()
);
});
@ -5983,9 +5983,9 @@ public class GroupMetadataManagerTest {
// Advance clock by rebalance timeout to complete join phase. As long as both members have not
// rejoined, we extend the join phase.
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(10000));
assertEquals(10000, context.timer.timeout("join-group-id").deadlineMs - context.time.milliseconds());
assertEquals(10000, context.timer.timeout("join-group-id").deadlineMs() - context.time.milliseconds());
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(10000));
assertEquals(10000, context.timer.timeout("join-group-id").deadlineMs - context.time.milliseconds());
assertEquals(10000, context.timer.timeout("join-group-id").deadlineMs() - context.time.milliseconds());
assertTrue(group.isInState(PREPARING_REBALANCE));
assertEquals(2, group.numMembers());
@ -6341,7 +6341,7 @@ public class GroupMetadataManagerTest {
assertEquals(Errors.UNKNOWN_SERVER_ERROR, appendGroupMetadataErrorToResponseError(Errors.RECORD_LIST_TOO_LARGE));
assertEquals(Errors.UNKNOWN_SERVER_ERROR, appendGroupMetadataErrorToResponseError(Errors.INVALID_FETCH_SIZE));
assertEquals(Errors.LEADER_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE);
assertEquals(Errors.LEADER_NOT_AVAILABLE, appendGroupMetadataErrorToResponseError(Errors.LEADER_NOT_AVAILABLE));
}
@Test
@ -6407,8 +6407,8 @@ public class GroupMetadataManagerTest {
assertEquals(1, timeouts.size());
String memberId = joinResult.joinFuture.get().memberId();
timeouts.forEach(timeout -> {
assertEquals(classicGroupHeartbeatKey("group-id", memberId), timeout.key);
assertEquals(expectedRecords, timeout.result.records());
assertEquals(classicGroupHeartbeatKey("group-id", memberId), timeout.key());
assertEquals(expectedRecords, timeout.result().records());
});
assertEquals(0, group.numMembers());
@ -6643,7 +6643,7 @@ public class GroupMetadataManagerTest {
// Both heartbeats will expire but only the leader is kicked out.
List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = context.sleep(10000);
assertEquals(2, timeouts.size());
timeouts.forEach(timeout -> assertEquals(timeout.result, EMPTY_RESULT));
timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT, timeout.result()));
assertTrue(duplicateFollowerJoinResult.joinFuture.isDone());
assertTrue(group.isInState(COMPLETING_REBALANCE));
@ -8275,7 +8275,7 @@ public class GroupMetadataManagerTest {
// the follower out because it is awaiting sync.
List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = context.sleep(10000);
assertTrue(timeouts.size() <= 2);
timeouts.forEach(timeout -> assertTrue(timeout.result.records().isEmpty()));
timeouts.forEach(timeout -> assertTrue(timeout.result().records().isEmpty()));
assertTrue(followerSyncResult.syncFuture.isDone());
assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), followerSyncResult.syncFuture.get().errorCode());
@ -9108,14 +9108,14 @@ public class GroupMetadataManagerTest {
List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = context.sleep(rebalanceTimeoutMs / 2);
assertEquals(1, timeouts.size());
ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0);
assertEquals(classicGroupSyncKey("group-id"), timeout.key);
assertEquals(classicGroupSyncKey("group-id"), timeout.key());
assertEquals(
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, group.groupAssignment())),
timeout.result.records()
timeout.result().records()
);
// Simulate a successful write to the log.
timeout.result.appendFuture().complete(null);
timeout.result().appendFuture().complete(null);
// Heartbeats fail because none of the members have sent the sync request
joinResponses.forEach(response -> context.verifyHeartbeat(group.groupId(), response, Errors.UNKNOWN_MEMBER_ID));
@ -9162,8 +9162,8 @@ public class GroupMetadataManagerTest {
List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = context.sleep(rebalanceTimeoutMs / 2);
assertEquals(1, timeouts.size());
ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0);
assertEquals(classicGroupSyncKey("group-id"), timeout.key);
assertTrue(timeout.result.records().isEmpty());
assertEquals(classicGroupSyncKey("group-id"), timeout.key());
assertTrue(timeout.result().records().isEmpty());
// Leader should be able to heartbeat
joinResponses.subList(0, 1).forEach(response -> context.verifyHeartbeat(group.groupId(), response, Errors.REBALANCE_IN_PROGRESS));
@ -9213,8 +9213,8 @@ public class GroupMetadataManagerTest {
List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = context.sleep(rebalanceTimeoutMs / 2);
assertEquals(1, timeouts.size());
ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0);
assertEquals(classicGroupSyncKey("group-id"), timeout.key);
assertTrue(timeout.result.records().isEmpty());
assertEquals(classicGroupSyncKey("group-id"), timeout.key());
assertTrue(timeout.result().records().isEmpty());
// Follower sync responses should fail.
followerSyncFutures.forEach(future -> {
@ -12181,7 +12181,7 @@ public class GroupMetadataManagerTest {
// Advance time past the session timeout.
// Member 2 should be fenced from the group, thus triggering the downgrade.
ExpiredTimeout<Void, CoordinatorRecord> timeout = context.sleep(45000 + 1).get(0);
assertEquals(groupSessionTimeoutKey(groupId, memberId2), timeout.key);
assertEquals(groupSessionTimeoutKey(groupId, memberId2), timeout.key());
byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(List.of(
new TopicPartition(fooTopicName, 0),
@ -12236,7 +12236,7 @@ public class GroupMetadataManagerTest {
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)),
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments))
),
timeout.result.records()
timeout.result().records()
);
// The new classic member 1 has a heartbeat timeout.
@ -12383,7 +12383,7 @@ public class GroupMetadataManagerTest {
// Advance time past the session timeout.
// Member 2 should be fenced from the group, thus triggering the downgrade.
ExpiredTimeout<Void, CoordinatorRecord> timeout = context.sleep(30000 + 1).get(0);
assertEquals(groupRebalanceTimeoutKey(groupId, memberId2), timeout.key);
assertEquals(groupRebalanceTimeoutKey(groupId, memberId2), timeout.key());
byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(List.of(
new TopicPartition(fooTopicName, 0),
@ -12438,7 +12438,7 @@ public class GroupMetadataManagerTest {
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)),
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments))
),
timeout.result.records()
timeout.result().records()
);
// The new classic member 1 has a heartbeat timeout.
@ -14554,7 +14554,7 @@ public class GroupMetadataManagerTest {
// The member is fenced from the group.
assertEquals(1, timeouts.size());
ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0);
assertEquals(groupSessionTimeoutKey(groupId, memberId), timeout.key);
assertEquals(groupSessionTimeoutKey(groupId, memberId), timeout.key());
assertRecordsEquals(
List.of(
// The member is removed.
@ -14565,7 +14565,7 @@ public class GroupMetadataManagerTest {
// The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
),
timeout.result.records()
timeout.result().records()
);
}
@ -14619,7 +14619,7 @@ public class GroupMetadataManagerTest {
// The member is fenced from the group.
assertEquals(1, timeouts.size());
ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0);
assertEquals(consumerGroupJoinKey(groupId, memberId), timeout.key);
assertEquals(consumerGroupJoinKey(groupId, memberId), timeout.key());
assertRecordsEquals(
List.of(
// The member is removed.
@ -14630,7 +14630,7 @@ public class GroupMetadataManagerTest {
// The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
),
timeout.result.records()
timeout.result().records()
);
}
@ -15632,7 +15632,7 @@ public class GroupMetadataManagerTest {
topicId,
Map.entry(topicName, new LinkedHashSet<>(partitions))
);
};
}
@Test
public void testShareGroupLeavingMemberBumpsGroupEpoch() {
@ -16062,7 +16062,7 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())));
assertEquals(e1.getMessage(), "Subtopology subtopologyMissing does not exist in the topology.");
assertEquals("Subtopology subtopologyMissing does not exist in the topology.", e1.getMessage());
InvalidRequestException e2 = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
new StreamsGroupHeartbeatRequestData()
@ -16076,7 +16076,7 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())));
assertEquals(e2.getMessage(), "Task 3 for subtopology subtopology1 is invalid. Number of tasks for this subtopology: 3");
assertEquals("Task 3 for subtopology subtopology1 is invalid. Number of tasks for this subtopology: 3", e2.getMessage());
}
@Test
@ -18298,7 +18298,7 @@ public class GroupMetadataManagerTest {
// Execute the scheduled revocation timeout captured earlier to simulate a
// stale timeout. This should be a no-op.
assertEquals(List.of(), scheduledTimeout.operation.generateRecords().records());
assertEquals(List.of(), scheduledTimeout.operation().generateRecords().records());
}
@Test
@ -20424,7 +20424,7 @@ public class GroupMetadataManagerTest {
assertEquals(1, tasks.size());
MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord> task = tasks.get(0);
assertEquals(groupId + "-regex", task.key);
assertEquals(groupId + "-regex", task.key());
assertRecordsEquals(
List.of(
// The resolution of the new regex is persisted.
@ -20443,7 +20443,7 @@ public class GroupMetadataManagerTest {
barTopicName, computeTopicHash(barTopicName, metadataImage)
)))
),
task.result.records()
task.result().records()
);
}
@ -20564,8 +20564,8 @@ public class GroupMetadataManagerTest {
// The pending task was a no-op.
MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord> task = tasks.get(0);
assertEquals(groupId + "-regex", task.key);
assertRecordsEquals(List.of(), task.result.records());
assertEquals(groupId + "-regex", task.key());
assertRecordsEquals(List.of(), task.result().records());
// The member heartbeats again. It triggers a new resolution.
result = context.consumerGroupHeartbeat(
@ -20591,7 +20591,7 @@ public class GroupMetadataManagerTest {
assertEquals(1, tasks.size());
task = tasks.get(0);
assertEquals(groupId + "-regex", task.key);
assertEquals(groupId + "-regex", task.key());
assertRecordsEquals(
List.of(
// The resolution of the new regex is persisted.
@ -20610,7 +20610,7 @@ public class GroupMetadataManagerTest {
barTopicName, computeTopicHash(barTopicName, metadataImage)
)))
),
task.result.records()
task.result().records()
);
}
@ -20721,7 +20721,7 @@ public class GroupMetadataManagerTest {
// Execute pending tasks.
MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord> task = tasks.get(0);
assertEquals(groupId + "-regex", task.key);
assertEquals(groupId + "-regex", task.key());
assertUnorderedRecordsEquals(
List.of(
@ -20751,7 +20751,7 @@ public class GroupMetadataManagerTest {
foooTopicName, computeTopicHash(foooTopicName, new KRaftCoordinatorMetadataImage(newImage))
))))
),
task.result.records()
task.result().records()
);
}
@ -20979,7 +20979,7 @@ public class GroupMetadataManagerTest {
barTopicName, barTopicHash
)))
),
context.processTasks().get(0).result.records()
context.processTasks().get(0).result().records()
);
}
@ -21206,7 +21206,7 @@ public class GroupMetadataManagerTest {
barTopicName, barTopicHash
)))
),
context.processTasks().get(0).result.records()
context.processTasks().get(0).result().records()
);
}

View File

@ -200,7 +200,7 @@ public class GroupMetadataManagerTestContext {
public static void assertNoOrEmptyResult(List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> timeouts) {
assertTrue(timeouts.size() <= 1);
timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT, timeout.result));
timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT, timeout.result()));
}
public static JoinGroupRequestData.JoinGroupRequestProtocolCollection toProtocols(String... protocolNames) {
@ -764,8 +764,8 @@ public class GroupMetadataManagerTestContext {
time.sleep(ms);
List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> timeouts = timer.poll();
timeouts.forEach(timeout -> {
if (timeout.result.replayRecords()) {
timeout.result.records().forEach(this::replay);
if (timeout.result().replayRecords()) {
timeout.result().records().forEach(this::replay);
}
});
return timeouts;
@ -774,8 +774,8 @@ public class GroupMetadataManagerTestContext {
public List<MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord>> processTasks() {
List<MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord>> results = executor.poll();
results.forEach(taskResult -> {
if (taskResult.result.replayRecords()) {
taskResult.result.records().forEach(this::replay);
if (taskResult.result().replayRecords()) {
taskResult.result().records().forEach(this::replay);
}
});
return results;
@ -789,7 +789,7 @@ public class GroupMetadataManagerTestContext {
MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout =
timer.timeout(groupSessionTimeoutKey(groupId, memberId));
assertNotNull(timeout);
assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs);
assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs());
}
public void assertNoSessionTimeout(
@ -809,7 +809,7 @@ public class GroupMetadataManagerTestContext {
MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout =
timer.timeout(groupRebalanceTimeoutKey(groupId, memberId));
assertNotNull(timeout);
assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs);
assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs());
return timeout;
}
@ -830,7 +830,7 @@ public class GroupMetadataManagerTestContext {
MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout =
timer.timeout(consumerGroupJoinKey(groupId, memberId));
assertNotNull(timeout);
assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs);
assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs());
return timeout;
}
@ -851,7 +851,7 @@ public class GroupMetadataManagerTestContext {
MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout =
timer.timeout(consumerGroupSyncKey(groupId, memberId));
assertNotNull(timeout);
assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs);
assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs());
return timeout;
}
@ -1324,12 +1324,12 @@ public class GroupMetadataManagerTestContext {
));
Set<String> heartbeatKeys = timeouts.stream().map(timeout -> timeout.key).collect(Collectors.toSet());
Set<String> heartbeatKeys = timeouts.stream().map(timeout -> timeout.key()).collect(Collectors.toSet());
assertEquals(expectedHeartbeatKeys, heartbeatKeys);
// Only the last member leaving the group should result in the empty group metadata record.
int timeoutsSize = timeouts.size();
assertEquals(expectedRecords, timeouts.get(timeoutsSize - 1).result.records());
assertEquals(expectedRecords, timeouts.get(timeoutsSize - 1).result().records());
assertNoOrEmptyResult(timeouts.subList(0, timeoutsSize - 1));
assertTrue(group.isInState(EMPTY));
assertEquals(0, group.numMembers());

View File

@ -199,20 +199,11 @@ public class OffsetMetadataManagerTest {
Group.GroupType groupType,
String groupId
) {
switch (groupType) {
case CLASSIC:
return groupMetadataManager.getOrMaybeCreateClassicGroup(
groupId,
true
);
case CONSUMER:
return groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
groupId,
true
);
default:
throw new IllegalArgumentException("Invalid group type: " + groupType);
}
return switch (groupType) {
case CLASSIC -> groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, true);
case CONSUMER -> groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(groupId, true);
default -> throw new IllegalArgumentException("Invalid group type: " + groupType);
};
}
public void commit() {
@ -391,8 +382,8 @@ public class OffsetMetadataManagerTest {
time.sleep(ms);
List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> timeouts = timer.poll();
timeouts.forEach(timeout -> {
if (timeout.result.replayRecords()) {
timeout.result.records().forEach(this::replay);
if (timeout.result().replayRecords()) {
timeout.result().records().forEach(this::replay);
}
});
return timeouts;

View File

@ -127,9 +127,9 @@ public class RangeSetTest {
assertEquals(rangeSet1, rangeSet2);
assertNotEquals(rangeSet1, rangeSet3);
assertEquals(rangeSet1, set);
assertEquals(rangeSet3, hashSet);
assertNotEquals(rangeSet1, new Object());
assertEquals(set, rangeSet1);
assertEquals(hashSet, rangeSet3);
assertNotEquals(new Object(), rangeSet1);
// Empty sets are equal.
RangeSet emptyRangeSet1 = new RangeSet(0, 0);

View File

@ -560,8 +560,8 @@ public class ClassicGroupTest {
int newSessionTimeoutMs = 20000;
group.updateMember(member, newProtocols, newRebalanceTimeoutMs, newSessionTimeoutMs, null);
assertEquals(group.rebalanceTimeoutMs(), newRebalanceTimeoutMs);
assertEquals(member.sessionTimeoutMs(), newSessionTimeoutMs);
assertEquals(newRebalanceTimeoutMs, group.rebalanceTimeoutMs());
assertEquals(newSessionTimeoutMs, member.sessionTimeoutMs());
assertEquals(newProtocols, member.supportedProtocols());
}

View File

@ -194,12 +194,12 @@ public class TargetAssignmentBuilderTest {
ResolvedRegularExpression resolvedRegularExpression = resolvedRegularExpressions.get(subscribedTopicRegex);
if (resolvedRegularExpression != null) {
if (subscriptions.isEmpty()) {
subscriptions = resolvedRegularExpression.topics;
} else if (!resolvedRegularExpression.topics.isEmpty()) {
subscriptions = resolvedRegularExpression.topics();
} else if (!resolvedRegularExpression.topics().isEmpty()) {
// We only use a UnionSet when the member uses both type of subscriptions. The
// protocol allows it. However, the Apache Kafka Consumer does not support it.
// Other clients such as librdkafka may support it.
subscriptions = new UnionSet<>(subscriptions, resolvedRegularExpression.topics);
subscriptions = new UnionSet<>(subscriptions, resolvedRegularExpression.topics());
}
}
}

View File

@ -501,13 +501,13 @@ public class ConsumerGroupTest {
@Test
public void testGroupTypeFromString() {
assertEquals(Group.GroupType.parse("classic"), Group.GroupType.CLASSIC);
assertEquals(Group.GroupType.CLASSIC, Group.GroupType.parse("classic"));
// Test case insensitivity.
assertEquals(Group.GroupType.parse("Consumer"), Group.GroupType.CONSUMER);
assertEquals(Group.GroupType.CONSUMER, Group.GroupType.parse("Consumer"));
// Test with invalid group type.
assertEquals(Group.GroupType.parse("Invalid"), Group.GroupType.UNKNOWN);
assertEquals(Group.GroupType.UNKNOWN, Group.GroupType.parse("Invalid"));
}
@Test

View File

@ -32,9 +32,9 @@ public class ResolvedRegularExpressionTest {
12345L
);
assertEquals(Set.of("foo", "bar"), resolvedRegularExpression.topics);
assertEquals(10L, resolvedRegularExpression.version);
assertEquals(12345L, resolvedRegularExpression.timestamp);
assertEquals(Set.of("foo", "bar"), resolvedRegularExpression.topics());
assertEquals(10L, resolvedRegularExpression.version());
assertEquals(12345L, resolvedRegularExpression.timestamp());
}
@Test

View File

@ -136,10 +136,10 @@ public class ShareGroupTest {
@Test
public void testGroupTypeFromString() {
assertEquals(Group.GroupType.parse("share"), Group.GroupType.SHARE);
assertEquals(Group.GroupType.SHARE, Group.GroupType.parse("share"));
// Test case insensitivity.
assertEquals(Group.GroupType.parse("Share"), Group.GroupType.SHARE);
assertEquals(Group.GroupType.parse("SHare"), Group.GroupType.SHARE);
assertEquals(Group.GroupType.SHARE, Group.GroupType.parse("Share"));
assertEquals(Group.GroupType.SHARE, Group.GroupType.parse("SHare"));
}
@Test

View File

@ -987,21 +987,7 @@ public class StickyTaskAssignorTest {
Map.of());
}
static class TopologyDescriberImpl implements TopologyDescriber {
final int numTasks;
final boolean isStateful;
final List<String> subtopologies;
TopologyDescriberImpl(int numTasks, boolean isStateful, List<String> subtopologies) {
this.numTasks = numTasks;
this.isStateful = isStateful;
this.subtopologies = subtopologies;
}
@Override
public List<String> subtopologies() {
return subtopologies;
}
record TopologyDescriberImpl(int numTasks, boolean isStateful, List<String> subtopologies) implements TopologyDescriber {
@Override
public int maxNumInputPartitions(String subtopologyId) throws NoSuchElementException {

View File

@ -100,7 +100,7 @@ public class ConfiguredTopologyTest {
Optional.empty()
)
);
assertEquals(ex.getMessage(), "Subtopologies must be present if topicConfigurationException is empty.");
assertEquals("Subtopologies must be present if topicConfigurationException is empty.", ex.getMessage());
}
@Test