MINOR: Cleanup metadata module (#20115)

- Removed unused methods and arguments;
- Used enhanced switch expressions and functional-style Optional handling (see the short sketch below);
- Fixed IDEA code inspection warnings.
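
For reference, a minimal standalone sketch of the two refactoring patterns applied throughout this commit: an enhanced switch expression (arrow labels, with yield inside block bodies) and Optional.ifPresent replacing an explicit isPresent()/get() pair. The class and member names below are illustrative only and do not come from the Kafka sources.

import java.util.Optional;

public class CleanupPatternsSketch {
    enum QuotaType { SHORT, INT, LONG }

    // Enhanced switch expression: each label yields a value; a block body uses 'yield'.
    static long maxValueFor(QuotaType type) {
        return switch (type) {
            case SHORT -> Short.MAX_VALUE;
            case INT -> Integer.MAX_VALUE;
            case LONG -> {
                yield Long.MAX_VALUE; // 'yield' is only needed inside a block body
            }
        };
    }

    // Functional-style Optional: ifPresent replaces the isPresent()/get() pair.
    static String describe(Optional<String> clusterId) {
        StringBuilder bld = new StringBuilder("Properties(");
        clusterId.ifPresent(id -> bld.append("clusterId=").append(id));
        return bld.append(")").toString();
    }
}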

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Dmitry Werner 2025-07-21 01:51:09 +05:00 committed by GitHub
parent 9a2f202a1e
commit 634e99e9ab
40 changed files with 117 additions and 204 deletions

View File

@ -23,14 +23,15 @@ import org.apache.kafka.common.metadata.PartitionRecord;
* A record was encountered where the number of directories does not match the number of replicas.
*/
public class InvalidReplicaDirectoriesException extends InvalidMetadataException {
+ private static final String ERR_MSG = "The lengths for replicas and directories do not match: ";
private static final long serialVersionUID = 1L;
public InvalidReplicaDirectoriesException(PartitionRecord record) {
super("The lengths for replicas and directories do not match: " + record);
super(ERR_MSG + record);
}
public InvalidReplicaDirectoriesException(PartitionChangeRecord record) {
super("The lengths for replicas and directories do not match: " + record);
super(ERR_MSG + record);
}
}

View File

@ -26,7 +26,7 @@ import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
/**
- * The BrokerheartbeatTracker stores the last time each broker sent a heartbeat to us.
+ * The BrokerHeartbeatTracker stores the last time each broker sent a heartbeat to us.
* This class will be present only on the active controller.
*
* UNLIKE MOST OF THE KAFKA CONTROLLER, THIS CLASS CAN BE ACCESSED FROM MULTIPLE THREADS.

View File

@ -261,32 +261,32 @@ public class ClientQuotaControlManager {
}
// Ensure the quota value is valid
- switch (configKey.type()) {
- case DOUBLE:
- return ApiError.NONE;
- case SHORT:
+ return switch (configKey.type()) {
+ case DOUBLE -> ApiError.NONE;
+ case SHORT -> {
if (value > Short.MAX_VALUE) {
- return new ApiError(Errors.INVALID_REQUEST,
+ yield new ApiError(Errors.INVALID_REQUEST,
"Proposed value for " + key + " is too large for a SHORT.");
}
- return getErrorForIntegralQuotaValue(value, key);
- case INT:
+ yield getErrorForIntegralQuotaValue(value, key);
+ }
+ case INT -> {
if (value > Integer.MAX_VALUE) {
- return new ApiError(Errors.INVALID_REQUEST,
+ yield new ApiError(Errors.INVALID_REQUEST,
"Proposed value for " + key + " is too large for an INT.");
}
- return getErrorForIntegralQuotaValue(value, key);
- case LONG: {
+ yield getErrorForIntegralQuotaValue(value, key);
+ }
+ case LONG -> {
if (value > Long.MAX_VALUE) {
- return new ApiError(Errors.INVALID_REQUEST,
+ yield new ApiError(Errors.INVALID_REQUEST,
"Proposed value for " + key + " is too large for a LONG.");
}
- return getErrorForIntegralQuotaValue(value, key);
+ yield getErrorForIntegralQuotaValue(value, key);
}
- default:
- return new ApiError(Errors.UNKNOWN_SERVER_ERROR,
- "Unexpected config type " + configKey.type() + " should be Long or Double");
- }
+ default -> new ApiError(Errors.UNKNOWN_SERVER_ERROR,
+ "Unexpected config type " + configKey.type() + " should be Long or Double");
+ };
}
static ApiError getErrorForIntegralQuotaValue(double value, String key) {

View File

@ -823,7 +823,7 @@ public class ClusterControlManager {
}
Iterator<Entry<Integer, Map<String, VersionRange>>> brokerSupportedFeatures() {
- return new Iterator<Entry<Integer, Map<String, VersionRange>>>() {
+ return new Iterator<>() {
private final Iterator<BrokerRegistration> iter = brokerRegistrations.values().iterator();
@Override
@ -845,7 +845,7 @@ public class ClusterControlManager {
throw new UnsupportedVersionException("The current MetadataVersion is too old to " +
"support controller registrations.");
}
- return new Iterator<Entry<Integer, Map<String, VersionRange>>>() {
+ return new Iterator<>() {
private final Iterator<ControllerRegistration> iter = controllerRegistrations.values().iterator();
@Override

View File

@ -364,9 +364,7 @@ public class ConfigurationControlManager {
if (!newlyCreatedResource) {
existenceChecker.accept(configResource);
}
- if (alterConfigPolicy.isPresent()) {
- alterConfigPolicy.get().validate(new RequestMetadata(configResource, alteredConfigsForAlterConfigPolicyCheck));
- }
+ alterConfigPolicy.ifPresent(policy -> policy.validate(new RequestMetadata(configResource, alteredConfigsForAlterConfigPolicyCheck)));
} catch (ConfigException e) {
return new ApiError(INVALID_CONFIG, e.getMessage());
} catch (Throwable e) {

View File

@ -74,12 +74,12 @@ class EventPerformanceMonitor {
/**
* The period in nanoseconds.
*/
- private long periodNs;
+ private final long periodNs;
/**
* The always-log threshold in nanoseconds.
*/
- private long alwaysLogThresholdNs;
+ private final long alwaysLogThresholdNs;
/**
* The name of the slowest event we've seen so far, or null if none has been seen.

View File

@ -36,7 +36,6 @@ interface KRaftVersionAccessor {
* @param epoch the current epoch
* @param newVersion the new kraft version to upgrade to
* @param validateOnly whether to just validate the change and not persist it
- * @throws ApiException when the upgrade fails to validate
*/
void upgradeKRaftVersion(int epoch, KRaftVersion newVersion, boolean validateOnly);
}

View File

@ -676,11 +676,6 @@ public final class QuorumController implements Controller {
return clusterControl;
}
- // Visible for testing
- FeatureControlManager featureControl() {
- return featureControl;
- }
// Visible for testing
ConfigurationControlManager configurationControl() {
return configurationControl;

View File

@ -1109,7 +1109,6 @@ public class ReplicationControlManager {
topic,
partitionId,
partition,
- context.requestHeader().requestApiVersion(),
partitionData);
if (validationError != Errors.NONE) {
@ -1239,7 +1238,6 @@ public class ReplicationControlManager {
TopicControlInfo topic,
int partitionId,
PartitionRegistration partition,
- short requestApiVersion,
AlterPartitionRequestData.PartitionData partitionData
) {
if (partition == null) {

View File

@ -167,10 +167,8 @@ public final class EventHandlerExceptionInfo {
bld.append("event unable to start processing because of ");
}
bld.append(internalException.getClass().getSimpleName());
- if (externalException.isPresent()) {
- bld.append(" (treated as ").
- append(externalException.get().getClass().getSimpleName()).append(")");
- }
+ externalException.ifPresent(e -> bld.append(" (treated as ")
+ .append(e.getClass().getSimpleName()).append(")"));
if (causesFailover()) {
bld.append(" at epoch ").append(epoch);
}

View File

@ -23,7 +23,6 @@ import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.image.node.ClientQuotaImageNode;
import org.apache.kafka.image.writer.ImageWriter;
- import org.apache.kafka.image.writer.ImageWriterOptions;
import java.util.ArrayList;
import java.util.Collections;
@ -47,8 +46,7 @@ public record ClientQuotaImage(Map<String, Double> quotas) {
public void write(
ClientQuotaEntity entity,
- ImageWriter writer,
- ImageWriterOptions options
+ ImageWriter writer
) {
for (Entry<String, Double> entry : quotas.entrySet()) {
writer.write(0, new ClientQuotaRecord().

View File

@ -26,7 +26,6 @@ import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryDat
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.image.node.ClientQuotasImageNode;
import org.apache.kafka.image.writer.ImageWriter;
- import org.apache.kafka.image.writer.ImageWriterOptions;
import java.util.Collections;
import java.util.HashMap;
@ -67,11 +66,11 @@ public final class ClientQuotasImage {
return entities;
}
- public void write(ImageWriter writer, ImageWriterOptions options) {
+ public void write(ImageWriter writer) {
for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : entities.entrySet()) {
ClientQuotaEntity entity = entry.getKey();
ClientQuotaImage clientQuotaImage = entry.getValue();
- clientQuotaImage.write(entity, writer, options);
+ clientQuotaImage.write(entity, writer);
}
}

View File

@ -165,9 +165,7 @@ public final class ClusterDelta {
int nodeId = entry.getKey();
Optional<BrokerRegistration> brokerRegistration = entry.getValue();
if (!newBrokers.containsKey(nodeId)) {
- if (brokerRegistration.isPresent()) {
- newBrokers.put(nodeId, brokerRegistration.get());
- }
+ brokerRegistration.ifPresent(registration -> newBrokers.put(nodeId, registration));
}
}
Map<Integer, ControllerRegistration> newControllers = new HashMap<>(image.controllers().size());
@ -184,9 +182,7 @@ public final class ClusterDelta {
int nodeId = entry.getKey();
Optional<ControllerRegistration> controllerRegistration = entry.getValue();
if (!newControllers.containsKey(nodeId)) {
- if (controllerRegistration.isPresent()) {
- newControllers.put(nodeId, controllerRegistration.get());
- }
+ controllerRegistration.ifPresent(registration -> newControllers.put(nodeId, registration));
}
}
return new ClusterImage(newBrokers, newControllers);

View File

@ -21,7 +21,6 @@ import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.image.node.ConfigurationImageNode;
import org.apache.kafka.image.writer.ImageWriter;
- import org.apache.kafka.image.writer.ImageWriterOptions;
import java.util.Collections;
import java.util.Map;
@ -71,8 +70,7 @@ public final class ConfigurationImage {
public void write(
ConfigResource configResource,
- ImageWriter writer,
- ImageWriterOptions options
+ ImageWriter writer
) {
for (Map.Entry<String, String> entry : data.entrySet()) {
writer.write(0, new ConfigRecord().

View File

@ -20,7 +20,6 @@ package org.apache.kafka.image;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.image.node.ConfigurationsImageNode;
import org.apache.kafka.image.writer.ImageWriter;
- import org.apache.kafka.image.writer.ImageWriterOptions;
import java.util.Collections;
import java.util.Map;
@ -74,11 +73,11 @@ public final class ConfigurationsImage {
}
}
- public void write(ImageWriter writer, ImageWriterOptions options) {
+ public void write(ImageWriter writer) {
for (Entry<ConfigResource, ConfigurationImage> entry : data.entrySet()) {
ConfigResource configResource = entry.getKey();
ConfigurationImage configImage = entry.getValue();
- configImage.write(configResource, writer, options);
+ configImage.write(configResource, writer);
}
}

View File

@ -153,9 +153,9 @@ public final class MetadataImage {
features.write(writer, options);
cluster.write(writer, options);
topics.write(writer, options);
- configs.write(writer, options);
- clientQuotas.write(writer, options);
- producerIds.write(writer, options);
+ configs.write(writer);
+ clientQuotas.write(writer);
+ producerIds.write(writer);
acls.write(writer);
scram.write(writer, options);
delegationTokens.write(writer, options);

View File

@ -20,7 +20,6 @@ package org.apache.kafka.image;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.image.node.ProducerIdsImageNode;
import org.apache.kafka.image.writer.ImageWriter;
- import org.apache.kafka.image.writer.ImageWriterOptions;
import java.util.Objects;
@ -46,7 +45,7 @@ public final class ProducerIdsImage {
return nextProducerId;
}
- public void write(ImageWriter writer, ImageWriterOptions options) {
+ public void write(ImageWriter writer) {
if (nextProducerId >= 0) {
writer.write(0, new ProducerIdsRecord().
setBrokerId(-1).

View File

@ -109,17 +109,17 @@ public final class ScramImage {
DescribeUserScramCredentialsResult result = new DescribeUserScramCredentialsResult().setUser(user.getKey());
if (!user.getValue()) {
- boolean datafound = false;
+ boolean dataFound = false;
List<CredentialInfo> credentialInfos = new ArrayList<>();
for (Map.Entry<ScramMechanism, Map<String, ScramCredentialData>> mechanismsEntry : mechanisms.entrySet()) {
Map<String, ScramCredentialData> credentialDataSet = mechanismsEntry.getValue();
if (credentialDataSet.containsKey(user.getKey())) {
credentialInfos.add(new CredentialInfo().setMechanism(mechanismsEntry.getKey().type())
.setIterations(credentialDataSet.get(user.getKey()).iterations()));
- datafound = true;
+ dataFound = true;
}
}
- if (datafound) {
+ if (dataFound) {
result.setCredentialInfos(credentialInfos);
} else {
result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code())

View File

@ -20,7 +20,6 @@ package org.apache.kafka.image;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
- import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.metadata.PartitionRegistration;
@ -40,8 +39,8 @@ import java.util.stream.Collectors;
public final class TopicDelta {
private final TopicImage image;
private final Map<Integer, PartitionRegistration> partitionChanges = new HashMap<>();
- private Map<Integer, Integer> partitionToUncleanLeaderElectionCount = new HashMap<>();
- private Map<Integer, Integer> partitionToElrElectionCount = new HashMap<>();
+ private final Map<Integer, Integer> partitionToUncleanLeaderElectionCount = new HashMap<>();
+ private final Map<Integer, Integer> partitionToElrElectionCount = new HashMap<>();
public TopicDelta(TopicImage image) {
this.image = image;
@ -113,11 +112,9 @@ public final class TopicDelta {
}
}
- public void replay(ClearElrRecord record) {
+ public void replay() {
// Some partitions are not added to the image yet, let's check the partitionChanges first.
- partitionChanges.forEach((partitionId, partition) -> {
- maybeClearElr(partitionId, partition);
- });
+ partitionChanges.forEach(this::maybeClearElr);
image.partitions().forEach((partitionId, partition) -> {
if (!partitionChanges.containsKey(partitionId)) {

View File

@ -95,11 +95,11 @@ public final class TopicsDelta {
topicDelta.replay(record);
}
- private void maybeReplayClearElrRecord(Uuid topicId, ClearElrRecord record) {
+ private void maybeReplayClearElrRecord(Uuid topicId) {
// Only apply the record if the topic is not deleted.
if (!deletedTopicIds.contains(topicId)) {
TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
- topicDelta.replay(record);
+ topicDelta.replay();
}
}
@ -123,15 +123,11 @@ public final class TopicsDelta {
record.topicName() + ": no such topic found.");
}
- maybeReplayClearElrRecord(topicId, record);
+ maybeReplayClearElrRecord(topicId);
} else {
// Update all the existing topics
- image.topicsById().forEach((topicId, image) -> {
- maybeReplayClearElrRecord(topicId, record);
- });
- createdTopicIds().forEach((topicId -> {
- maybeReplayClearElrRecord(topicId, record);
- }));
+ image.topicsById().forEach((topicId, image) -> maybeReplayClearElrRecord(topicId));
+ createdTopicIds().forEach((this::maybeReplayClearElrRecord));
}
}

View File

@ -63,14 +63,11 @@ public class ClientQuotasImageNode implements MetadataNode {
String ip = null;
String user = null;
for (Map.Entry<String, String> entry : entity.entries().entrySet()) {
- if (entry.getKey().equals(CLIENT_ID)) {
- clientId = entry.getValue();
- } else if (entry.getKey().equals(IP)) {
- ip = entry.getValue();
- } else if (entry.getKey().equals(USER)) {
- user = entry.getValue();
- } else {
- throw new RuntimeException("Invalid entity type " + entry.getKey());
+ switch (entry.getKey()) {
+ case CLIENT_ID -> clientId = entry.getValue();
+ case IP -> ip = entry.getValue();
+ case USER -> user = entry.getValue();
+ default -> throw new RuntimeException("Invalid entity type " + entry.getKey());
}
}
StringBuilder bld = new StringBuilder();
@ -85,7 +82,6 @@ public class ClientQuotasImageNode implements MetadataNode {
}
if (user != null) {
bld.append(prefix).append("user(").append(escape(user)).append(")");
prefix = "_";
}
return bld.toString();
}

View File

@ -55,28 +55,17 @@ public class KafkaConfigSchema {
* makes sense to put it here.
*/
public static ConfigEntry.ConfigType translateConfigType(ConfigDef.Type type) {
- switch (type) {
- case BOOLEAN:
- return ConfigEntry.ConfigType.BOOLEAN;
- case STRING:
- return ConfigEntry.ConfigType.STRING;
- case INT:
- return ConfigEntry.ConfigType.INT;
- case SHORT:
- return ConfigEntry.ConfigType.SHORT;
- case LONG:
- return ConfigEntry.ConfigType.LONG;
- case DOUBLE:
- return ConfigEntry.ConfigType.DOUBLE;
- case LIST:
- return ConfigEntry.ConfigType.LIST;
- case CLASS:
- return ConfigEntry.ConfigType.CLASS;
- case PASSWORD:
- return ConfigEntry.ConfigType.PASSWORD;
- default:
- return ConfigEntry.ConfigType.UNKNOWN;
- }
+ return switch (type) {
+ case BOOLEAN -> ConfigEntry.ConfigType.BOOLEAN;
+ case STRING -> ConfigEntry.ConfigType.STRING;
+ case INT -> ConfigEntry.ConfigType.INT;
+ case SHORT -> ConfigEntry.ConfigType.SHORT;
+ case LONG -> ConfigEntry.ConfigType.LONG;
+ case DOUBLE -> ConfigEntry.ConfigType.DOUBLE;
+ case LIST -> ConfigEntry.ConfigType.LIST;
+ case CLASS -> ConfigEntry.ConfigType.CLASS;
+ case PASSWORD -> ConfigEntry.ConfigType.PASSWORD;
+ };
}
private static final Map<ConfigEntry.ConfigSource, DescribeConfigsResponse.ConfigSource> TRANSLATE_CONFIG_SOURCE_MAP;

View File

@ -216,8 +216,8 @@ public class PartitionRegistration {
}
private PartitionRegistration(int[] replicas, Uuid[] directories, int[] isr, int[] removingReplicas,
- int[] addingReplicas, int leader, LeaderRecoveryState leaderRecoveryState,
- int leaderEpoch, int partitionEpoch, int[] elr, int[] lastKnownElr) {
+ int[] addingReplicas, int leader, LeaderRecoveryState leaderRecoveryState,
+ int leaderEpoch, int partitionEpoch, int[] elr, int[] lastKnownElr) {
Objects.requireNonNull(directories);
if (directories.length > 0 && directories.length != replicas.length) {
throw new IllegalArgumentException("The lengths for replicas and directories do not match.");
@ -410,8 +410,7 @@ public class PartitionRegistration {
return new ApiMessageAndVersion(record, options.metadataVersion().partitionRecordVersion());
}
- public PartitionState toLeaderAndIsrPartitionState(TopicPartition tp,
- boolean isNew) {
+ public PartitionState toLeaderAndIsrPartitionState(TopicPartition tp, boolean isNew) {
return new PartitionState().
setTopicName(tp.topic()).
setPartitionIndex(tp.partition()).

View File

@ -94,11 +94,6 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer, Monitorabl
initialLoadFuture.complete(null);
}
- // Visible for testing
- public CompletableFuture<Void> initialLoadFuture() {
- return initialLoadFuture;
- }
@Override
public void completeInitialLoad(Exception e) {
if (!initialLoadFuture.isDone()) {

View File

@ -106,8 +106,6 @@ public class StandardAuthorizerData {
*/
private AclCache aclCache;
private static Logger createLogger(int nodeId) {
return new LogContext("[StandardAuthorizer " + nodeId + "] ").logger(StandardAuthorizerData.class);
}
@ -172,7 +170,7 @@ public class StandardAuthorizerData {
}
StandardAuthorizerData copyWithNewAcls(AclCache aclCache) {
- StandardAuthorizerData newData = new StandardAuthorizerData(
+ StandardAuthorizerData newData = new StandardAuthorizerData(
log,
aclMutator,
loadingComplete,

View File

@ -177,7 +177,7 @@ public class BootstrapMetadata {
@Override
public String toString() {
return "BootstrapMetadata(records=" + records.toString() +
return "BootstrapMetadata(records=" + records +
", metadataVersionLevel=" + metadataVersionLevel +
", source=" + source +
")";

View File

@ -37,7 +37,7 @@ public class PartitionAssignment {
public PartitionAssignment(List<Integer> replicas, DefaultDirProvider defaultDirProvider) {
this.replicas = List.copyOf(replicas);
- this.directories = replicas.stream().map(replica -> defaultDirProvider.defaultDir(replica)).toList();
+ this.directories = replicas.stream().map(defaultDirProvider::defaultDir).toList();
}
/**

View File

@ -238,15 +238,11 @@ public final class MetaProperties {
StringBuilder bld = new StringBuilder();
bld.append("MetaProperties");
bld.append("(version=").append(version.number());
- if (clusterId.isPresent()) {
- bld.append(", clusterId=").append(clusterId.get());
- }
+ clusterId.ifPresent(id -> bld.append(", clusterId=").append(id));
if (nodeId.isPresent()) {
bld.append(", nodeId=").append(nodeId.getAsInt());
}
- if (directoryId.isPresent()) {
- bld.append(", directoryId=").append(directoryId.get());
- }
+ directoryId.ifPresent(id -> bld.append(", directoryId=").append(id));
bld.append(")");
return bld.toString();
}
@ -254,9 +250,7 @@ public final class MetaProperties {
public Properties toProperties() {
Properties props = new Properties();
props.setProperty(VERSION_PROP, version.numberString());
- if (clusterId.isPresent()) {
- props.setProperty(CLUSTER_ID_PROP, clusterId.get());
- }
+ clusterId.ifPresent(id -> props.setProperty(CLUSTER_ID_PROP, id));
if (version.hasBrokerId()) {
if (nodeId.isPresent()) {
props.setProperty(BROKER_ID_PROP, "" + nodeId.getAsInt());
@ -264,9 +258,7 @@ public final class MetaProperties {
} else {
props.setProperty(NODE_ID_PROP, "" + nodeId.getAsInt());
}
- if (directoryId.isPresent()) {
- props.setProperty(DIRECTORY_ID_PROP, directoryId.get().toString());
- }
+ directoryId.ifPresent(id -> props.setProperty(DIRECTORY_ID_PROP, id.toString()));
return props;
}
}

View File

@ -47,11 +47,11 @@ public enum MetaPropertiesVersion {
}
public static MetaPropertiesVersion fromNumber(int number) {
- switch (number) {
- case 0: return V0;
- case 1: return V1;
- default: throw new RuntimeException("Unknown meta.properties version number " + number);
- }
+ return switch (number) {
+ case 0 -> V0;
+ case 1 -> V1;
+ default -> throw new RuntimeException("Unknown meta.properties version number " + number);
+ };
}
MetaPropertiesVersion(int number) {

View File

@ -461,17 +461,12 @@ public class Formatter {
DYNAMIC_METADATA_VOTER_DIRECTORY;
String description() {
- switch (this) {
- case LOG_DIRECTORY:
- return "data directory";
- case STATIC_METADATA_DIRECTORY:
- return "metadata directory";
- case DYNAMIC_METADATA_NON_VOTER_DIRECTORY:
- return "dynamic metadata directory";
- case DYNAMIC_METADATA_VOTER_DIRECTORY:
- return "dynamic metadata voter directory";
- }
- throw new RuntimeException("invalid enum type " + this);
+ return switch (this) {
+ case LOG_DIRECTORY -> "data directory";
+ case STATIC_METADATA_DIRECTORY -> "metadata directory";
+ case DYNAMIC_METADATA_NON_VOTER_DIRECTORY -> "dynamic metadata directory";
+ case DYNAMIC_METADATA_VOTER_DIRECTORY -> "dynamic metadata voter directory";
+ };
}
boolean isDynamicMetadataDirectory() {

View File

@ -119,16 +119,12 @@ public class PartitionChangeBuilderTest {
private static final Uuid FOO_ID = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
private static MetadataVersion metadataVersionForPartitionChangeRecordVersion(short version) {
- switch (version) {
- case (short) 0:
- return MetadataVersion.IBP_3_7_IV0;
- case (short) 1:
- return MetadataVersion.IBP_3_7_IV2;
- case (short) 2:
- return MetadataVersion.IBP_4_0_IV1;
- default:
- throw new RuntimeException("Unknown PartitionChangeRecord version " + version);
- }
+ return switch (version) {
+ case (short) 0 -> MetadataVersion.IBP_3_7_IV0;
+ case (short) 1 -> MetadataVersion.IBP_3_7_IV2;
+ case (short) 2 -> MetadataVersion.IBP_4_0_IV1;
+ default -> throw new RuntimeException("Unknown PartitionChangeRecord version " + version);
+ };
}
private static PartitionChangeBuilder createFooBuilder(MetadataVersion metadataVersion) {

View File

@ -935,7 +935,7 @@ public class ReplicationControlManagerTest {
}
@Override
- public void close() throws Exception { /* Nothing to do */ }
+ public void close() { /* Nothing to do */ }
@Override
public void configure(Map<String, ?> configs) { /* Nothing to do */ }
@ -2990,7 +2990,7 @@ public class ReplicationControlManagerTest {
new int[]{2, 3, 4}, new int[]{3, 4, 2}}).topicId();
KRaftClusterDescriber describer = replication.clusterDescriber;
HashSet<UsableBroker> brokers = new HashSet<>();
- describer.usableBrokers().forEachRemaining(broker -> brokers.add(broker));
+ describer.usableBrokers().forEachRemaining(brokers::add);
assertEquals(Set.of(
new UsableBroker(0, Optional.empty(), true),
new UsableBroker(1, Optional.empty(), true),

View File

@ -62,20 +62,13 @@ public class ControllerMetricsTestUtils {
}
public static PartitionRegistration fakePartitionRegistration(
- FakePartitionRegistrationType type
+ FakePartitionRegistrationType type
) {
- int leader = 0;
- switch (type) {
- case NORMAL:
- leader = 0;
- break;
- case NON_PREFERRED_LEADER:
- leader = 1;
- break;
- case OFFLINE:
- leader = -1;
- break;
- }
+ int leader = switch (type) {
+ case NORMAL -> 0;
+ case NON_PREFERRED_LEADER -> 1;
+ case OFFLINE -> -1;
+ };
return new PartitionRegistration.Builder().
setReplicas(new int[] {0, 1, 2}).
setDirectories(DirectoryId.migratingArray(3)).

View File

@ -20,11 +20,9 @@ package org.apache.kafka.image;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
import org.apache.kafka.common.quota.ClientQuotaEntity;
- import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
- import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.QuotaConfig;
import org.junit.jupiter.api.Test;
@ -136,7 +134,7 @@ public class ClientQuotasImageTest {
private static List<ApiMessageAndVersion> getImageRecords(ClientQuotasImage image) {
RecordListWriter writer = new RecordListWriter();
- image.write(writer, new ImageWriterOptions.Builder(MetadataVersion.latestProduction()).build());
+ image.write(writer);
return writer.records();
}
}

View File

@ -19,11 +19,9 @@ package org.apache.kafka.image;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ConfigRecord;
- import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
- import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@ -136,7 +134,7 @@ public class ConfigurationsImageTest {
private static List<ApiMessageAndVersion> getImageRecords(ConfigurationsImage image) {
RecordListWriter writer = new RecordListWriter();
- image.write(writer, new ImageWriterOptions.Builder(MetadataVersion.latestProduction()).build());
+ image.write(writer);
return writer.records();
}
}

View File

@ -18,11 +18,9 @@
package org.apache.kafka.image;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
- import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
- import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@ -113,7 +111,7 @@ public class ProducerIdsImageTest {
private static List<ApiMessageAndVersion> getImageRecords(ProducerIdsImage image) {
RecordListWriter writer = new RecordListWriter();
- image.write(writer, new ImageWriterOptions.Builder(MetadataVersion.latestProduction()).build());
+ image.write(writer);
return writer.records();
}
}

View File

@ -41,7 +41,9 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
@ -75,10 +77,10 @@ public class ImageWriterOptionsTest {
MetadataVersion version = MetadataVersion.MINIMUM_VERSION;
ImageWriterOptions options = new ImageWriterOptions.Builder(version).
setEligibleLeaderReplicasEnabled(true).build();
- assertEquals(true, options.isEligibleLeaderReplicasEnabled());
+ assertTrue(options.isEligibleLeaderReplicasEnabled());
options = new ImageWriterOptions.Builder(version).build();
- assertEquals(false, options.isEligibleLeaderReplicasEnabled());
+ assertFalse(options.isEligibleLeaderReplicasEnabled());
}
@ParameterizedTest
@ -111,9 +113,9 @@ public class ImageWriterOptionsTest {
ImageWriterOptions options = new ImageWriterOptions.Builder(metadataImage).build();
assertEquals(MetadataVersion.IBP_4_0_IV1, options.metadataVersion());
if (isElrEnabled) {
- assertEquals(true, options.isEligibleLeaderReplicasEnabled());
+ assertTrue(options.isEligibleLeaderReplicasEnabled());
} else {
- assertEquals(false, options.isEligibleLeaderReplicasEnabled());
+ assertFalse(options.isEligibleLeaderReplicasEnabled());
}
}
}

View File

@ -131,7 +131,6 @@ public class BootstrapMetadataTest {
BootstrapMetadata bootstrapMetadata = BootstrapMetadata.fromRecords(RECORDS_WITH_OLD_METADATA_VERSION, "quux");
assertEquals("No MetadataVersion with feature level 1. Valid feature levels are from " + MetadataVersion.MINIMUM_VERSION.featureLevel()
+ " to " + MetadataVersion.latestTesting().featureLevel() + ".",
- assertThrows(RuntimeException.class,
- () -> bootstrapMetadata.metadataVersion()).getMessage());
+ assertThrows(RuntimeException.class, bootstrapMetadata::metadataVersion).getMessage());
}
}

View File

@ -88,15 +88,11 @@ public final class MetaPropertiesTest {
assertEquals(directoryId, metaProperties.directoryId());
Properties props = new Properties();
props.setProperty("version", "0");
- if (clusterId.isPresent()) {
- props.setProperty("cluster.id", clusterId.get());
- }
+ clusterId.ifPresent(id -> props.setProperty("cluster.id", id));
if (nodeId.isPresent()) {
props.setProperty("broker.id", "" + nodeId.getAsInt());
}
- if (directoryId.isPresent()) {
- props.setProperty("directory.id", directoryId.get().toString());
- }
+ directoryId.ifPresent(id -> props.setProperty("directory.id", id.toString()));
Properties props2 = metaProperties.toProperties();
assertEquals(props, props2);
MetaProperties metaProperties2 = new MetaProperties.Builder(props2).build();
@ -151,9 +147,7 @@ public final class MetaPropertiesTest {
props.setProperty("version", "1");
props.setProperty("cluster.id", clusterId);
props.setProperty("node.id", "" + nodeId);
- if (directoryId.isPresent()) {
- props.setProperty("directory.id", directoryId.get().toString());
- }
+ directoryId.ifPresent(id -> props.setProperty("directory.id", id.toString()));
Properties props2 = metaProperties.toProperties();
assertEquals(props, props2);
MetaProperties metaProperties2 = new MetaProperties.Builder(props2).build();

View File

@ -186,7 +186,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
/**
* The initial max read offset which LocalLog instances will be configured with.
*/
- private long initialMaxReadOffset = Long.MAX_VALUE;
+ private final long initialMaxReadOffset = Long.MAX_VALUE;
public SharedLogData(Optional<RawSnapshotReader> snapshot) {
if (snapshot.isPresent()) {
@ -544,7 +544,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
numEntriesFound++;
}
}
log.trace("Completed log check for node " + nodeId);
log.trace("Completed log check for node {}", nodeId);
} catch (Exception e) {
log.error("Exception while handling log check", e);
}