KAFKA-16469: Metadata schema checker (#15995)

Create a schema checker that can validate that later versions of a KRPC schema are compatible with earlier ones.
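For illustration, here is a minimal sketch of exercising the checker through its public entry point, MetadataSchemaCheckerTool.run(). The wrapper class and schema paths below are hypothetical; the subcommands and flags mirror the argparse4j setup added in this commit.

import org.apache.kafka.message.checker.MetadataSchemaCheckerTool;

public class SchemaCheckExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical paths to an older and a newer revision of the same KRPC schema JSON file.
        String oldSchema = "metadata/BrokerRegistrationRequest-old.json";
        String newSchema = "metadata/BrokerRegistrationRequest-new.json";

        // Equivalent to the command line:
        //   metadata-schema-checker verify-evolution --path1 <old> --path2 <new>
        // Throws an EvolutionException or UnificationException if the evolution is not compatible.
        MetadataSchemaCheckerTool.run(
            new String[] {"verify-evolution", "--path1", oldSchema, "--path2", newSchema},
            System.out);

        // A single schema file can also be sanity-checked on its own:
        //   metadata-schema-checker parse --path <file>
        MetadataSchemaCheckerTool.run(new String[] {"parse", "--path", newSchema}, System.out);
    }
}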

Reviewers: David Arthur <mumrah@gmail.com>
Author: Colin Patrick McCabe, 2024-10-03 12:13:38 -07:00 (committed by GitHub)
parent 85bfdf4127
commit dbd50ff847
22 changed files with 1717 additions and 14 deletions

File: checkstyle/import-control.xml

@@ -365,6 +365,7 @@
     <allow pkg="com.fasterxml.jackson" />
     <allow pkg="com.fasterxml.jackson.annotation" />
     <allow pkg="net.sourceforge.argparse4j" />
+    <allow pkg="org.apache.kafka.message" />
     <allow pkg="org.apache.message" />
   </subpackage>

File: checkstyle/suppressions.xml

@@ -25,9 +25,9 @@
     <!-- Generator -->
     <suppress checks="CyclomaticComplexity|BooleanExpressionComplexity"
-              files="(SchemaGenerator|MessageDataGenerator|FieldSpec|FieldType).java"/>
+              files="(SchemaGenerator|MessageDataGenerator|FieldSpec|FieldSpecPairIterator|FieldType).java"/>
     <suppress checks="NPathComplexity"
-              files="(MessageDataGenerator|FieldSpec|WorkerSinkTask).java"/>
+              files="(FieldSpecPairIterator|MessageDataGenerator|FieldSpec|WorkerSinkTask).java"/>
     <suppress checks="JavaNCSS"
               files="(ApiMessageType|FieldSpec|MessageDataGenerator|KafkaConsumerTest).java"/>
     <suppress checks="MethodLength"

File: FieldSpec.java

@@ -107,6 +107,13 @@ public final class FieldSpec {
             if (!this.type.isArray() && !this.type.isStruct()) {
                 throw new RuntimeException("Non-array or Struct field " + name + " cannot have fields");
             }
+            // Check struct invariants
+            if (this.type.isStruct() || this.type.isStructArray()) {
+                new StructSpec(name,
+                    versions,
+                    Versions.NONE_STRING, // version deprecations not supported at field level
+                    fields);
+            }
         }
         if (flexibleVersions == null || flexibleVersions.isEmpty()) {

File: MessageGenerator.java

@@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
 import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
@@ -165,7 +166,7 @@ public final class MessageGenerator {
     /**
      * The Jackson serializer we use for JSON objects.
      */
-    static final ObjectMapper JSON_SERDE;
+    public static final ObjectMapper JSON_SERDE;
     static {
         JSON_SERDE = new ObjectMapper();
@@ -174,6 +175,7 @@ public final class MessageGenerator {
         JSON_SERDE.configure(DeserializationFeature.FAIL_ON_TRAILING_TOKENS, true);
         JSON_SERDE.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
         JSON_SERDE.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
+        JSON_SERDE.registerModule(new Jdk8Module());
     }
     private static List<TypeClassGenerator> createTypeClassGenerators(String packageName,
@@ -272,7 +274,7 @@ public final class MessageGenerator {
         System.out.printf("MessageGenerator: processed %d Kafka message JSON files(s).%n", numProcessed);
     }
-    static String capitalizeFirst(String string) {
+    public static String capitalizeFirst(String string) {
         if (string.isEmpty()) {
             return string;
         }

File: StructRegistry.java

@@ -27,7 +27,7 @@ import java.util.TreeSet;
 /**
  * Contains structure data for Kafka MessageData classes.
  */
-final class StructRegistry {
+public final class StructRegistry {
     private final Map<String, StructInfo> structs;
     private final Set<String> commonStructNames;
@@ -58,7 +58,7 @@
         }
     }
-    StructRegistry() {
+    public StructRegistry() {
         this.structs = new TreeMap<>();
         this.commonStructNames = new TreeSet<>();
     }
@@ -66,7 +66,7 @@
     /**
      * Register all the structures contained a message spec.
      */
-    void register(MessageSpec message) {
+    public void register(MessageSpec message) throws Exception {
         // Register common structures.
         for (StructSpec struct : message.commonStructs()) {
             if (!MessageGenerator.firstIsCapitalized(struct.name())) {
@@ -122,7 +122,7 @@
     /**
      * Locate the struct corresponding to a field.
      */
-    StructSpec findStruct(FieldSpec field) {
+    public StructSpec findStruct(FieldSpec field) {
         String structFieldName;
         if (field.type().isArray()) {
             FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
@@ -134,6 +134,10 @@
             throw new RuntimeException("Field " + field.name() +
                 " cannot be treated as a structure.");
         }
+        return findStruct(structFieldName);
+    }
+
+    public StructSpec findStruct(String structFieldName) {
         StructInfo structInfo = structs.get(structFieldName);
         if (structInfo == null) {
             throw new RuntimeException("Unable to locate a specification for the structure " +
@@ -145,7 +149,7 @@
     /**
      * Return true if the field is a struct array with keys.
      */
-    boolean isStructArrayWithKeys(FieldSpec field) {
+    public boolean isStructArrayWithKeys(FieldSpec field) {
         if (!field.type().isArray()) {
             return false;
         }

File: StructSpec.java

@@ -53,14 +53,20 @@ public final class StructSpec {
         if (fields != null) {
             // Each field should have a unique tag ID (if the field has a tag ID).
             HashSet<Integer> tags = new HashSet<>();
+            // Each field should have a unique name.
+            HashSet<String> names = new HashSet<>();
             for (FieldSpec field : fields) {
-                if (field.tag().isPresent()) {
-                    if (tags.contains(field.tag().get())) {
+                field.tag().ifPresent(tag -> {
+                    if (!tags.add(tag)) {
                         throw new RuntimeException("In " + name + ", field " + field.name() +
-                            " has a duplicate tag ID " + field.tag().get() + ". All tags IDs " +
+                            " has a duplicate tag ID " + tag + ". All tags IDs " +
                             "must be unique.");
                     }
-                    tags.add(field.tag().get());
+                });
+                if (!names.add(field.name())) {
+                    throw new RuntimeException("In " + name + ", field " + field.name() +
+                        " has a duplicate name " + field.name() + ". All field names " +
+                        "must be unique.");
                 }
                 newFields.add(field);
             }

File: CheckerUtils.java (new file)

@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message.checker;
import org.apache.kafka.message.FieldSpec;
import org.apache.kafka.message.MessageGenerator;
import org.apache.kafka.message.MessageSpec;
import org.apache.kafka.message.Versions;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
* Utilities for the metadata schema checker.
*/
class CheckerUtils {
/**
* A min function defined for shorts.
*
* @param a The first short integer to compare.
* @param b The second short integer to compare.
* @return The minimum short integer.
*/
static short min(short a, short b) {
return a < b ? a : b;
}
/**
* A max function defined for shorts.
*
* @param a The first short integer to compare.
* @param b The second short integer to compare.
* @return The maximum short integer.
*/
static short max(short a, short b) {
return a > b ? a : b;
}
/**
* Validate that a field doesn't have tagged versions that are outside of the top-level flexible
* versions.
*
* @param what A description of the field.
* @param field The field to validate.
* @param topLevelFlexibleVersions The top-level flexible versions.
*/
static void validateTaggedVersions(
String what,
FieldSpec field,
Versions topLevelFlexibleVersions
) {
if (!field.flexibleVersions().isPresent()) {
if (!topLevelFlexibleVersions.contains(field.taggedVersions())) {
throw new RuntimeException("Tagged versions for " + what + " " +
field.name() + " are " + field.taggedVersions() + ", but top " +
"level flexible versions are " + topLevelFlexibleVersions);
}
}
}
/**
* Read a MessageSpec file from a path.
*
* @param schemaPath The path to read the file from.
* @return The MessageSpec.
*/
static MessageSpec readMessageSpecFromFile(String schemaPath) {
if (!Files.isReadable(Paths.get(schemaPath))) {
throw new RuntimeException("Path " + schemaPath + " does not point to " +
"a readable file.");
}
try {
return MessageGenerator.JSON_SERDE.readValue(new File(schemaPath), MessageSpec.class);
} catch (Exception e) {
throw new RuntimeException("Unable to parse file as MessageSpec: " + schemaPath, e);
}
}
}

File: EvolutionException.java (new file)

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message.checker;
/**
* An exception thrown when a schema is evolved an invalid way.
*/
public class EvolutionException extends RuntimeException {
public EvolutionException(String message, Throwable t) {
super(message, t);
}
public EvolutionException(String message) {
super(message, null);
}
}

File: EvolutionVerifier.java (new file)

@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message.checker;
import org.apache.kafka.message.FieldSpec;
import org.apache.kafka.message.MessageSpec;
import org.apache.kafka.message.StructSpec;
public class EvolutionVerifier {
private final MessageSpec topLevelMessage1;
private final MessageSpec topLevelMessage2;
public EvolutionVerifier(
MessageSpec topLevelMessage1,
MessageSpec topLevelMessage2
) {
this.topLevelMessage1 = topLevelMessage1;
this.topLevelMessage2 = topLevelMessage2;
}
public void verify() throws Exception {
verifyTopLevelMessages(topLevelMessage1, topLevelMessage2);
verifyVersionsMatchTopLevelMessage("message1", topLevelMessage1);
verifyVersionsMatchTopLevelMessage("message2", topLevelMessage2);
Unifier unifier = new Unifier(topLevelMessage1, topLevelMessage2);
unifier.unify();
}
static void verifyTopLevelMessages(MessageSpec topLevelMessage1, MessageSpec topLevelMessage2) {
if (!topLevelMessage1.apiKey().equals(topLevelMessage2.apiKey())) {
throw new EvolutionException("Initial apiKey " + topLevelMessage1.apiKey() +
" does not match final apiKey " + topLevelMessage2.apiKey());
}
if (!topLevelMessage1.type().equals(topLevelMessage2.type())) {
throw new EvolutionException("Initial type " + topLevelMessage1.type() +
" does not match final type " + topLevelMessage2.type());
}
if (!topLevelMessage2.flexibleVersions().contains(topLevelMessage1.flexibleVersions())) {
throw new EvolutionException("Initial flexibleVersions " + topLevelMessage1.flexibleVersions() +
" must be a subset of final flexibleVersions " + topLevelMessage2.flexibleVersions());
}
if (topLevelMessage2.validVersions().highest() < topLevelMessage1.validVersions().highest()) {
throw new EvolutionException("Initial maximum valid version " +
topLevelMessage1.validVersions().highest() + " must not be higher than final " +
"maximum valid version " + topLevelMessage2.validVersions().highest());
}
if (topLevelMessage2.validVersions().lowest() < topLevelMessage1.validVersions().lowest()) {
throw new EvolutionException("Initial minimum valid version " +
topLevelMessage1.validVersions().lowest() + " must not be higher than final " +
"minimum valid version " + topLevelMessage2.validVersions().lowest());
}
}
static void verifyVersionsMatchTopLevelMessage(
String what,
MessageSpec topLevelMessage
) {
for (FieldSpec field : topLevelMessage.fields()) {
verifyVersionsMatchTopLevelMessage(what, topLevelMessage, field);
}
for (StructSpec struct : topLevelMessage.commonStructs()) {
for (FieldSpec field : struct.fields()) {
verifyVersionsMatchTopLevelMessage(what, topLevelMessage, field);
}
}
}
static void verifyVersionsMatchTopLevelMessage(
String what,
MessageSpec topLevelMessage,
FieldSpec field
) {
if (topLevelMessage.validVersions().intersect(field.versions()).empty()) {
throw new EvolutionException("Field " + field.name() + " in " + what + " has versions " +
field.versions() + ", but the message versions are only " +
topLevelMessage.validVersions() + ".");
}
if (!field.nullableVersions().empty()) {
if (topLevelMessage.validVersions().intersect(field.nullableVersions()).empty()) {
throw new EvolutionException("Field " + field.name() + " in " + what +
" has nullableVersions " + field.nullableVersions() + ", but the message " +
"versions are only " + topLevelMessage.validVersions() + ".");
}
}
if (field.tag().isPresent()) {
if (topLevelMessage.validVersions().intersect(field.taggedVersions()).empty()) {
throw new EvolutionException("Field " + field.name() + " in " + what +
" has taggedVersions " + field.taggedVersions() + ", but the message " +
"versions are only " + topLevelMessage.validVersions() + ".");
}
}
field.flexibleVersions().ifPresent(v -> {
if (topLevelMessage.validVersions().intersect(v).empty()) {
throw new EvolutionException("Field " + field.name() + " in " + what +
" has flexibleVersions " + v + ", but the message versions are only " +
topLevelMessage.validVersions() + ".");
}
});
for (FieldSpec child : field.fields()) {
verifyVersionsMatchTopLevelMessage(what, topLevelMessage, child);
}
}
}

File: FieldDomain.java (new file)

@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message.checker;
import org.apache.kafka.message.FieldSpec;
import org.apache.kafka.message.Versions;
/**
* FieldDomain represents whether a field should appear in message1, message2, both, or neither.
*
* Note that this class does not handle tagged fields. Tagged fields can appear in any version,
* provided that the version is a flexibleVersion. In other words, adding a tagged field to an
* existing version is not an incompatible change. (However, reusing a tag index certainly is.)
*/
enum FieldDomain {
MESSAGE1_ONLY,
BOTH,
MESSAGE2_ONLY,
NEITHER;
static FieldDomain of(
FieldSpec fieldSpec,
Versions versions1,
Versions versions2
) {
Versions intersection1 = versions1.intersect(fieldSpec.versions());
Versions intersection2 = versions2.intersect(fieldSpec.versions());
if (intersection1.empty()) {
if (intersection2.empty()) {
return NEITHER;
} else {
return MESSAGE2_ONLY;
}
} else if (intersection2.empty()) {
return MESSAGE1_ONLY;
} else {
return BOTH;
}
}
void validate(
String what,
FieldSpec field,
boolean present1,
boolean present2
) {
switch (this) {
case MESSAGE1_ONLY:
if (present2) {
throw new UnificationException(what + " " + field.name() + " is present in " +
"message2, but should not be, based on its versions.");
}
if (!present1) {
throw new UnificationException(what + " " + field.name() + " is not present in " +
"message1, but should be, based on its versions.");
}
break;
case BOTH:
if (!present1) {
throw new UnificationException(what + " " + field.name() + " is not present in " +
"message1, but should be, based on its versions.");
}
if (!present2) {
throw new UnificationException(what + " " + field.name() + " is not present in " +
"message2, but should be, based on its versions.");
}
break;
case MESSAGE2_ONLY:
if (present1) {
throw new UnificationException(what + " " + field.name() + " is present in " +
"message1, but should not be, based on its versions.");
}
if (!present2) {
throw new UnificationException(what + " " + field.name() + " is not present in " +
"message2, but should be, based on its versions.");
}
break;
case NEITHER:
if (present1) {
throw new UnificationException(what + " " + field.name() + " is present in " +
"message1, but should not be, based on its versions.");
}
if (present2) {
throw new UnificationException(what + " " + field.name() + " is present in " +
"message2, but should not be, based on its versions.");
}
break;
}
}
}

File: FieldSpecPair.java (new file)

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message.checker;
import org.apache.kafka.message.FieldSpec;
class FieldSpecPair {
private final FieldSpec field1;
private final FieldSpec field2;
FieldSpecPair(
FieldSpec field1,
FieldSpec field2
) {
this.field1 = field1;
this.field2 = field2;
}
FieldSpec field1() {
return field1;
}
FieldSpec field2() {
return field2;
}
@Override
public String toString() {
return "FieldSpecPair(field1=" + field1.name() +
", field2=" + field2.name() +
")";
}
}

File: FieldSpecPairIterator.java (new file)

@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message.checker;
import org.apache.kafka.message.FieldSpec;
import org.apache.kafka.message.Versions;
import java.util.Iterator;
import java.util.NoSuchElementException;
class FieldSpecPairIterator implements Iterator<FieldSpecPair> {
private final Iterator<FieldSpec> iterator1;
private final Iterator<FieldSpec> iterator2;
private final Versions topLevelVersions1;
private final Versions topLevelVersions2;
private FieldSpecPair next;
FieldSpecPairIterator(
Iterator<FieldSpec> iterator1,
Iterator<FieldSpec> iterator2,
Versions topLevelVersions1,
Versions topLevelVersions2
) {
this.iterator1 = iterator1;
this.iterator2 = iterator2;
this.topLevelVersions1 = topLevelVersions1;
this.topLevelVersions2 = topLevelVersions2;
this.next = null;
}
@Override
public boolean hasNext() {
if (next != null) {
return true;
}
FieldSpec field1 = iterator1.hasNext() ? iterator1.next() : null;
while (field1 != null) {
FieldDomain domain1 = FieldDomain.of(field1, topLevelVersions1, topLevelVersions2);
switch (domain1) {
case MESSAGE1_ONLY:
field1 = iterator1.hasNext() ? iterator1.next() : null;
break;
case BOTH: {
FieldSpec field2 = iterator2.hasNext() ? iterator2.next() : null;
while (field2 != null) {
FieldDomain domain2 = FieldDomain.of(field2, topLevelVersions1, topLevelVersions2);
switch (domain2) {
case MESSAGE2_ONLY:
field2 = iterator2.hasNext() ? iterator2.next() : null;
break;
case BOTH:
next = new FieldSpecPair(field1, field2);
return true;
case MESSAGE1_ONLY:
case NEITHER:
throw new UnificationException("field2 " + field2.name() + " is present in " +
"message2, but should not be, based on its versions.");
}
}
break;
}
case MESSAGE2_ONLY:
case NEITHER:
throw new UnificationException("field1 " + field1.name() + " is present in " +
"message1, but should not be, based on its versions.");
}
}
FieldSpec field2 = iterator2.hasNext() ? iterator2.next() : null;
while (field2 != null) {
FieldDomain domain2 = FieldDomain.of(field2, topLevelVersions1, topLevelVersions2);
switch (domain2) {
case MESSAGE1_ONLY:
case NEITHER:
throw new UnificationException("field2 " + field2.name() + " is present in " +
"message2, but should not be, based on its versions.");
case BOTH:
throw new UnificationException("field2 " + field2.name() + " should be present " +
"in message1, but is not, based on its versions.");
case MESSAGE2_ONLY:
field2 = iterator2.hasNext() ? iterator2.next() : null;
break;
}
}
return false;
}
@Override
public FieldSpecPair next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
FieldSpecPair result = next;
next = null;
return result;
}
}

File: MetadataSchemaCheckerTool.java (new file)

@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message.checker;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
import net.sourceforge.argparse4j.internal.HelpScreenException;
import java.io.PrintStream;
public class MetadataSchemaCheckerTool {
public static void main(String[] args) throws Exception {
try {
run(args, System.out);
} catch (HelpScreenException e) {
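// argparse4j throws HelpScreenException after printing the help text, so there is nothing more to do here.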
}
}
public static void run(
String[] args,
PrintStream writer
) throws Exception {
ArgumentParser argumentParser = ArgumentParsers.
newArgumentParser("metadata-schema-checker").
defaultHelp(true).
description("The Kafka metadata schema checker tool.");
Subparsers subparsers = argumentParser.addSubparsers().dest("command");
Subparser parseParser = subparsers.addParser("parse").
help("Verify that a JSON file can be parsed as a MessageSpec.");
parseParser.addArgument("--path", "-p").
required(true).
help("The path to a schema JSON file.");
Subparser evolutionVerifierParser = subparsers.addParser("verify-evolution").
help("Verify that an evolution of a JSON file is valid.");
evolutionVerifierParser.addArgument("--path1", "-1").
required(true).
help("The initial schema JSON path.");
evolutionVerifierParser.addArgument("--path2", "-2").
required(true).
help("The final schema JSON path.");
Namespace namespace;
if (args.length == 0) {
namespace = argumentParser.parseArgs(new String[] {"--help"});
} else {
namespace = argumentParser.parseArgs(args);
}
String command = namespace.getString("command");
switch (command) {
case "parse": {
String path = namespace.getString("path");
CheckerUtils.readMessageSpecFromFile(path);
writer.println("Successfully parsed file as MessageSpec: " + path);
break;
}
case "verify-evolution": {
String path1 = namespace.getString("path1");
String path2 = namespace.getString("path2");
EvolutionVerifier verifier = new EvolutionVerifier(
CheckerUtils.readMessageSpecFromFile(path1),
CheckerUtils.readMessageSpecFromFile(path2));
verifier.verify();
writer.println("Successfully verified evolution of path1: " + path1 +
", and path2: " + path2);
break;
}
default:
throw new RuntimeException("Unknown command " + command);
}
}
}

File: UnificationException.java (new file)

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message.checker;
public class UnificationException extends RuntimeException {
private static final long serialVersionUID = 1L;
public UnificationException(String message, Throwable cause) {
super(message, cause);
}
public UnificationException(String message) {
this(message, null);
}
}

File: Unifier.java (new file)

@@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message.checker;
import org.apache.kafka.message.EntityType;
import org.apache.kafka.message.FieldSpec;
import org.apache.kafka.message.FieldType;
import org.apache.kafka.message.MessageGenerator;
import org.apache.kafka.message.MessageSpec;
import org.apache.kafka.message.StructRegistry;
import org.apache.kafka.message.StructSpec;
import org.apache.kafka.message.Versions;
import java.util.List;
import static org.apache.kafka.message.checker.CheckerUtils.max;
import static org.apache.kafka.message.checker.CheckerUtils.min;
/**
* The unifier attempts to unify the types of two KRPC messages. In other words, to see them as
* two different points in the evolution of a single message
* specification.
*/
class Unifier {
private final MessageSpec topLevelMessage1;
private final StructRegistry structRegistry1;
private final MessageSpec topLevelMessage2;
private final StructRegistry structRegistry2;
Unifier(
MessageSpec topLevelMessage1,
MessageSpec topLevelMessage2
) throws Exception {
this.topLevelMessage1 = topLevelMessage1;
this.structRegistry1 = new StructRegistry();
this.structRegistry1.register(topLevelMessage1);
this.topLevelMessage2 = topLevelMessage2;
this.structRegistry2 = new StructRegistry();
this.structRegistry2.register(topLevelMessage2);
}
static FieldSpec structSpecToFieldSpec(StructSpec structSpec) {
return new FieldSpec(structSpec.name(),
structSpec.versions().toString(),
structSpec.fields(),
MessageGenerator.capitalizeFirst(structSpec.name()),
false,
"",
"",
false,
EntityType.UNKNOWN,
"Top level StructSpec",
"",
null,
null,
false);
}
void unify() {
unify(structSpecToFieldSpec(topLevelMessage1.struct()),
structSpecToFieldSpec(topLevelMessage2.struct()));
}
void unify(FieldSpec field1, FieldSpec field2) {
// If the types don't match, then these fields cannot be unified. Of course, even if the
// types do match, we might be looking at two different structs, or something like that.
if (!field2.type().toString().equals(field1.type().toString())) {
throw new UnificationException("Field type for field2 " + field2.name() + " is " +
field2.type() + ", but field type for field1 " + field1.name() + " is " +
field1.type());
}
// The maximum supported version in field2 must not be lower than the maximum supported
// version in field1.
short f1Highest = min(field1.versions().highest(), topLevelMessage1.validVersions().highest());
short f2Highest = min(field2.versions().highest(), topLevelMessage2.validVersions().highest());
if (f2Highest < f1Highest) {
throw new UnificationException("Maximum effective valid version for field2 " +
field2.name() + ", '" + f2Highest + "' cannot be lower than the " +
"maximum effective valid version for field1 " + field1.name() + ", '" + f1Highest + "'");
}
// The minimum supported version in field2 must not be different from the minimum supported
// version in field1.
short f1Lowest = max(field1.versions().lowest(), topLevelMessage2.validVersions().lowest());
short f2Lowest = max(field2.versions().lowest(), topLevelMessage2.validVersions().lowest());
if (f2Lowest != f1Lowest) {
throw new UnificationException("Minimum effective valid version for field2 " +
field2.name() + ", '" + f2Lowest + "' cannot be different than the " +
"minimum effective valid version for field1 " + field1.name() + ", '" +
f1Lowest + "'");
}
// The maximum nullable version in field2 must not be lower than the maximum nullable
// version in field1.
short f1HighestNull = min(f1Highest, field1.nullableVersions().highest());
short f2HighestNull = min(f2Highest, field2.nullableVersions().highest());
if (f2HighestNull < f1HighestNull) {
throw new UnificationException("Maximum effective nullable version for field2 " +
field2.name() + ", '" + f2HighestNull + "' cannot be lower than the " +
"maximum effective nullable version for field1 " + field1.name() + ", '" +
f1HighestNull + "'");
}
// The minimum nullable version in field2 must not be different from the minimum nullable
// version in field1.
short f1LowestNull = max(field1.nullableVersions().lowest(), topLevelMessage2.validVersions().lowest());
short f2LowestNull = max(field2.nullableVersions().lowest(), topLevelMessage2.validVersions().lowest());
if (f2LowestNull != f1LowestNull) {
throw new UnificationException("Minimum effective nullable version for field2 " +
field2.name() + ", '" + f2LowestNull + "' cannot be different than the " +
"minimum effective nullable version for field1 " + field1.name() + ", '" +
f1LowestNull + "'");
}
// Check that the flexibleVersions match exactly. Currently, there is only one case where
// flexibleVersions is set on a FieldSpec object: the FieldSpec is the ClientId string
// used in RequestHeader.json. In every other case, FieldSpec.flexibleVersions() will be
// Optional.empty.
Versions field2EffectiveFlexibleVersions = field2.flexibleVersions().
orElseGet(() -> topLevelMessage2.flexibleVersions());
Versions field1EffectiveFlexibleVersions = field1.flexibleVersions().
orElseGet(() -> topLevelMessage1.flexibleVersions());
if (!field2EffectiveFlexibleVersions.contains(field1EffectiveFlexibleVersions)) {
throw new UnificationException("Flexible versions for field2 " + field2.name() +
" is " + field2.flexibleVersions().orElseGet(() -> Versions.NONE) +
", but flexible versions for field1 is " +
field1.flexibleVersions().orElseGet(() -> Versions.NONE));
}
// Check that defaults match exactly.
if (!field2.defaultString().equals(field1.defaultString())) {
throw new UnificationException("Default for field2 " + field2.name() + " is '" +
field2.defaultString() + "', but default for field1 " + field1.name() + " is '" +
field1.defaultString() + "'");
}
// Recursive step.
if (field1.type().isStruct()) {
unifyStructs(field1.name(),
field1.fields(),
field2.name(),
field2.fields());
} else if (field2.type().isStructArray()) {
unifyStructs(((FieldType.ArrayType) field1.type()).elementName(),
field1.fields(),
((FieldType.ArrayType) field2.type()).elementName(),
field2.fields());
}
}
void unifyStructs(
String struct1Name,
List<FieldSpec> struct1Fields,
String struct2Name,
List<FieldSpec> struct2Fields
) {
// By convention, structure names are always uppercase.
struct1Name = MessageGenerator.capitalizeFirst(struct1Name);
struct2Name = MessageGenerator.capitalizeFirst(struct2Name);
// If the list of struct fields is empty, it is assumed that the structure is defined in
// commonStructs. We have to look it up there in order to find its fields.
if (struct1Fields.isEmpty()) {
struct1Fields = lookupCommonStructFields(struct1Name, structRegistry1);
}
if (struct2Fields.isEmpty()) {
struct2Fields = lookupCommonStructFields(struct2Name, structRegistry2);
}
for (FieldSpec field1 : struct1Fields) {
CheckerUtils.validateTaggedVersions("field1", field1, topLevelMessage1.flexibleVersions());
}
for (FieldSpec field2 : struct2Fields) {
CheckerUtils.validateTaggedVersions("field2", field2, topLevelMessage2.flexibleVersions());
}
// Iterate over fields1 and fields2.
FieldSpecPairIterator iterator = new FieldSpecPairIterator(struct1Fields.iterator(),
struct2Fields.iterator(),
topLevelMessage1.validVersions(),
topLevelMessage2.validVersions());
while (iterator.hasNext()) {
FieldSpecPair pair = iterator.next();
unify(pair.field1(), pair.field2());
}
}
List<FieldSpec> lookupCommonStructFields(
String structName,
StructRegistry structRegistry
) {
StructSpec struct = structRegistry.findStruct(structName);
// TODO: we should probably validate the versions, etc. settings of the common struct.
// Maybe force them to always be 0+, since it makes more sense to restrict them
// at the point of usage, not definition.
return struct.fields();
}
}

File: StructSpecTest.java (new file)

@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message;
import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.Arrays;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@Timeout(120)
public class StructSpecTest {
@Test
public void testNamesMustBeUnique() {
assertEquals("In LeaderAndIsrRequest, field field1 has a duplicate name field1. All field names must be unique.",
assertThrows(ValueInstantiationException.class,
() -> MessageGenerator.JSON_SERDE.readValue(String.join("", Arrays.asList(
"{",
" \"type\": \"request\",",
" \"name\": \"LeaderAndIsrRequest\",",
" \"validVersions\": \"0-4\",",
" \"deprecatedVersions\": \"0-1\",",
" \"flexibleVersions\": \"0+\",",
" \"fields\": [",
" { \"name\": \"field1\", \"type\": \"int32\", \"versions\": \"0+\" },",
" { \"name\": \"field1\", \"type\": \"[]int64\", \"versions\": \"1+\" }",
" ]",
"}")), MessageSpec.class)).getCause().getMessage());
}
@Test
public void testTagsMustBeUnique() {
assertEquals("In LeaderAndIsrRequest, field field2 has a duplicate tag ID 0. All tags IDs must be unique.",
assertThrows(ValueInstantiationException.class,
() -> MessageGenerator.JSON_SERDE.readValue(String.join("", Arrays.asList(
"{",
" \"type\": \"request\",",
" \"name\": \"LeaderAndIsrRequest\",",
" \"validVersions\": \"0-4\",",
" \"deprecatedVersions\": \"0-1\",",
" \"flexibleVersions\": \"0+\",",
" \"fields\": [",
" { \"name\": \"field1\", \"type\": \"int32\", \"versions\": \"0+\", ",
" \"taggedVersions\": \"0+\", \"tag\": 0},",
" { \"name\": \"field2\", \"type\": \"[]int64\", \"versions\": \"0+\", ",
" \"taggedVersions\": \"0+\", \"tag\": 0 }",
" ]",
"}")), MessageSpec.class)).getCause().getMessage());
}
}

File: CheckerTestUtils.java (new file)

@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message.checker;
import org.apache.kafka.message.FieldSpec;
import org.apache.kafka.message.MessageGenerator;
import org.apache.kafka.message.MessageSpec;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.OptionalInt;
public class CheckerTestUtils {
static String singleQuotesToDoubleQuotes(String input) {
return input.replaceAll("'", "\"");
}
static MessageSpec toMessage(String input) throws Exception {
return MessageGenerator.JSON_SERDE.
readValue(singleQuotesToDoubleQuotes(input), MessageSpec.class);
}
static FieldSpec field(
String name,
String versions,
String type
) {
return new FieldSpec(name,
versions,
null,
type,
false,
null,
null,
false,
null,
"",
null,
null,
null,
false);
}
static FieldSpec fieldWithTag(
String name,
OptionalInt tag
) {
return new FieldSpec(name,
"0+",
null,
"int8",
false,
null,
null,
false,
null,
null,
tag.isPresent() ? "0+" : "",
null,
tag.isPresent() ? tag.getAsInt() : null,
false);
}
static FieldSpec fieldWithTag(
String name,
int tag,
String validVersions,
String taggedVersions
) {
return new FieldSpec(name,
validVersions,
null,
"int8",
false,
null,
null,
false,
null,
null,
taggedVersions,
null,
tag,
false);
}
static FieldSpec fieldWithNulls(
String name,
String versions,
String type,
String nullableVersions
) {
return new FieldSpec(name,
versions,
null,
type,
false,
nullableVersions,
null,
false,
null,
"",
null,
null,
null,
false);
}
static FieldSpec fieldWithDefaults(
String name,
String versions,
String fieldDefault,
String flexibleVersions
) {
return new FieldSpec(name,
versions,
null,
"string",
false,
null,
fieldDefault,
false,
null,
"",
null,
flexibleVersions,
null,
false);
}
static String messageSpecStringToTempFile(String input) throws IOException {
File file = Files.createTempFile("MetadataSchemaCheckerToolTest", null).toFile();
file.deleteOnExit();
MessageSpec messageSpec = MessageGenerator.JSON_SERDE.
readValue(input.replaceAll("'", "\""), MessageSpec.class);
MessageGenerator.JSON_SERDE.writeValue(file, messageSpec);
return file.getAbsolutePath();
}
}

File: CheckerUtilsTest.java (new file)

@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message.checker;
import org.apache.kafka.message.Versions;
import org.junit.jupiter.api.Test;
import static org.apache.kafka.message.checker.CheckerTestUtils.field;
import static org.apache.kafka.message.checker.CheckerTestUtils.fieldWithTag;
import static org.apache.kafka.message.checker.CheckerTestUtils.messageSpecStringToTempFile;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class CheckerUtilsTest {
@Test
public void testMin0and1() {
assertEquals((short) 0, CheckerUtils.min((short) 0, (short) 1));
}
@Test
public void testMin1and0() {
assertEquals((short) 0, CheckerUtils.min((short) 1, (short) 0));
}
@Test
public void testMin5and5() {
assertEquals((short) 5, CheckerUtils.min((short) 5, (short) 5));
}
@Test
public void testMax0and1() {
assertEquals((short) 1, CheckerUtils.max((short) 0, (short) 1));
}
@Test
public void testMax1and0() {
assertEquals((short) 1, CheckerUtils.max((short) 1, (short) 0));
}
@Test
public void testMax5and5() {
assertEquals((short) 5, CheckerUtils.max((short) 5, (short) 5));
}
@Test
public void testValidateTaggedVersionsOnNontaggedField() {
CheckerUtils.validateTaggedVersions("field1",
field("foo", "0+", "int64"),
Versions.parse("5+", Versions.NONE));
}
@Test
public void testValidateTaggedVersionsOnTaggedField() {
CheckerUtils.validateTaggedVersions("field1",
fieldWithTag("foo", 123, "1+", "1+"),
Versions.parse("1+", Versions.NONE));
}
@Test
public void testValidateTaggedVersionsOnTaggedFieldWithError() {
assertThrows(RuntimeException.class,
() -> CheckerUtils.validateTaggedVersions("field1",
fieldWithTag("foo", 123, "1+", "1+"),
Versions.parse("2+", Versions.NONE)));
}
@Test
public void testReadMessageSpecFromFile() throws Exception {
CheckerUtils.readMessageSpecFromFile(messageSpecStringToTempFile(
"{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '0-2', 'flexibleVersions': '0+', " +
"'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"));
}
}

File: EvolutionVerifierTest.java (new file)

@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message.checker;
import org.apache.kafka.message.MessageGenerator;
import org.apache.kafka.message.MessageSpec;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.Arrays;
import static org.apache.kafka.message.checker.CheckerTestUtils.toMessage;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@Timeout(40)
public class EvolutionVerifierTest {
@Test
public void testTopLevelMessageApiKeysDoNotMatch() throws Exception {
assertEquals("Initial apiKey Optional[62] does not match final apiKey Optional[63]",
assertThrows(EvolutionException.class,
() -> EvolutionVerifier.verifyTopLevelMessages(
toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '0-2', 'flexibleVersions': '0+', " +
"'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"),
toMessage("{'apiKey':63, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '0-2', 'flexibleVersions': '0+', " +
"'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"))).
getMessage());
}
@Test
public void testTopLevelMessageTypesDoNotMatch() throws Exception {
assertEquals("Initial type REQUEST does not match final type RESPONSE",
assertThrows(EvolutionException.class,
() -> EvolutionVerifier.verifyTopLevelMessages(
toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '0-2', 'flexibleVersions': '0+', " +
"'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"),
toMessage("{'apiKey':62, 'type': 'response', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '0-2', 'flexibleVersions': '0+', " +
"'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"))).
getMessage());
}
@Test
public void testFlexibleVersionsIsNotASubset() throws Exception {
assertEquals("Initial flexibleVersions 0+ must be a subset of final flexibleVersions 1+",
assertThrows(EvolutionException.class,
() -> EvolutionVerifier.verifyTopLevelMessages(
toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '0-2', 'flexibleVersions': '0+', " +
"'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"),
toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '0-2', 'flexibleVersions': '1+', " +
"'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"))).
getMessage());
}
@Test
public void testMaximumVersionOfInitialMessageIsHigher() throws Exception {
assertEquals("Initial maximum valid version 2 must not be higher than final maximum valid version 1",
assertThrows(EvolutionException.class,
() -> EvolutionVerifier.verifyTopLevelMessages(
toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '0-2', 'flexibleVersions': '0+', " +
"'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"),
toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '0-1', 'flexibleVersions': '0+', " +
"'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"))).
getMessage());
}
@Test
public void testMinimumVersionOfInitialMessageIsHigher() throws Exception {
assertEquals("Initial minimum valid version 1 must not be higher than final minimum valid version 0",
assertThrows(EvolutionException.class,
() -> EvolutionVerifier.verifyTopLevelMessages(
toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '1-2', 'flexibleVersions': '0+', " +
"'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"),
toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '0-2', 'flexibleVersions': '0+', " +
"'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"))).
getMessage());
}
@Test
public void testIncompatibleFieldTypeChange() throws Exception {
assertEquals("Field type for field2 UserId is int32, but field type for field1 UserId is int64",
assertThrows(UnificationException.class,
() -> new EvolutionVerifier(
toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '1-2', 'flexibleVersions': '0+', " +
"'fields': [" +
"{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}," +
"{'name': 'ControllerId', 'type': 'int32', 'versions': '1+'}," +
"{'name': 'UserId', 'type': 'int64', 'versions': '2+'}" +
"]}"),
toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '1-2', 'flexibleVersions': '0+', " +
"'fields': [" +
"{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}," +
"{'name': 'ControllerId', 'type': 'int32', 'versions': '1+'}," +
"{'name': 'UserId', 'type': 'int32', 'versions': '2+'}" +
"]}")).
verify()).
getMessage());
}
@Test
public void testNewFieldAddition() throws Exception {
new EvolutionVerifier(
toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '1-2', 'flexibleVersions': '0+', " +
"'fields': [" +
"{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}," +
"{'name': 'ControllerId', 'type': 'int32', 'versions': '1+'}," +
"{'name': 'UserId', 'type': 'int64', 'versions': '2+'}" +
"]}"),
toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '1-3', 'flexibleVersions': '0+', " +
"'fields': [" +
"{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}," +
"{'name': 'ControllerId', 'type': 'int32', 'versions': '1+'}," +
"{'name': 'NewId', 'type': 'int64', 'versions': '3+'}," +
"{'name': 'UserId', 'type': 'int64', 'versions': '2+'}" +
"]}")).
verify();
}
@Test
public void testFieldVersionsMustBeInsideTopLevelVersion() {
assertEquals("Field field2 in message1 has versions 1+, but the message versions are only 0.",
assertThrows(EvolutionException.class,
() -> EvolutionVerifier.verifyVersionsMatchTopLevelMessage("message1",
MessageGenerator.JSON_SERDE.readValue(String.join("", Arrays.asList(
"{",
" \"type\": \"request\",",
" \"name\": \"LeaderAndIsrRequest\",",
" \"validVersions\": \"0\",",
" \"flexibleVersions\": \"0+\",",
" \"fields\": [",
" { \"name\": \"field1\", \"type\": \"int32\", \"versions\": \"0+\" },",
" { \"name\": \"field2\", \"type\": \"[]int64\", \"versions\": \"1+\" }",
" ]",
"}")), MessageSpec.class))).getMessage());
}
@Test
public void testFieldNullableVersionsMustBeInsideTopLevelVersion() {
assertEquals("Field field1 in message1 has nullableVersions 1+, but the message versions are only 0.",
assertThrows(EvolutionException.class,
() -> EvolutionVerifier.verifyVersionsMatchTopLevelMessage("message1",
MessageGenerator.JSON_SERDE.readValue(String.join("", Arrays.asList(
"{",
" \"type\": \"request\",",
" \"name\": \"LeaderAndIsrRequest\",",
" \"validVersions\": \"0\",",
" \"flexibleVersions\": \"0+\",",
" \"fields\": [",
" { \"name\": \"field1\", \"type\": \"string\", \"versions\": \"0+\", \"nullableVersions\": \"1+\"},",
" { \"name\": \"field2\", \"type\": \"[]int64\", \"versions\": \"0+\" }",
" ]",
"}")), MessageSpec.class))).getMessage());
}
@Test
public void testFieldTaggedVersionsMustBeInsideTopLevelVersion() {
assertEquals("Field field1 in message1 has taggedVersions 1+, but the message versions are only 0.",
assertThrows(EvolutionException.class,
() -> EvolutionVerifier.verifyVersionsMatchTopLevelMessage("message1",
MessageGenerator.JSON_SERDE.readValue(String.join("", Arrays.asList(
"{",
" \"type\": \"request\",",
" \"name\": \"LeaderAndIsrRequest\",",
" \"validVersions\": \"0\",",
" \"flexibleVersions\": \"0+\",",
" \"fields\": [",
" { \"name\": \"field1\", \"type\": \"string\", \"versions\": \"0+\", \"taggedVersions\": \"1+\", \"tag\": 0},",
" { \"name\": \"field2\", \"type\": \"[]int64\", \"versions\": \"0+\" }",
" ]",
"}")), MessageSpec.class))).getMessage());
}
}

File: FieldDomainTest.java (new file)

@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message.checker;
import org.apache.kafka.message.Versions;
import org.junit.jupiter.api.Test;
import static org.apache.kafka.message.checker.CheckerTestUtils.field;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class FieldDomainTest {
@Test
public void testMessage1Only() {
assertEquals(FieldDomain.MESSAGE1_ONLY,
FieldDomain.of(field("bar", "1-1", "string"),
new Versions((short) 0, (short) 1),
new Versions((short) 2, (short) 5)));
}
@Test
public void testBoth() {
assertEquals(FieldDomain.BOTH,
FieldDomain.of(field("bar", "1+", "string"),
new Versions((short) 0, (short) 1),
new Versions((short) 0, (short) 3)));
}
@Test
public void testMessage2Only() {
assertEquals(FieldDomain.MESSAGE2_ONLY,
FieldDomain.of(field("bar", "1+", "string"),
new Versions((short) 0, (short) 0),
new Versions((short) 0, (short) 1)));
}
@Test
public void testNeither() {
assertEquals(FieldDomain.NEITHER,
FieldDomain.of(field("bar", "2+", "string"),
new Versions((short) 0, (short) 0),
new Versions((short) 0, (short) 1)));
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message.checker;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.apache.kafka.message.checker.CheckerTestUtils.messageSpecStringToTempFile;
import static org.junit.jupiter.api.Assertions.assertEquals;
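/**
 * Exercises the command-line entry points: "parse" reads a single message spec file, while
 * "verify-evolution" checks that the spec at --path2 is a compatible evolution of the spec at
 * --path1 (the same file is used for both paths here, which trivially passes).
 */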
public class MetadataSchemaCheckerToolTest {
@Test
public void testSuccessfulParse() throws Exception {
try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
String path = messageSpecStringToTempFile(
"{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '0-2', 'flexibleVersions': '0+', " +
"'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}");
MetadataSchemaCheckerTool.run(new String[] {"parse", "--path", path}, new PrintStream(stream));
assertEquals("Successfully parsed file as MessageSpec: " + path, stream.toString().trim());
}
}
@Test
public void testSuccessfulVerifyEvolution() throws Exception {
try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
String path = messageSpecStringToTempFile(
"{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '0-2', 'flexibleVersions': '0+', " +
"'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}");
MetadataSchemaCheckerTool.run(new String[] {"verify-evolution",
"--path1", path, "--path2", path}, new PrintStream(stream));
assertEquals("Successfully verified evolution of path1: " + path + ", and path2: " + path,
stream.toString().trim());
}
}
}

View File

@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.message.checker;
import org.apache.kafka.message.MessageSpec;
import org.apache.kafka.message.MessageSpecType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.Collections;
import static org.apache.kafka.message.checker.CheckerTestUtils.field;
import static org.apache.kafka.message.checker.CheckerTestUtils.fieldWithDefaults;
import static org.apache.kafka.message.checker.CheckerTestUtils.fieldWithNulls;
import static org.apache.kafka.message.checker.CheckerTestUtils.toMessage;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
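/**
 * In the calls below, the first argument to unify() is the field from the earlier spec (field1)
 * and the second is the field from the later spec (field2). The failure cases cover mismatched
 * types, a lowered effective maximum version, a changed effective minimum version, and changed
 * nullable versions, flexible versions, or defaults.
 */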
@Timeout(40)
public class UnifierTest {
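// Adding a field that exists only in the newly introduced version is a compatible evolution.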
@Test
public void testAddNewField() throws Exception {
new Unifier(toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '0', 'flexibleVersions': '0+', " +
"'fields': [" +
"{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}" +
"]}"),
toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " +
"'validVersions': '0-1', 'flexibleVersions': '0+', " +
"'fields': [" +
"{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}," +
"{'name': 'ControlerId', 'type': 'int32', 'versions': '1+'}" +
"]}")).unify();
}
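// Minimal top-level specs that supply the version-range context for the field-level unify() calls
// below; their valid versions (0-2, 0-4, and 1-4) bound the effective versions of the fields under test.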
static final MessageSpec TOP_LEVEL_MESSAGE_1 = new MessageSpec("TopLevelMessage",
"0-2", null, null, null, MessageSpecType.DATA, Collections.emptyList(),
"0+", Collections.emptyList(), false);
static final MessageSpec TOP_LEVEL_MESSAGE_2 = new MessageSpec("TopLevelMessage",
"0-4", null, null, null, MessageSpecType.DATA, Collections.emptyList(),
"0+", Collections.emptyList(), false);
static final MessageSpec TOP_LEVEL_MESSAGE_2_DROPPING_V0 = new MessageSpec("TopLevelMessage",
"1-4", null, null, null, MessageSpecType.DATA, Collections.emptyList(),
"0+", Collections.emptyList(), false);
@Test
public void testFieldTypesDoNotMatch() throws Exception {
assertEquals("Field type for field2 foo is int8, but field type for field1 foo is int16",
assertThrows(UnificationException.class,
() -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
field("foo", "0+", "int16"),
field("foo", "0+", "int8"))).getMessage());
}
@Test
public void testFieldTypesMatch() throws Exception {
new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
field("foo", "0+", "int16"),
field("foo", "0+", "int16"));
}
@Test
public void testArrayElementTypesDoNotMatch() throws Exception {
assertEquals("Field type for field2 foo is []int8, but field type for field1 foo is []int16",
assertThrows(UnificationException.class,
() -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
field("foo", "0+", "[]int16"),
field("foo", "0+", "[]int8"))).getMessage());
}
@Test
public void testArrayFieldTypesMatch() throws Exception {
new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
field("foo", "0+", "[]int16"),
field("foo", "0+", "[]int16"));
}
@Test
public void testMaximumValidVersionForField2IsLowerThanField1() throws Exception {
assertEquals("Maximum effective valid version for field2 foo, '1' cannot be lower than the " +
"maximum effective valid version for field1 foo, '2'",
assertThrows(UnificationException.class,
() -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
field("foo", "0-2", "int64"),
field("foo", "0-1", "int64"))).getMessage());
}
@Test
public void testMaximumValidVersionIsReasonable1() throws Exception {
new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
field("foo", "0-1", "int64"),
field("foo", "0-2", "int64"));
}
@Test
public void testMaximumValidVersionIsReasonable2() throws Exception {
new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
field("foo", "0+", "int64"), // effective max is 2, because TOP_LEVEL_MESSAGE_1 supports 0-2
field("foo", "0-3", "int64"));
}
@Test
public void testMinimumValidVersionForField2IsLowerThanField1() throws Exception {
assertEquals("Minimum effective valid version for field2 foo, '0' cannot be different than the " +
"minimum effective valid version for field1 foo, '1'",
assertThrows(UnificationException.class,
() -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
field("foo", "1+", "int64"),
field("foo", "0+", "int64"))).getMessage());
}
@Test
public void testMinimumValidVersionForField2IsLowerThanField1Again() throws Exception {
assertEquals("Minimum effective valid version for field2 foo, '0' cannot be different than the " +
"minimum effective valid version for field1 foo, '1'",
assertThrows(UnificationException.class,
() -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
field("foo", "1-2", "int64"),
field("foo", "0-2", "int64"))).getMessage());
}
@Test
public void testMinimumValidVersionForField2IsHigherThanField1() throws Exception {
assertEquals("Minimum effective valid version for field2 foo, '1' cannot be different than the " +
"minimum effective valid version for field1 foo, '0'",
assertThrows(UnificationException.class,
() -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
field("foo", "0-2", "int64"),
field("foo", "1-2", "int64"))).getMessage());
}
@Test
public void testNullableVersionsCheckPasses1() throws Exception {
new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2_DROPPING_V0).unify(
fieldWithNulls("foo", "0-2", "string", "0+"),
fieldWithNulls("foo", "1-2", "string", "1-2"));
}
@Test
public void testNullableVersionsCheckPasses2() throws Exception {
new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
fieldWithNulls("foo", "0+", "string", "0+"), // effectively 0-2 because of max valid version
fieldWithNulls("foo", "0+", "string", "0-2"));
}
@Test
public void testNullableVersionsCheckFails1() throws Exception {
assertEquals("Minimum effective nullable version for field2 foo, '1' cannot be different than " +
"the minimum effective nullable version for field1 foo, '0'",
assertThrows(UnificationException.class,
() -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
fieldWithNulls("foo", "0-2", "string", "0-2"),
fieldWithNulls("foo", "0-2", "string", "1-2"))).getMessage());
}
@Test
public void testNullableVersionsCheckFails2() throws Exception {
assertEquals("Minimum effective nullable version for field2 foo, '1' cannot be different than " +
"the minimum effective nullable version for field1 foo, '0'",
assertThrows(UnificationException.class,
() -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
fieldWithNulls("foo", "0+", "string", "0+"),
fieldWithNulls("foo", "0+", "string", "1-2"))).getMessage());
}
@Test
public void testFlexibleVersionsChangedCausesFailure1() throws Exception {
assertEquals("Flexible versions for field2 foo is 2+, but flexible versions for field1 is 1+",
assertThrows(UnificationException.class,
() -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
fieldWithDefaults("foo", "0+", null, "1+"),
fieldWithDefaults("foo", "0+", null, "2+"))).getMessage());
}
@Test
public void testFlexibleVersionsChangedCausesFailure2() throws Exception {
assertEquals("Flexible versions for field2 foo is 2+, but flexible versions for field1 is none",
assertThrows(UnificationException.class,
() -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
fieldWithDefaults("foo", "0+", null, ""),
fieldWithDefaults("foo", "0+", null, "2+"))).getMessage());
}
@Test
public void testFlexibleVersionsCheckPasses() throws Exception {
new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
fieldWithDefaults("foo", "0+", null, "1+"),
fieldWithDefaults("foo", "0+", null, "1+"));
}
@Test
public void testDefaultsChangedCausesFailure() throws Exception {
assertEquals("Default for field2 foo is 'newDefault', but default for field1 foo is 'oldDefault'",
assertThrows(UnificationException.class,
() -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
fieldWithDefaults("foo", "0+", "oldDefault", null),
fieldWithDefaults("foo", "0+", "newDefault", null))).getMessage());
}
@Test
public void testDefaultsCheckPasses() throws Exception {
new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify(
fieldWithDefaults("foo", "0+", "oldDefault", null),
fieldWithDefaults("foo", "0+", "oldDefault", null));
}
}