KAFKA-15874: Add metric and request log attribute for deprecated request api versions (KIP-896) (#15032)

Breakdown of this PR:
* Extend the generator to support deprecated api versions
* Set deprecated api versions via the request json files
* Expose the information via metrics and the request log

The relevant section of the KIP:

> * Introduce metric `kafka.network:type=RequestMetrics,name=DeprecatedRequestsPerSec,request=(api-name),version=(api-version),clientSoftwareName=(client-software-name),clientSoftwareVersion=(client-software-version)`
> * Add boolean field `requestApiVersionDeprecated`  to the request
header section of the request log (alongside `requestApiKey` ,
`requestApiVersion`, `requestApiKeyName` , etc.).

Unit tests were added to verify the new generator functionality,
the new metric and the new request log attribute.

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
Ismael Juma 2023-12-20 05:13:36 -08:00 committed by GitHub
parent 4e11de00a7
commit 919b585da0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 172 additions and 13 deletions

View File

@ -234,6 +234,10 @@ public enum ApiKeys {
return apiVersion >= oldestVersion() && apiVersion <= latestVersion(enableUnstableLastVersion);
}
public boolean isVersionDeprecated(short apiVersion) {
return apiVersion >= messageType.lowestDeprecatedVersion() && apiVersion <= messageType.highestDeprecatedVersion();
}
public Optional<ApiVersionsResponseData.ApiVersion> toApiVersion(boolean enableUnstableLastVersion) {
short oldestVersion = oldestVersion();
short latestVersion = latestVersion(enableUnstableLastVersion);

View File

@ -21,6 +21,7 @@
// Version 1 is the same as version 0.
// Version 2 enables flexible versions.
"validVersions": "0-2",
"deprecatedVersions": "0",
"flexibleVersions": "2+",
"fields": [
{ "name": "Dirs", "type": "[]AlterReplicaLogDir", "versions": "0+",

View File

@ -22,6 +22,7 @@
// Version 2 enables flexible versions.
// Version 3 adds user resource type.
"validVersions": "0-3",
"deprecatedVersions": "0",
"flexibleVersions": "2+",
"fields": [
{ "name": "Creations", "type": "[]AclCreation", "versions": "0+",

View File

@ -24,6 +24,7 @@
//
// Version 3 adds owner principal
"validVersions": "0-3",
"deprecatedVersions": "0",
"flexibleVersions": "2+",
"fields": [
{ "name": "OwnerPrincipalType", "type": "string", "versions": "3+", "nullableVersions": "3+",

View File

@ -30,6 +30,7 @@
//
// Version 7 is the same as version 6.
"validVersions": "0-7",
"deprecatedVersions": "0-1",
"flexibleVersions": "5+",
"fields": [
{ "name": "Topics", "type": "[]CreatableTopic", "versions": "0+",

View File

@ -22,6 +22,7 @@
// Version 2 enables flexible versions.
// Version 3 adds the user resource type.
"validVersions": "0-3",
"deprecatedVersions": "0",
"flexibleVersions": "2+",
"fields": [
{ "name": "Filters", "type": "[]DeleteAclsFilter", "versions": "0+",

View File

@ -22,6 +22,7 @@
//
// Version 2 is the first flexible version.
"validVersions": "0-2",
"deprecatedVersions": "0",
"flexibleVersions": "2+",
"fields": [
{ "name": "GroupsNames", "type": "[]string", "versions": "0+", "entityType": "groupId",

View File

@ -27,6 +27,7 @@
//
// Version 6 reorganizes topics, adds topic IDs and allows topic names to be null.
"validVersions": "0-6",
"deprecatedVersions": "0",
"flexibleVersions": "4+",
"fields": [
{ "name": "Topics", "type": "[]DeleteTopicState", "versions": "6+", "about": "The name or topic ID of the topic",

View File

@ -22,6 +22,7 @@
// Version 2 enables flexible versions.
// Version 3 adds user resource type.
"validVersions": "0-3",
"deprecatedVersions": "0",
"flexibleVersions": "2+",
"fields": [
{ "name": "ResourceTypeFilter", "type": "int8", "versions": "0+",

View File

@ -22,6 +22,7 @@
// Version 2 is the same as version 1.
// Version 4 enables flexible versions.
"validVersions": "0-4",
"deprecatedVersions": "0",
"flexibleVersions": "4+",
"fields": [
{ "name": "Resources", "type": "[]DescribeConfigsResource", "versions": "0+",

View File

@ -22,6 +22,7 @@
// Version 2 adds flexible version support
// Version 3 adds token requester into the response
"validVersions": "0-3",
"deprecatedVersions": "0",
"flexibleVersions": "2+",
"fields": [
{ "name": "Owners", "type": "[]DescribeDelegationTokenOwner", "versions": "0+", "nullableVersions": "0+",

View File

@ -19,10 +19,11 @@
"listeners": ["zkBroker", "broker"],
"name": "DescribeLogDirsRequest",
// Version 1 is the same as version 0.
"validVersions": "0-4",
// Version 2 is the first flexible version.
// Version 3 is the same as version 2 (new field in response).
// Version 4 is the same as version 2 (new fields in response).
"validVersions": "0-4",
"deprecatedVersions": "0",
"flexibleVersions": "2+",
"fields": [
{ "name": "Topics", "type": "[]DescribableLogDirTopic", "versions": "0+", "nullableVersions": "0+",

View File

@ -21,6 +21,7 @@
// Version 1 is the same as version 0.
// Version 2 adds flexible version support
"validVersions": "0-2",
"deprecatedVersions": "0",
"flexibleVersions": "2+",
"fields": [
{ "name": "Hmac", "type": "bytes", "versions": "0+",

View File

@ -56,6 +56,7 @@
//
// Version 16 is the same as version 15 (KIP-951).
"validVersions": "0-16",
"deprecatedVersions": "0-3",
"flexibleVersions": "12+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",

View File

@ -26,6 +26,7 @@
//
// Version 4 adds support for batching via CoordinatorKeys (KIP-699)
"validVersions": "0-4",
"deprecatedVersions": "0",
"flexibleVersions": "3+",
"fields": [
{ "name": "Key", "type": "string", "versions": "0-3",

View File

@ -35,6 +35,7 @@
//
// Version 9 is the same as version 8.
"validVersions": "0-9",
"deprecatedVersions": "0-1",
"flexibleVersions": "6+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",

View File

@ -35,6 +35,7 @@
//
// Version 8 enables listing offsets by local log start offset (KIP-405).
"validVersions": "0-8",
"deprecatedVersions": "0",
"flexibleVersions": "6+",
"fields": [
{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",

View File

@ -19,6 +19,7 @@
"listeners": ["zkBroker", "broker"],
"name": "MetadataRequest",
"validVersions": "0-12",
"deprecatedVersions": "0-3",
"flexibleVersions": "9+",
"fields": [
// In version 0, an empty array indicates "request metadata for all topics." In version 1 and

View File

@ -35,6 +35,7 @@
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The
// request is the same as version 8.
"validVersions": "0-9",
"deprecatedVersions": "0-1",
"flexibleVersions": "8+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",

View File

@ -37,6 +37,7 @@
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). It adds
// the MemberId and MemberEpoch fields. Those are filled in and validated when the new consumer protocol is used.
"validVersions": "0-9",
"deprecatedVersions": "0",
"flexibleVersions": "6+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",

View File

@ -28,6 +28,7 @@
//
// Version 4 enables flexible versions.
"validVersions": "0-4",
"deprecatedVersions": "0-1",
"flexibleVersions": "4+",
"fields": [
{ "name": "ReplicaId", "type": "int32", "versions": "3+", "default": -2, "ignorable": true, "entityType": "brokerId",

View File

@ -36,6 +36,7 @@
//
// Version 10 is the same as version 9 (KIP-951).
"validVersions": "0-10",
"deprecatedVersions": "0-6",
"flexibleVersions": "9+",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId",

View File

@ -21,6 +21,7 @@
// Version 1 is the same as version 0.
// Version 2 adds flexible version support
"validVersions": "0-2",
"deprecatedVersions": "0",
"flexibleVersions": "2+",
"fields": [
{ "name": "Hmac", "type": "bytes", "versions": "0+",

View File

@ -23,6 +23,7 @@
// client negotiation for clients <= 2.4.
// See https://issues.apache.org/jira/browse/KAFKA-9577
"validVersions": "0-1",
"deprecatedVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "Mechanism", "type": "string", "versions": "0+",

View File

@ -31,7 +31,7 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.EnvelopeResponseData
import org.apache.kafka.common.network.Send
import org.apache.kafka.common.network.{ClientInformation, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
@ -254,6 +254,7 @@ object RequestChannel extends Logging {
overrideMetricNames.foreach { metricName =>
val m = metrics(metricName)
m.requestRate(header.apiVersion).mark()
m.deprecatedRequestRate(header.apiKey, header.apiVersion, context.clientInformation).foreach(_.mark())
m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
m.localTimeHist.update(Math.round(apiLocalTimeMs))
m.remoteTimeHist.update(Math.round(apiRemoteTimeMs))
@ -521,6 +522,7 @@ object RequestMetrics {
val verifyPartitionsInTxnMetricName = ApiKeys.ADD_PARTITIONS_TO_TXN.name + "Verification"
val RequestsPerSec = "RequestsPerSec"
val DeprecatedRequestsPerSec = "DeprecatedRequestsPerSec"
val RequestQueueTimeMs = "RequestQueueTimeMs"
val LocalTimeMs = "LocalTimeMs"
val RemoteTimeMs = "RemoteTimeMs"
@ -534,6 +536,8 @@ object RequestMetrics {
val ErrorsPerSec = "ErrorsPerSec"
}
private case class DeprecatedRequestRateKey(version: Short, clientInformation: ClientInformation)
class RequestMetrics(name: String) {
import RequestMetrics._
@ -542,6 +546,7 @@ class RequestMetrics(name: String) {
val tags = Map("request" -> name).asJava
val requestRateInternal = new Pool[Short, Meter]()
private val deprecatedRequestRateInternal = new Pool[DeprecatedRequestRateKey, Meter]()
// time a request spent in a request queue
val requestQueueTimeHist = metricsGroup.newHistogram(RequestQueueTimeMs, true, tags)
// time a request takes to be processed at the local broker
@ -578,13 +583,28 @@ class RequestMetrics(name: String) {
def requestRate(version: Short): Meter =
requestRateInternal.getAndMaybePut(version, metricsGroup.newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tagsWithVersion(version)))
def deprecatedRequestRate(apiKey: ApiKeys, version: Short, clientInformation: ClientInformation): Option[Meter] =
if (apiKey.isVersionDeprecated(version)) {
Some(deprecatedRequestRateInternal.getAndMaybePut(DeprecatedRequestRateKey(version, clientInformation),
metricsGroup.newMeter(DeprecatedRequestsPerSec, "requests", TimeUnit.SECONDS, tagsWithVersionAndClientInfo(version, clientInformation))))
} else None
private def tagsWithVersion(version: Short): java.util.Map[String, String] = {
val nameAndVersionTags = new util.HashMap[String, String](tags.size() + 1)
val nameAndVersionTags = new util.LinkedHashMap[String, String](math.ceil((tags.size() + 1) / 0.75).toInt) // take load factor into account
nameAndVersionTags.putAll(tags)
nameAndVersionTags.put("version", version.toString)
nameAndVersionTags
}
private def tagsWithVersionAndClientInfo(version: Short, clientInformation: ClientInformation): java.util.Map[String, String] = {
val extendedTags = new util.LinkedHashMap[String, String](math.ceil((tags.size() + 3) / 0.75).toInt) // take load factor into account
extendedTags.putAll(tags)
extendedTags.put("version", version.toString)
extendedTags.put("clientSoftwareName", clientInformation.softwareName)
extendedTags.put("clientSoftwareVersion", clientInformation.softwareVersion)
extendedTags
}
class ErrorMeter(name: String, error: Errors) {
private val tags = Map("request" -> name, "error" -> error.name).asJava
@ -617,9 +637,10 @@ class RequestMetrics(name: String) {
}
def removeMetrics(): Unit = {
for (version <- requestRateInternal.keys) {
for (version <- requestRateInternal.keys)
metricsGroup.removeMetric(RequestsPerSec, tagsWithVersion(version))
}
for (key <- deprecatedRequestRateInternal.keys)
metricsGroup.removeMetric(DeprecatedRequestsPerSec, tagsWithVersionAndClientInfo(key.version, key.clientInformation))
metricsGroup.removeMetric(RequestQueueTimeMs, tags)
metricsGroup.removeMetric(LocalTimeMs, tags)
metricsGroup.removeMetric(RemoteTimeMs, tags)

View File

@ -192,6 +192,8 @@ object RequestConvertToJson {
def requestHeaderNode(header: RequestHeader): JsonNode = {
val node = RequestHeaderDataJsonConverter.write(header.data, header.headerVersion, false).asInstanceOf[ObjectNode]
node.set("requestApiKeyName", new TextNode(header.apiKey.toString))
if (header.apiKey().isVersionDeprecated(header.apiVersion()))
node.set("requestApiVersionDeprecated", BooleanNode.TRUE)
node
}

View File

@ -32,6 +32,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.mockito.Mockito.mock
import java.util.Collections
import scala.collection.mutable.ArrayBuffer
class RequestConvertToJsonTest {
@ -118,6 +119,21 @@ class RequestConvertToJsonTest {
assertEquals(expectedNode, actualNode)
}
@Test
def testRequestHeaderNodeWithDeprecatedApiVersion(): Unit = {
val fetchRequest = FetchRequest.Builder.forConsumer(0, 0, 0, Collections.emptyMap()).build(0);
val req = request(fetchRequest)
val header = req.header
val expectedNode = RequestHeaderDataJsonConverter.write(header.data, header.headerVersion, false).asInstanceOf[ObjectNode]
expectedNode.set("requestApiKeyName", new TextNode(header.apiKey.toString))
expectedNode.set("requestApiVersionDeprecated", BooleanNode.TRUE)
val actualNode = RequestConvertToJson.requestHeaderNode(header)
assertEquals(expectedNode, actualNode)
}
@Test
def testClientInfoNode(): Unit = {
val clientInfo = new ClientInformation("name", "1")

View File

@ -107,6 +107,7 @@ class SocketServerTest {
sockets.foreach(_.close())
sockets.clear()
kafkaLogger.setLevel(logLevelToRestore)
TestUtils.clearYammerMetrics()
}
def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None, flush: Boolean = true): Unit = {
@ -223,7 +224,7 @@ class SocketServerTest {
server.metrics.close()
}
private def producerRequestBytes(ack: Short = 0): Array[Byte] = {
private def producerRequestBytes(apiVersion: Short = ApiKeys.PRODUCE.latestVersion, ack: Short = 0): Array[Byte] = {
val correlationId = -1
val clientId = ""
val ackTimeoutMs = 10000
@ -233,7 +234,7 @@ class SocketServerTest {
.setAcks(ack)
.setTimeoutMs(ackTimeoutMs)
.setTransactionalId(null))
.build()
.build(apiVersion)
val emptyHeader = new RequestHeader(ApiKeys.PRODUCE, emptyRequest.version, clientId, correlationId)
Utils.toArray(emptyRequest.serializeWithHeader(emptyHeader))
}
@ -306,6 +307,52 @@ class SocketServerTest {
)
}
@Test
def testRequestPerSecAndDeprecatedRequestsPerSecMetrics(): Unit = {
val clientName = "apache-kafka-java"
val clientVersion = AppInfoParser.getVersion
def deprecatedRequestsPerSec(requestVersion: Short): Option[Long] =
TestUtils.meterCountOpt(s"${RequestMetrics.DeprecatedRequestsPerSec},request=Produce,version=$requestVersion," +
s"clientSoftwareName=$clientName,clientSoftwareVersion=$clientVersion")
def requestsPerSec(requestVersion: Short): Option[Long] =
TestUtils.meterCountOpt(s"${RequestMetrics.RequestsPerSec},request=Produce,version=$requestVersion")
val plainSocket = connect()
val address = plainSocket.getLocalAddress
val clientId = "clientId"
sendRequest(plainSocket, apiVersionRequestBytes(clientId, ApiKeys.API_VERSIONS.latestVersion))
var receivedReq = receiveRequest(server.dataPlaneRequestChannel)
server.dataPlaneRequestChannel.sendNoOpResponse(receivedReq)
var requestVersion = ApiKeys.PRODUCE.latestVersion
sendRequest(plainSocket, producerRequestBytes(requestVersion))
receivedReq = receiveRequest(server.dataPlaneRequestChannel)
assertEquals(clientName, receivedReq.context.clientInformation.softwareName)
assertEquals(clientVersion, receivedReq.context.clientInformation.softwareVersion)
server.dataPlaneRequestChannel.sendNoOpResponse(receivedReq)
TestUtils.waitUntilTrue(() => requestsPerSec(requestVersion).isDefined, "RequestsPerSec metric could not be found")
assertTrue(requestsPerSec(requestVersion).getOrElse(0L) > 0, "RequestsPerSec should be higher than 0")
assertEquals(None, deprecatedRequestsPerSec(requestVersion))
requestVersion = 3
sendRequest(plainSocket, producerRequestBytes(requestVersion))
receivedReq = receiveRequest(server.dataPlaneRequestChannel)
server.dataPlaneRequestChannel.sendNoOpResponse(receivedReq)
TestUtils.waitUntilTrue(() => deprecatedRequestsPerSec(requestVersion).isDefined, "DeprecatedRequestsPerSec metric could not be found")
assertTrue(deprecatedRequestsPerSec(requestVersion).getOrElse(0L) > 0, "DeprecatedRequestsPerSec should be higher than 0")
plainSocket.setSoLinger(true, 0)
plainSocket.close()
TestUtils.waitUntilTrue(() => server.connectionCount(address) == 0, msg = "Connection not closed")
}
@Test
def testStagedListenerStartup(): Unit = {
shutdownServerAndMetrics(server)

View File

@ -2188,13 +2188,15 @@ object TestUtils extends Logging {
}
def meterCount(metricName: String): Long = {
meterCountOpt(metricName).getOrElse(fail(s"Unable to find metric $metricName"))
}
def meterCountOpt(metricName: String): Option[Long] = {
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
.filter { case (k, _) => k.getMBeanName.endsWith(metricName) }
.values
.headOption
.getOrElse(fail(s"Unable to find metric $metricName"))
.asInstanceOf[Meter]
.count
.map(_.asInstanceOf[Meter].count)
}
def metersCount(metricName: String): Long = {

View File

@ -156,6 +156,10 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
buffer.printf("%n");
generateHighestSupportedVersion();
buffer.printf("%n");
generateAccessor("lowestDeprecatedVersion", "short");
buffer.printf("%n");
generateAccessor("highestDeprecatedVersion", "short");
buffer.printf("%n");
generateAccessor("listeners", "EnumSet<ListenerType>");
buffer.printf("%n");
generateAccessor("latestVersionUnstable", "boolean");
@ -212,7 +216,7 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
.collect(Collectors.toList());
}
buffer.printf("%s(\"%s\", (short) %d, %s, %s, (short) %d, (short) %d, %s, %s)%s%n",
buffer.printf("%s(\"%s\", (short) %d, %s, %s, (short) %d, (short) %d, (short) %d, (short) %d, %s, %s)%s%n",
MessageGenerator.toSnakeCase(name).toUpperCase(Locale.ROOT),
MessageGenerator.capitalizeFirst(name),
entry.getKey(),
@ -220,6 +224,8 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
apiData.responseSchema(),
apiData.requestSpec.struct().versions().lowest(),
apiData.requestSpec.struct().versions().highest(),
apiData.requestSpec.struct().deprecatedVersions().lowest(),
apiData.requestSpec.struct().deprecatedVersions().highest(),
generateListenerTypeEnumSet(listeners),
apiData.requestSpec.latestVersionUnstable(),
(numProcessed == apis.size()) ? ";" : ",");
@ -233,6 +239,8 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
buffer.printf("private final Schema[] responseSchemas;%n");
buffer.printf("private final short lowestSupportedVersion;%n");
buffer.printf("private final short highestSupportedVersion;%n");
buffer.printf("private final short lowestDeprecatedVersion;%n");
buffer.printf("private final short highestDeprecatedVersion;%n");
buffer.printf("private final EnumSet<ListenerType> listeners;%n");
buffer.printf("private final boolean latestVersionUnstable;%n");
headerGenerator.addImport(MessageGenerator.SCHEMA_CLASS);
@ -243,6 +251,7 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
buffer.printf("ApiMessageType(String name, short apiKey, " +
"Schema[] requestSchemas, Schema[] responseSchemas, " +
"short lowestSupportedVersion, short highestSupportedVersion, " +
"short lowestDeprecatedVersion, short highestDeprecatedVersion, " +
"EnumSet<ListenerType> listeners, boolean latestVersionUnstable) {%n");
buffer.incrementIndent();
buffer.printf("this.name = name;%n");
@ -251,6 +260,8 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
buffer.printf("this.responseSchemas = responseSchemas;%n");
buffer.printf("this.lowestSupportedVersion = lowestSupportedVersion;%n");
buffer.printf("this.highestSupportedVersion = highestSupportedVersion;%n");
buffer.printf("this.lowestDeprecatedVersion = lowestDeprecatedVersion;%n");
buffer.printf("this.highestDeprecatedVersion = highestDeprecatedVersion;%n");
buffer.printf("this.listeners = listeners;%n");
buffer.printf("this.latestVersionUnstable = latestVersionUnstable;%n");
buffer.decrementIndent();

View File

@ -44,6 +44,7 @@ public final class MessageSpec {
@JsonCreator
public MessageSpec(@JsonProperty("name") String name,
@JsonProperty("validVersions") String validVersions,
@JsonProperty("deprecatedVersions") String deprecatedVersions,
@JsonProperty("fields") List<FieldSpec> fields,
@JsonProperty("apiKey") Short apiKey,
@JsonProperty("type") MessageSpecType type,
@ -52,7 +53,7 @@ public final class MessageSpec {
@JsonProperty("listeners") List<RequestListenerType> listeners,
@JsonProperty("latestVersionUnstable") boolean latestVersionUnstable
) {
this.struct = new StructSpec(name, validVersions, fields);
this.struct = new StructSpec(name, validVersions, deprecatedVersions, fields);
this.apiKey = apiKey == null ? Optional.empty() : Optional.of(apiKey);
this.type = Objects.requireNonNull(type);
this.commonStructs = commonStructs == null ? Collections.emptyList() :

View File

@ -110,6 +110,7 @@ final class StructRegistry {
// Synthesize a StructSpec object out of the fields.
StructSpec spec = new StructSpec(typeName,
field.versions().toString(),
Versions.NONE_STRING, // version deprecations not supported at field level
field.fields());
structs.put(typeName, new StructInfo(spec, parentVersions));
}

View File

@ -31,6 +31,8 @@ public final class StructSpec {
private final Versions versions;
private final Versions deprecatedVersions;
private final List<FieldSpec> fields;
private final boolean hasKeys;
@ -38,6 +40,7 @@ public final class StructSpec {
@JsonCreator
public StructSpec(@JsonProperty("name") String name,
@JsonProperty("versions") String versions,
@JsonProperty("deprecatedVersions") String deprecatedVersions,
@JsonProperty("fields") List<FieldSpec> fields) {
this.name = Objects.requireNonNull(name);
this.versions = Versions.parse(versions, null);
@ -45,6 +48,7 @@ public final class StructSpec {
throw new RuntimeException("You must specify the version of the " +
name + " structure.");
}
this.deprecatedVersions = Versions.parse(deprecatedVersions, Versions.NONE);
ArrayList<FieldSpec> newFields = new ArrayList<>();
if (fields != null) {
// Each field should have a unique tag ID (if the field has a tag ID).
@ -88,6 +92,10 @@ public final class StructSpec {
return versions.toString();
}
public Versions deprecatedVersions() {
return deprecatedVersions;
}
@JsonProperty
public List<FieldSpec> fields() {
return fields;

View File

@ -50,6 +50,28 @@ public class MessageDataGeneratorTest {
new MessageDataGenerator("org.apache.kafka.common.message").generate(testMessageSpec);
}
@Test
public void testNullDefaultsWithDeprecatedVersions() throws Exception {
MessageSpec testMessageSpec = MessageGenerator.JSON_SERDE.readValue(String.join("", Arrays.asList(
"{",
" \"type\": \"request\",",
" \"name\": \"FooBar\",",
" \"validVersions\": \"0-4\",",
" \"deprecatedVersions\": \"0-1\",",
" \"flexibleVersions\": \"none\",",
" \"fields\": [",
" { \"name\": \"field1\", \"type\": \"int32\", \"versions\": \"0+\" },",
" { \"name\": \"field2\", \"type\": \"[]TestStruct\", \"versions\": \"1+\", ",
" \"nullableVersions\": \"1+\", \"default\": \"null\", \"fields\": [",
" { \"name\": \"field1\", \"type\": \"int32\", \"versions\": \"0+\" }",
" ]},",
" { \"name\": \"field3\", \"type\": \"bytes\", \"versions\": \"2+\", ",
" \"nullableVersions\": \"2+\", \"default\": \"null\" }",
" ]",
"}")), MessageSpec.class);
new MessageDataGenerator("org.apache.kafka.common.message").generate(testMessageSpec);
}
private void assertStringContains(String substring, String value) {
assertTrue(value.contains(substring),
"Expected string to contain '" + substring + "', but it was " + value);

View File

@ -37,7 +37,8 @@ public class StructRegistryTest {
"{",
" \"type\": \"request\",",
" \"name\": \"LeaderAndIsrRequest\",",
" \"validVersions\": \"0-2\",",
" \"validVersions\": \"0-4\",",
" \"deprecatedVersions\": \"0-1\",",
" \"flexibleVersions\": \"0+\",",
" \"fields\": [",
" { \"name\": \"field1\", \"type\": \"int32\", \"versions\": \"0+\" },",