From 6abe1ff51a10ebd08ca03d16713903fa900c7a2c Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Sat, 12 Apr 2025 09:07:07 +0900 Subject: [PATCH] MINOR: Fix kraft ver broken test (#19448) Fix failed `kafka.server.KRaftClusterTest."testDescribeKRaftVersion(boolean)` test. It is failing in [3.9 branch](https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.9/186/#showFailuresLink). In the [patch for 4.0](https://github.com/apache/kafka/pull/19127/files#diff-a95d286b4e1eb166af89ea45bf1fe14cd8c944d7fc0483bfd4eff2245d1d2bbbR1014), we created TestKitNodes like this: ``` new TestKitNodes.Builder(). setNumBrokerNodes(1). setNumControllerNodes(1). setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build() ``` But in the 3.9, because we don't have `setFeature` method in TestKitNodes, we removed it which causes the test failure. Added them back to 3.9 branch. Also in 4.0, we include this [patch](https://github.com/apache/kafka/pull/17582/files#r1823404214), which contains a bug fix and TestKitNodes improvement to allow set features. Added them in 3.9 branch to fix the test and the finalized version always 0 issue. Reviewers: PoAn Yang , TengYao Chi --- .../server/metadata/KRaftMetadataCache.scala | 5 +++- .../test/java/kafka/testkit/TestKitNodes.java | 5 ++++ .../kafka/server/KRaftClusterTest.scala | 3 ++- .../metadata/bootstrap/BootstrapMetadata.java | 26 +++++++++++++++++++ 4 files changed, 37 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala index 0a8d06a3191..5d74002d4c3 100644 --- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala @@ -550,7 +550,10 @@ class KRaftMetadataCache( override def features(): FinalizedFeatures = { val image = _currentImage val finalizedFeatures = new java.util.HashMap[String, java.lang.Short](image.features().finalizedVersions()) - finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionSupplier.get().featureLevel()) + val kraftVersionLevel = kraftVersionSupplier.get().featureLevel() + if (kraftVersionLevel > 0) { + finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel) + } new FinalizedFeatures(image.features().metadataVersion(), finalizedFeatures, diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java index 1e12f25c08c..23c7fb7b022 100644 --- a/core/src/test/java/kafka/testkit/TestKitNodes.java +++ b/core/src/test/java/kafka/testkit/TestKitNodes.java @@ -68,6 +68,11 @@ public class TestKitNodes { return this; } + public Builder setFeature(String featureName, short level) { + this.bootstrapMetadata = bootstrapMetadata.copyWithFeatureRecord(featureName, level); + return this; + } + public Builder setNumControllerNodes(int numControllerNodes) { this.numControllerNodes = numControllerNodes; return this; diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 63d1b186e0b..695d5455bd0 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -1053,7 +1053,8 @@ class KRaftClusterTest { val cluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). setNumBrokerNodes(1). - setNumControllerNodes(1).build()).build() + setNumControllerNodes(1). + setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build() try { cluster.format() cluster.startup() diff --git a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java index 1de6a1f0ee5..4e0606814e3 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java @@ -149,6 +149,32 @@ public class BootstrapMetadata { metadataVersion, source); } + public BootstrapMetadata copyWithFeatureRecord(String featureName, short level) { + List newRecords = new ArrayList<>(); + int i = 0; + while (i < records.size()) { + if (records.get(i).message() instanceof FeatureLevelRecord) { + FeatureLevelRecord record = (FeatureLevelRecord) records.get(i).message(); + if (record.name().equals(featureName)) { + FeatureLevelRecord newRecord = record.duplicate(); + newRecord.setFeatureLevel(level); + newRecords.add(new ApiMessageAndVersion(newRecord, (short) 0)); + break; + } else { + newRecords.add(records.get(i)); + } + } + i++; + } + if (i == records.size()) { + FeatureLevelRecord newRecord = new FeatureLevelRecord(). + setName(featureName). + setFeatureLevel(level); + newRecords.add(new ApiMessageAndVersion(newRecord, (short) 0)); + } + return BootstrapMetadata.fromRecords(newRecords, source); + } + @Override public int hashCode() { return Objects.hash(records, metadataVersion, source);