KAFKA-18559 Cleanup FinalizedFeatures (#18593)

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2025-01-24 19:39:01 +08:00 committed by GitHub
parent 356f0d815c
commit fa2df3bca7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 17 additions and 44 deletions

View File

@ -540,8 +540,7 @@ class KRaftMetadataCache(
} }
new FinalizedFeatures(image.features().metadataVersion(), new FinalizedFeatures(image.features().metadataVersion(),
finalizedFeatures, finalizedFeatures,
image.highestOffsetAndEpoch().offset, image.highestOffsetAndEpoch().offset)
true)
} }
} }

View File

@ -85,9 +85,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
new FinalizedFeatures( new FinalizedFeatures(
MetadataVersion.latestTesting(), MetadataVersion.latestTesting(),
Collections.singletonMap(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()), Collections.singletonMap(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()),
0, 0)
true
)
} }
when(metadataCache.metadataVersion()) when(metadataCache.metadataVersion())

View File

@ -70,9 +70,7 @@ class TransactionStateManagerTest {
new FinalizedFeatures( new FinalizedFeatures(
MetadataVersion.latestTesting(), MetadataVersion.latestTesting(),
Collections.singletonMap(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()), Collections.singletonMap(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()),
0, 0)
true
)
} }
val metrics = new Metrics() val metrics = new Metrics()
@ -1332,9 +1330,7 @@ class TransactionStateManagerTest {
new FinalizedFeatures( new FinalizedFeatures(
MetadataVersion.latestTesting(), MetadataVersion.latestTesting(),
Collections.singletonMap(TransactionVersion.FEATURE_NAME, transactionVersion.featureLevel()), Collections.singletonMap(TransactionVersion.FEATURE_NAME, transactionVersion.featureLevel()),
0, 0)
true
)
} }
val transactionManager = new TransactionStateManager(0, scheduler, val transactionManager = new TransactionStateManager(0, scheduler,
replicaManager, metadataCache, txnConfig, time, metrics) replicaManager, metadataCache, txnConfig, time, metrics)

View File

@ -37,7 +37,7 @@ class ProcessorTest {
val requestHeader = RequestTestUtils.serializeRequestHeader( val requestHeader = RequestTestUtils.serializeRequestHeader(
new RequestHeader(ApiKeys.INIT_PRODUCER_ID, 0, "clientid", 0)) new RequestHeader(ApiKeys.INIT_PRODUCER_ID, 0, "clientid", 0))
val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true, val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true)) () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0))
val e = assertThrows(classOf[InvalidRequestException], val e = assertThrows(classOf[InvalidRequestException],
(() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable,
"INIT_PRODUCER_ID with listener type CONTROLLER should throw InvalidRequestException exception") "INIT_PRODUCER_ID with listener type CONTROLLER should throw InvalidRequestException exception")
@ -55,7 +55,7 @@ class ProcessorTest {
.setCorrelationId(0); .setCorrelationId(0);
val requestHeader = RequestTestUtils.serializeRequestHeader(new RequestHeader(requestHeaderData, headerVersion)) val requestHeader = RequestTestUtils.serializeRequestHeader(new RequestHeader(requestHeaderData, headerVersion))
val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true)) () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0))
val e = assertThrows(classOf[InvalidRequestException], val e = assertThrows(classOf[InvalidRequestException],
(() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable,
"LEADER_AND_ISR should throw InvalidRequestException exception") "LEADER_AND_ISR should throw InvalidRequestException exception")
@ -67,7 +67,7 @@ class ProcessorTest {
val requestHeader = RequestTestUtils.serializeRequestHeader( val requestHeader = RequestTestUtils.serializeRequestHeader(
new RequestHeader(ApiKeys.PRODUCE, 0, "clientid", 0)) new RequestHeader(ApiKeys.PRODUCE, 0, "clientid", 0))
val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true)) () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0))
val e = assertThrows(classOf[UnsupportedVersionException], val e = assertThrows(classOf[UnsupportedVersionException],
(() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable,
"PRODUCE v0 should throw UnsupportedVersionException exception") "PRODUCE v0 should throw UnsupportedVersionException exception")

View File

@ -84,7 +84,7 @@ class SocketServerTest {
TestUtils.clearYammerMetrics() TestUtils.clearYammerMetrics()
private val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, private val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true)) () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0))
var server: SocketServer = _ var server: SocketServer = _
val sockets = new ArrayBuffer[Socket] val sockets = new ArrayBuffer[Socket]

View File

@ -179,7 +179,7 @@ class KafkaApisTest extends Logging {
enabledApis, enabledApis,
BrokerFeatures.defaultSupportedFeatures(true), BrokerFeatures.defaultSupportedFeatures(true),
true, true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true)) () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0))
when(groupCoordinator.isNewGroupCoordinator).thenReturn(config.isNewGroupCoordinatorEnabled) when(groupCoordinator.isNewGroupCoordinator).thenReturn(config.isNewGroupCoordinatorEnabled)
setupFeatures(featureVersions) setupFeatures(featureVersions)
@ -220,9 +220,7 @@ class KafkaApisTest extends Logging {
featureVersions.map { featureVersion => featureVersions.map { featureVersion =>
featureVersion.featureName -> featureVersion.featureLevel.asInstanceOf[java.lang.Short] featureVersion.featureName -> featureVersion.featureLevel.asInstanceOf[java.lang.Short]
}.toMap.asJava, }.toMap.asJava,
0, 0)
true
)
} }
case _ => throw new IllegalStateException("Test must set an instance of KRaftMetadataCache") case _ => throw new IllegalStateException("Test must set an instance of KRaftMetadataCache")

View File

@ -57,8 +57,8 @@ public class FeaturesPublisher implements MetadataPublisher {
if (delta.featuresDelta() != null) { if (delta.featuresDelta() != null) {
FinalizedFeatures newFinalizedFeatures = new FinalizedFeatures(newImage.features().metadataVersion(), FinalizedFeatures newFinalizedFeatures = new FinalizedFeatures(newImage.features().metadataVersion(),
newImage.features().finalizedVersions(), newImage.features().finalizedVersions(),
newImage.provenance().lastContainedOffset(), newImage.provenance().lastContainedOffset()
true); );
if (!newFinalizedFeatures.equals(finalizedFeatures)) { if (!newFinalizedFeatures.equals(finalizedFeatures)) {
log.info("Loaded new metadata {}.", newFinalizedFeatures); log.info("Loaded new metadata {}.", newFinalizedFeatures);
finalizedFeatures = newFinalizedFeatures; finalizedFeatures = newFinalizedFeatures;

View File

@ -27,25 +27,18 @@ public final class FinalizedFeatures {
private final long finalizedFeaturesEpoch; private final long finalizedFeaturesEpoch;
public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) { public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
return new FinalizedFeatures(version, Collections.emptyMap(), -1, true); return new FinalizedFeatures(version, Collections.emptyMap(), -1);
} }
public FinalizedFeatures( public FinalizedFeatures(
MetadataVersion metadataVersion, MetadataVersion metadataVersion,
Map<String, Short> finalizedFeatures, Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch, long finalizedFeaturesEpoch
boolean kraftMode
) { ) {
this.metadataVersion = metadataVersion; this.metadataVersion = Objects.requireNonNull(metadataVersion);
this.finalizedFeatures = new HashMap<>(finalizedFeatures); this.finalizedFeatures = new HashMap<>(finalizedFeatures);
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
// In KRaft mode, we always include the metadata version in the features map. this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel());
// In ZK mode, we never include it.
if (kraftMode) {
this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel());
} else {
this.finalizedFeatures.remove(MetadataVersion.FEATURE_NAME);
}
} }
public MetadataVersion metadataVersion() { public MetadataVersion metadataVersion() {

View File

@ -24,27 +24,16 @@ import java.util.Collections;
import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION; import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
class FinalizedFeaturesTest { class FinalizedFeaturesTest {
@Test @Test
public void testKRaftModeFeatures() { public void testKRaftModeFeatures() {
FinalizedFeatures finalizedFeatures = new FinalizedFeatures(MINIMUM_KRAFT_VERSION, FinalizedFeatures finalizedFeatures = new FinalizedFeatures(MINIMUM_KRAFT_VERSION,
Collections.singletonMap("foo", (short) 2), 123, true); Collections.singletonMap("foo", (short) 2), 123);
assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(), assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(),
finalizedFeatures.finalizedFeatures().get(FEATURE_NAME)); finalizedFeatures.finalizedFeatures().get(FEATURE_NAME));
assertEquals((short) 2, assertEquals((short) 2,
finalizedFeatures.finalizedFeatures().get("foo")); finalizedFeatures.finalizedFeatures().get("foo"));
assertEquals(2, finalizedFeatures.finalizedFeatures().size()); assertEquals(2, finalizedFeatures.finalizedFeatures().size());
} }
@Test
public void testZkModeFeatures() {
FinalizedFeatures finalizedFeatures = new FinalizedFeatures(MINIMUM_KRAFT_VERSION,
Collections.singletonMap("foo", (short) 2), 123, false);
assertNull(finalizedFeatures.finalizedFeatures().get(FEATURE_NAME));
assertEquals((short) 2,
finalizedFeatures.finalizedFeatures().get("foo"));
assertEquals(1, finalizedFeatures.finalizedFeatures().size());
}
} }