KAFKA-18648: Add back support for metadata version 0-3 (#18716)

During testing, we identified that kafka-python (and aiokafka) relies on metadata request v0 and
hence we need to add these back to comply with the premise of KIP-896 - i.e. it should not
break the clients listed within it.

I reverted the changes from #18218 related to the removal of metadata versions 0-3.

I will submit a separate PR to undeprecate these API versions on the relevant 3.x branches.

kafka-python (and aiokafka) work correctly (produce & consume) with this change on
top of the 4.0 branch.

Reviewers: David Arthur <mumrah@gmail.com>
This commit is contained in:
Ismael Juma 2025-01-28 18:35:33 -08:00 committed by GitHub
parent f18457f2b8
commit e6d72c9e60
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 75 additions and 29 deletions

View File

@ -18,14 +18,13 @@
"type": "request",
"listeners": ["broker"],
"name": "MetadataRequest",
"validVersions": "4-13",
"validVersions": "0-13",
"flexibleVersions": "9+",
"fields": [
// Versions 0-3 were removed in Apache Kafka 4.0, Version 4 is the new baseline.
//
// In version 0, an empty array indicates "request metadata for all topics." In version 1 and
// higher, an empty array indicates "request metadata for no topics," and a null array is used to
// indicate "request metadata for all topics."
//
// Version 2 and 3 are the same as version 1.
//
// Version 4 adds AllowAutoTopicCreation.

View File

@ -17,11 +17,10 @@
"apiKey": 3,
"type": "response",
"name": "MetadataResponse",
// Versions 0-3 were removed in Apache Kafka 4.0, Version 4 is the new baseline.
// Version 1 adds fields for the rack of each broker, the controller id, and whether or not the topic is internal.
//
// Version 1 adds fields for the rack of each broker, the controller id, and
// whether or not the topic is internal.
// Version 2 adds the cluster ID field.
//
// Version 3 adds the throttle time.
//
// Version 4 is the same as version 3.
@ -43,7 +42,7 @@
// by the DescribeCluster API (KIP-700).
// Version 12 supports topicId.
// Version 13 supports top-level error code in the response.
"validVersions": "4-13",
"validVersions": "0-13",
"flexibleVersions": "9+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,

View File

@ -1127,6 +1127,8 @@ public final class MessageTest {
for (short version : ApiKeys.CREATE_TOPICS.allVersions()) {
verifyWriteRaisesNpe(version, createTopics);
}
MetadataRequestData metadata = new MetadataRequestData().setTopics(null);
verifyWriteRaisesNpe((short) 0, metadata);
}
@Test

View File

@ -30,13 +30,23 @@ import java.util.List;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class MetadataRequestTest {
@Test
public void testEmptyMeansEmptyForAllVersions() {
for (int i = ApiKeys.METADATA.oldestVersion(); i < MetadataRequestData.SCHEMAS.length; i++) {
public void testEmptyMeansAllTopicsV0() {
MetadataRequestData data = new MetadataRequestData();
MetadataRequest parsedRequest = new MetadataRequest(data, (short) 0);
assertTrue(parsedRequest.isAllTopics());
assertNull(parsedRequest.topics());
}
@Test
public void testEmptyMeansEmptyForVersionsAboveV0() {
for (int i = 1; i < MetadataRequestData.SCHEMAS.length; i++) {
MetadataRequestData data = new MetadataRequestData();
data.setAllowAutoTopicCreation(true);
MetadataRequest parsedRequest = new MetadataRequest(data, (short) i);

View File

@ -2297,7 +2297,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
def testMetadataClusterAuthorizedOperationsWithoutDescribeCluster(quorum: String): Unit = {
removeAllClientAcls()
for (version <- ApiKeys.METADATA.oldestVersion to ApiKeys.METADATA.latestVersion) {
// MetadataRequest versions older than 1 are not supported.
for (version <- 1 to ApiKeys.METADATA.latestVersion) {
testMetadataClusterClusterAuthorizedOperations(version.toShort, 0)
}
}
@ -2317,7 +2318,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
val expectedClusterAuthorizedOperations = Utils.to32BitField(
acls.map(_.operation.code.asInstanceOf[JByte]).asJava)
for (version <- ApiKeys.METADATA.oldestVersion to ApiKeys.METADATA.latestVersion) {
// MetadataRequest versions older than 1 are not supported.
for (version <- 1 to ApiKeys.METADATA.latestVersion) {
testMetadataClusterClusterAuthorizedOperations(version.toShort, expectedClusterAuthorizedOperations)
}
}

View File

@ -20,6 +20,7 @@ package kafka.server
import java.util.Optional
import kafka.utils.TestUtils
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
@ -40,6 +41,14 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
doSetup(testInfo, createOffsetsTopic = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testClusterIdWithRequestVersion1(quorum: String): Unit = {
val v1MetadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
val v1ClusterId = v1MetadataResponse.clusterId
assertNull(v1ClusterId, s"v1 clusterId should be null")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testClusterIdIsValid(quorum: String): Unit = {
@ -96,17 +105,27 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
def testAutoTopicCreation(quorum: String): Unit = {
val topic1 = "t1"
val topic2 = "t2"
val topic3 = "t4"
val topic4 = "t5"
val topic3 = "t3"
val topic4 = "t4"
val topic5 = "t5"
createTopic(topic1)
val response1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true).build())
assertNull(response1.errors.get(topic1))
checkAutoCreatedTopic(topic2, response1)
val response2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic3, topic4).asJava, false, 4.toShort).build)
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response2.errors.get(topic3))
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response2.errors.get(topic4))
// The default behavior in old versions of the metadata API is to allow topic creation, so
// protocol downgrades should happen gracefully when auto-creation is explicitly requested.
val response2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic3).asJava, true).build(1))
checkAutoCreatedTopic(topic3, response2)
// V3 doesn't support a configurable allowAutoTopicCreation, so disabling auto-creation is not supported
assertThrows(classOf[UnsupportedVersionException], () => sendMetadataRequest(new MetadataRequest(requestData(List(topic4), allowAutoTopicCreation = false), 3.toShort)))
// V4 and higher support a configurable allowAutoTopicCreation
val response3 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic4, topic5).asJava, false, 4.toShort).build)
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic4))
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic5))
}
@ParameterizedTest
@ -132,10 +151,15 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
createTopic("t1", 3, 2)
createTopic("t2", 3, 2)
// v4, Null represents all topics
val metadataResponseV1 = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(4.toShort))
assertTrue(metadataResponseV1.errors.isEmpty, "V4 Response should have no errors")
assertEquals(2, metadataResponseV1.topicMetadata.size(), "V4 Response should have 2 (all) topics")
// v0, Empty list represents all topics
val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(requestData(List(), allowAutoTopicCreation = true), 0.toShort))
assertTrue(metadataResponseV0.errors.isEmpty, "V0 Response should have no errors")
assertEquals(2, metadataResponseV0.topicMetadata.size(), "V0 Response should have 2 (all) topics")
// v1, Null represents all topics
val metadataResponseV1 = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
assertTrue(metadataResponseV1.errors.isEmpty, "V1 Response should have no errors")
assertEquals(2, metadataResponseV1.topicMetadata.size(), "V1 Response should have 2 (all) topics")
}
@ParameterizedTest
@ -217,15 +241,25 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
!response.brokers.asScala.exists(_.id == downNode.dataPlaneRequestProcessor.brokerId)
}, "Replica was not found down", 50000)
// Validate version 4 returns unavailable replicas with no error
val v4MetadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build(4))
val v4BrokerIds = v4MetadataResponse.brokers().asScala.map(_.id).toSeq
assertTrue(v4MetadataResponse.errors.isEmpty, "Response should have no errors")
assertFalse(v4BrokerIds.contains(downNode.config.brokerId), s"The downed broker should not be in the brokers list")
assertEquals(1, v4MetadataResponse.topicMetadata.size, "Response should have one topic")
val v4PartitionMetadata = v4MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
assertEquals(Errors.NONE, v4PartitionMetadata.error, "PartitionMetadata should have no errors")
assertEquals(replicaCount, v4PartitionMetadata.replicaIds.size, s"Response should have $replicaCount replicas")
// Validate version 0 still filters unavailable replicas and contains error
val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(requestData(List(replicaDownTopic), allowAutoTopicCreation = true), 0.toShort))
val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq
assertTrue(v0MetadataResponse.errors.isEmpty, "Response should have no errors")
assertFalse(v0BrokerIds.contains(downNode.config.brokerId), s"The downed broker should not be in the brokers list")
assertTrue(v0MetadataResponse.topicMetadata.size == 1, "Response should have one topic")
val v0PartitionMetadata = v0MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
assertTrue(v0PartitionMetadata.error == Errors.REPLICA_NOT_AVAILABLE, "PartitionMetadata should have an error")
assertTrue(v0PartitionMetadata.replicaIds.size == replicaCount - 1, s"Response should have ${replicaCount - 1} replicas")
// Validate version 1 returns unavailable replicas with no error
val v1MetadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build(1))
val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
assertTrue(v1MetadataResponse.errors.isEmpty, "Response should have no errors")
assertFalse(v1BrokerIds.contains(downNode.config.brokerId), s"The downed broker should not be in the brokers list")
assertEquals(1, v1MetadataResponse.topicMetadata.size, "Response should have one topic")
val v1PartitionMetadata = v1MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
assertEquals(Errors.NONE, v1PartitionMetadata.error, "PartitionMetadata should have no errors")
assertEquals(replicaCount, v1PartitionMetadata.replicaIds.size, s"Response should have $replicaCount replicas")
}
@ParameterizedTest