KAFKA-19192; Old bootstrap checkpoint files cause problems for updated servers (#19545)

Old bootstrap.metadata files cause problems with servers that include
KAFKA-18601. When the server tries to read the bootstrap.checkpoint
file, it will fail if the metadata.version is older than 3.3-IV3
(feature level 7). This causes problems when these clusters are
upgraded.

This PR makes it possible to represent older MVs in BootstrapMetadata
objects without causing an exception. An exception is thrown only if we
attempt to access the BootstrapMetadata. This ensures that only the code
path in which we start with an empty metadata log checks that the
metadata version is 7 or newer.

Reviewers: José Armando García Sancio <jsancio@apache.org>, Ismael Juma
 <ismael@juma.me.uk>, PoAn Yang <payang@apache.org>, Liu Zeyu
 <zeyu.luke@gmail.com>, Alyssa Huang <ahuang@confluent.io>
This commit is contained in:
Colin Patrick McCabe 2025-04-24 12:43:35 -07:00 committed by GitHub
parent a948537704
commit 22b89b6413
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 87 additions and 28 deletions

View File

@ -64,6 +64,7 @@ import java.util.{Collections, Optional, OptionalLong, Properties}
import scala.collection.{Seq, mutable}
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
import scala.jdk.CollectionConverters._
import scala.util.Using
@Timeout(120)
@Tag("integration")
@ -1619,6 +1620,51 @@ class KRaftClusterTest {
}
}
/**
 * Test that once a cluster is formatted, a bootstrap.metadata file that contains an unsupported
 * MetadataVersion is not a problem. This is a regression test for KAFKA-19192.
 *
 * Strategy: format and start a cluster once (keeping its directory on close), then overwrite
 * the bootstrap metadata with an obsolete feature level and restart against the same directory.
 * Since the metadata log is already populated, the stale bootstrap file must be ignored.
 */
@Test
def testOldBootstrapMetadataFile(): Unit = {
// Shared on-disk location so the second cluster reuses the first cluster's metadata log.
val baseDirectory = TestUtils.tempDir().toPath()
Using.resource(new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(1).
setBaseDirectory(baseDirectory).
build()).
// Keep baseDirectory after close; the default would delete it (see KafkaClusterTestKit.close).
setDeleteOnClose(false).
build()
) { cluster =>
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
}
// Bootstrap metadata pinned to feature level 1, which is older than the minimum
// supported metadata.version (3.3-IV3 / feature level 7).
val oldBootstrapMetadata = BootstrapMetadata.fromRecords(
util.Arrays.asList(
new ApiMessageAndVersion(
new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(1),
0.toShort)
),
"oldBootstrapMetadata")
// Re-create the cluster using the same directory structure as above.
// Since we do not need to use the bootstrap metadata, the fact that
// it specifies an obsolete metadata.version should not be a problem.
Using.resource(new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(1).
setBaseDirectory(baseDirectory).
setBootstrapMetadata(oldBootstrapMetadata).
build()).build()
) { cluster =>
// No format() here: startup must succeed purely from the existing metadata log.
cluster.startup()
cluster.waitForReadyBrokers()
}
}
@Test
def testIncreaseNumIoThreads(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(

View File

@ -36,7 +36,7 @@ import java.util.Optional;
*/
public class BootstrapMetadata {
private final List<ApiMessageAndVersion> records;
private final MetadataVersion metadataVersion;
private final short metadataVersionLevel;
private final String source;
public static BootstrapMetadata fromVersions(
@ -66,7 +66,7 @@ public class BootstrapMetadata {
setFeatureLevel(level), (short) 0));
}
}
return new BootstrapMetadata(records, metadataVersion, source);
return new BootstrapMetadata(records, metadataVersion.featureLevel(), source);
}
public static BootstrapMetadata fromVersion(MetadataVersion metadataVersion, String source) {
@ -74,28 +74,28 @@ public class BootstrapMetadata {
new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(metadataVersion.featureLevel()), (short) 0));
return new BootstrapMetadata(records, metadataVersion, source);
return new BootstrapMetadata(records, metadataVersion.featureLevel(), source);
}
public static BootstrapMetadata fromRecords(List<ApiMessageAndVersion> records, String source) {
MetadataVersion metadataVersion = null;
Optional<Short> metadataVersionLevel = Optional.empty();
for (ApiMessageAndVersion record : records) {
Optional<MetadataVersion> version = recordToMetadataVersion(record.message());
if (version.isPresent()) {
metadataVersion = version.get();
Optional<Short> level = recordToMetadataVersionLevel(record.message());
if (level.isPresent()) {
metadataVersionLevel = level;
}
}
if (metadataVersion == null) {
if (metadataVersionLevel.isEmpty()) {
throw new RuntimeException("No FeatureLevelRecord for " + MetadataVersion.FEATURE_NAME +
" was found in the bootstrap metadata from " + source);
}
return new BootstrapMetadata(records, metadataVersion, source);
return new BootstrapMetadata(records, metadataVersionLevel.get(), source);
}
public static Optional<MetadataVersion> recordToMetadataVersion(ApiMessage record) {
public static Optional<Short> recordToMetadataVersionLevel(ApiMessage record) {
if (record instanceof FeatureLevelRecord featureLevel) {
if (featureLevel.name().equals(MetadataVersion.FEATURE_NAME)) {
return Optional.of(MetadataVersion.fromFeatureLevel(featureLevel.featureLevel()));
return Optional.of(featureLevel.featureLevel());
}
}
return Optional.empty();
@ -103,11 +103,11 @@ public class BootstrapMetadata {
BootstrapMetadata(
List<ApiMessageAndVersion> records,
MetadataVersion metadataVersion,
short metadataVersionLevel,
String source
) {
this.records = Objects.requireNonNull(records);
this.metadataVersion = metadataVersion;
this.metadataVersionLevel = metadataVersionLevel;
Objects.requireNonNull(source);
this.source = source;
}
@ -117,7 +117,7 @@ public class BootstrapMetadata {
}
public MetadataVersion metadataVersion() {
return metadataVersion;
return MetadataVersion.fromFeatureLevel(metadataVersionLevel);
}
public String source() {
@ -163,7 +163,7 @@ public class BootstrapMetadata {
@Override
public int hashCode() {
return Objects.hash(records, metadataVersion, source);
return Objects.hash(records, metadataVersionLevel, source);
}
@Override
@ -171,14 +171,14 @@ public class BootstrapMetadata {
if (o == null || !o.getClass().equals(this.getClass())) return false;
BootstrapMetadata other = (BootstrapMetadata) o;
return Objects.equals(records, other.records) &&
metadataVersion.equals(other.metadataVersion) &&
metadataVersionLevel == other.metadataVersionLevel &&
source.equals(other.source);
}
@Override
public String toString() {
return "BootstrapMetadata(records=" + records.toString() +
", metadataVersion=" + metadataVersion +
", metadataVersionLevel=" + metadataVersionLevel +
", source=" + source +
")";
}

View File

@ -51,13 +51,13 @@ public class BootstrapMetadataTest {
new ApiMessageAndVersion(new FeatureLevelRecord().
setName(FEATURE_NAME).
setFeatureLevel((short) 7), (short) 0)),
IBP_3_3_IV3, "foo"),
IBP_3_3_IV3.featureLevel(), "foo"),
BootstrapMetadata.fromVersion(IBP_3_3_IV3, "foo"));
}
@Test
public void testFromRecordsList() {
assertEquals(new BootstrapMetadata(SAMPLE_RECORDS1, IBP_3_3_IV3, "bar"),
assertEquals(new BootstrapMetadata(SAMPLE_RECORDS1, IBP_3_3_IV3.featureLevel(), "bar"),
BootstrapMetadata.fromRecords(SAMPLE_RECORDS1, "bar"));
}
@ -128,10 +128,10 @@ public class BootstrapMetadataTest {
@Test
public void testFromRecordsListWithOldMetadataVersion() {
RuntimeException exception = assertThrows(RuntimeException.class,
() -> BootstrapMetadata.fromRecords(RECORDS_WITH_OLD_METADATA_VERSION, "quux"));
BootstrapMetadata bootstrapMetadata = BootstrapMetadata.fromRecords(RECORDS_WITH_OLD_METADATA_VERSION, "quux");
assertEquals("No MetadataVersion with feature level 1. Valid feature levels are from " + MetadataVersion.MINIMUM_VERSION.featureLevel()
+ " to " + MetadataVersion.latestTesting().featureLevel() + ".",
exception.getMessage());
+ " to " + MetadataVersion.latestTesting().featureLevel() + ".",
assertThrows(RuntimeException.class,
() -> bootstrapMetadata.metadataVersion()).getMessage());
}
}

View File

@ -115,6 +115,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
private final String controllerListenerName;
private final String brokerSecurityProtocol;
private final String controllerSecurityProtocol;
private boolean deleteOnClose;
public Builder(TestKitNodes nodes) {
this.nodes = nodes;
@ -122,6 +123,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
this.controllerListenerName = nodes.controllerListenerName().value();
this.brokerSecurityProtocol = nodes.brokerListenerProtocol().name;
this.controllerSecurityProtocol = nodes.controllerListenerProtocol().name;
this.deleteOnClose = true;
}
public Builder setConfigProp(String key, Object value) {
@ -229,6 +231,11 @@ public class KafkaClusterTestKit implements AutoCloseable {
return Optional.empty();
}
/**
 * Configure whether {@link KafkaClusterTestKit#close()} deletes the cluster's base
 * directory (and JAAS file, if any). Defaults to true; set to false so a later test
 * cluster can be restarted against the same on-disk state.
 *
 * @param deleteOnClose true to delete the base directory on close.
 * @return this builder, for chaining.
 */
public Builder setDeleteOnClose(boolean deleteOnClose) {
this.deleteOnClose = deleteOnClose;
return this;
}
public KafkaClusterTestKit build() throws Exception {
Map<Integer, ControllerServer> controllers = new HashMap<>();
Map<Integer, BrokerServer> brokers = new HashMap<>();
@ -316,7 +323,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
baseDirectory,
faultHandlerFactory,
socketFactoryManager,
jaasFile);
jaasFile,
deleteOnClose);
}
private String listeners(int node) {
@ -361,6 +369,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
private final PreboundSocketFactoryManager socketFactoryManager;
private final String controllerListenerName;
private final Optional<File> jaasFile;
private final boolean deleteOnClose;
private KafkaClusterTestKit(
TestKitNodes nodes,
@ -369,7 +378,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
File baseDirectory,
SimpleFaultHandlerFactory faultHandlerFactory,
PreboundSocketFactoryManager socketFactoryManager,
Optional<File> jaasFile
Optional<File> jaasFile,
boolean deleteOnClose
) {
/*
Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
@ -386,6 +396,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
this.socketFactoryManager = socketFactoryManager;
this.controllerListenerName = nodes.controllerListenerName().value();
this.jaasFile = jaasFile;
this.deleteOnClose = deleteOnClose;
}
public void format() throws Exception {
@ -645,9 +656,11 @@ public class KafkaClusterTestKit implements AutoCloseable {
}
waitForAllFutures(futureEntries);
futureEntries.clear();
Utils.delete(baseDirectory);
if (jaasFile.isPresent()) {
Utils.delete(jaasFile.get());
if (deleteOnClose) {
Utils.delete(baseDirectory);
if (jaasFile.isPresent()) {
Utils.delete(jaasFile.get());
}
}
} catch (Exception e) {
for (Entry<String, Future<?>> entry : futureEntries) {