From dbd50ff84725c01785472087dd1f357e9ed860ae Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Thu, 3 Oct 2024 12:13:38 -0700 Subject: [PATCH] KAFKA-16469: Metadata schema checker (#15995) Create a schema checker that can validate that later versions of a KRPC schema are compatible with earlier ones. Reviewers: David Arthur --- checkstyle/import-control.xml | 1 + checkstyle/suppressions.xml | 4 +- .../org/apache/kafka/message/FieldSpec.java | 7 + .../kafka/message/MessageGenerator.java | 6 +- .../apache/kafka/message/StructRegistry.java | 14 +- .../org/apache/kafka/message/StructSpec.java | 16 +- .../kafka/message/checker/CheckerUtils.java | 94 ++++++++ .../message/checker/EvolutionException.java | 31 +++ .../message/checker/EvolutionVerifier.java | 119 ++++++++++ .../kafka/message/checker/FieldDomain.java | 105 ++++++++ .../kafka/message/checker/FieldSpecPair.java | 48 ++++ .../checker/FieldSpecPairIterator.java | 111 +++++++++ .../checker/MetadataSchemaCheckerTool.java | 88 +++++++ .../message/checker/UnificationException.java | 30 +++ .../apache/kafka/message/checker/Unifier.java | 205 ++++++++++++++++ .../apache/kafka/message/StructSpecTest.java | 69 ++++++ .../message/checker/CheckerTestUtils.java | 154 ++++++++++++ .../message/checker/CheckerUtilsTest.java | 90 +++++++ .../checker/EvolutionVerifierTest.java | 202 ++++++++++++++++ .../message/checker/FieldDomainTest.java | 59 +++++ .../MetadataSchemaCheckerToolTest.java | 54 +++++ .../kafka/message/checker/UnifierTest.java | 224 ++++++++++++++++++ 22 files changed, 1717 insertions(+), 14 deletions(-) create mode 100644 generator/src/main/java/org/apache/kafka/message/checker/CheckerUtils.java create mode 100644 generator/src/main/java/org/apache/kafka/message/checker/EvolutionException.java create mode 100644 generator/src/main/java/org/apache/kafka/message/checker/EvolutionVerifier.java create mode 100644 generator/src/main/java/org/apache/kafka/message/checker/FieldDomain.java create mode 100644 generator/src/main/java/org/apache/kafka/message/checker/FieldSpecPair.java create mode 100644 generator/src/main/java/org/apache/kafka/message/checker/FieldSpecPairIterator.java create mode 100644 generator/src/main/java/org/apache/kafka/message/checker/MetadataSchemaCheckerTool.java create mode 100644 generator/src/main/java/org/apache/kafka/message/checker/UnificationException.java create mode 100644 generator/src/main/java/org/apache/kafka/message/checker/Unifier.java create mode 100644 generator/src/test/java/org/apache/kafka/message/StructSpecTest.java create mode 100644 generator/src/test/java/org/apache/kafka/message/checker/CheckerTestUtils.java create mode 100644 generator/src/test/java/org/apache/kafka/message/checker/CheckerUtilsTest.java create mode 100644 generator/src/test/java/org/apache/kafka/message/checker/EvolutionVerifierTest.java create mode 100644 generator/src/test/java/org/apache/kafka/message/checker/FieldDomainTest.java create mode 100644 generator/src/test/java/org/apache/kafka/message/checker/MetadataSchemaCheckerToolTest.java create mode 100644 generator/src/test/java/org/apache/kafka/message/checker/UnifierTest.java diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index e801c2195d5..74c99f834e7 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -365,6 +365,7 @@ + diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 441b02e1f56..40ffcc5ccce 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -25,9 +25,9 @@ + files="(SchemaGenerator|MessageDataGenerator|FieldSpec|FieldSpecPairIterator|FieldType).java"/> + files="(FieldSpecPairIterator|MessageDataGenerator|FieldSpec|WorkerSinkTask).java"/> createTypeClassGenerators(String packageName, @@ -272,7 +274,7 @@ public final class MessageGenerator { System.out.printf("MessageGenerator: processed %d Kafka message JSON files(s).%n", numProcessed); } - static String capitalizeFirst(String string) { + public static String capitalizeFirst(String string) { if (string.isEmpty()) { return string; } diff --git a/generator/src/main/java/org/apache/kafka/message/StructRegistry.java b/generator/src/main/java/org/apache/kafka/message/StructRegistry.java index 6d923255139..c62bb37c75e 100644 --- a/generator/src/main/java/org/apache/kafka/message/StructRegistry.java +++ b/generator/src/main/java/org/apache/kafka/message/StructRegistry.java @@ -27,7 +27,7 @@ import java.util.TreeSet; /** * Contains structure data for Kafka MessageData classes. */ -final class StructRegistry { +public final class StructRegistry { private final Map structs; private final Set commonStructNames; @@ -58,7 +58,7 @@ final class StructRegistry { } } - StructRegistry() { + public StructRegistry() { this.structs = new TreeMap<>(); this.commonStructNames = new TreeSet<>(); } @@ -66,7 +66,7 @@ final class StructRegistry { /** * Register all the structures contained a message spec. */ - void register(MessageSpec message) { + public void register(MessageSpec message) throws Exception { // Register common structures. for (StructSpec struct : message.commonStructs()) { if (!MessageGenerator.firstIsCapitalized(struct.name())) { @@ -122,7 +122,7 @@ final class StructRegistry { /** * Locate the struct corresponding to a field. */ - StructSpec findStruct(FieldSpec field) { + public StructSpec findStruct(FieldSpec field) { String structFieldName; if (field.type().isArray()) { FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type(); @@ -134,6 +134,10 @@ final class StructRegistry { throw new RuntimeException("Field " + field.name() + " cannot be treated as a structure."); } + return findStruct(structFieldName); + } + + public StructSpec findStruct(String structFieldName) { StructInfo structInfo = structs.get(structFieldName); if (structInfo == null) { throw new RuntimeException("Unable to locate a specification for the structure " + @@ -145,7 +149,7 @@ final class StructRegistry { /** * Return true if the field is a struct array with keys. */ - boolean isStructArrayWithKeys(FieldSpec field) { + public boolean isStructArrayWithKeys(FieldSpec field) { if (!field.type().isArray()) { return false; } diff --git a/generator/src/main/java/org/apache/kafka/message/StructSpec.java b/generator/src/main/java/org/apache/kafka/message/StructSpec.java index 0d150945e99..24bd2739b0d 100644 --- a/generator/src/main/java/org/apache/kafka/message/StructSpec.java +++ b/generator/src/main/java/org/apache/kafka/message/StructSpec.java @@ -53,14 +53,20 @@ public final class StructSpec { if (fields != null) { // Each field should have a unique tag ID (if the field has a tag ID). HashSet tags = new HashSet<>(); + // Each field should have a unique name. + HashSet names = new HashSet<>(); for (FieldSpec field : fields) { - if (field.tag().isPresent()) { - if (tags.contains(field.tag().get())) { + field.tag().ifPresent(tag -> { + if (!tags.add(tag)) { throw new RuntimeException("In " + name + ", field " + field.name() + - " has a duplicate tag ID " + field.tag().get() + ". All tags IDs " + - "must be unique."); + " has a duplicate tag ID " + tag + ". All tags IDs " + + "must be unique."); } - tags.add(field.tag().get()); + }); + if (!names.add(field.name())) { + throw new RuntimeException("In " + name + ", field " + field.name() + + " has a duplicate name " + field.name() + ". All field names " + + "must be unique."); } newFields.add(field); } diff --git a/generator/src/main/java/org/apache/kafka/message/checker/CheckerUtils.java b/generator/src/main/java/org/apache/kafka/message/checker/CheckerUtils.java new file mode 100644 index 00000000000..d4b89f54048 --- /dev/null +++ b/generator/src/main/java/org/apache/kafka/message/checker/CheckerUtils.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message.checker; + +import org.apache.kafka.message.FieldSpec; +import org.apache.kafka.message.MessageGenerator; +import org.apache.kafka.message.MessageSpec; +import org.apache.kafka.message.Versions; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * Utilities for the metadata schema checker. + */ +class CheckerUtils { + /** + * A min function defined for shorts. + * + * @param a The first short integer to compare. + * @param b The second short integer to compare. + * @return The minimum short integer. + */ + static short min(short a, short b) { + return a < b ? a : b; + } + + /** + * A max function defined for shorts. + * + * @param a The first short integer to compare. + * @param b The second short integer to compare. + * @return The maximum short integer. + */ + static short max(short a, short b) { + return a > b ? a : b; + } + + /** + * Validate the a field doesn't have tagged versions that are outside of the top-level flexible + * versions. + * + * @param what A description of the field. + * @param field The field to validate. + * @param topLevelFlexibleVersions The top-level flexible versions. + */ + static void validateTaggedVersions( + String what, + FieldSpec field, + Versions topLevelFlexibleVersions + ) { + if (!field.flexibleVersions().isPresent()) { + if (!topLevelFlexibleVersions.contains(field.taggedVersions())) { + throw new RuntimeException("Tagged versions for " + what + " " + + field.name() + " are " + field.taggedVersions() + ", but top " + + "level flexible versions are " + topLevelFlexibleVersions); + } + } + } + + /** + * Read a MessageSpec file from a path. + * + * @param schemaPath The path to read the file from. + * @return The MessageSpec. + */ + static MessageSpec readMessageSpecFromFile(String schemaPath) { + if (!Files.isReadable(Paths.get(schemaPath))) { + throw new RuntimeException("Path " + schemaPath + " does not point to " + + "a readable file."); + } + try { + return MessageGenerator.JSON_SERDE.readValue(new File(schemaPath), MessageSpec.class); + } catch (Exception e) { + throw new RuntimeException("Unable to parse file as MessageSpec: " + schemaPath, e); + } + } +} diff --git a/generator/src/main/java/org/apache/kafka/message/checker/EvolutionException.java b/generator/src/main/java/org/apache/kafka/message/checker/EvolutionException.java new file mode 100644 index 00000000000..1e76c85281f --- /dev/null +++ b/generator/src/main/java/org/apache/kafka/message/checker/EvolutionException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message.checker; + +/** + * An exception thrown when a schema is evolved an invalid way. + */ +public class EvolutionException extends RuntimeException { + public EvolutionException(String message, Throwable t) { + super(message, t); + } + + public EvolutionException(String message) { + super(message, null); + } +} diff --git a/generator/src/main/java/org/apache/kafka/message/checker/EvolutionVerifier.java b/generator/src/main/java/org/apache/kafka/message/checker/EvolutionVerifier.java new file mode 100644 index 00000000000..64885beec71 --- /dev/null +++ b/generator/src/main/java/org/apache/kafka/message/checker/EvolutionVerifier.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message.checker; + +import org.apache.kafka.message.FieldSpec; +import org.apache.kafka.message.MessageSpec; +import org.apache.kafka.message.StructSpec; + +public class EvolutionVerifier { + private final MessageSpec topLevelMessage1; + private final MessageSpec topLevelMessage2; + + public EvolutionVerifier( + MessageSpec topLevelMessage1, + MessageSpec topLevelMessage2 + ) { + this.topLevelMessage1 = topLevelMessage1; + this.topLevelMessage2 = topLevelMessage2; + } + + public void verify() throws Exception { + verifyTopLevelMessages(topLevelMessage1, topLevelMessage2); + verifyVersionsMatchTopLevelMessage("message1", topLevelMessage1); + verifyVersionsMatchTopLevelMessage("message2", topLevelMessage2); + Unifier unifier = new Unifier(topLevelMessage1, topLevelMessage2); + unifier.unify(); + } + + static void verifyTopLevelMessages(MessageSpec topLevelMessage1, MessageSpec topLevelMessage2) { + if (!topLevelMessage1.apiKey().equals(topLevelMessage2.apiKey())) { + throw new EvolutionException("Initial apiKey " + topLevelMessage1.apiKey() + + " does not match final apiKey " + topLevelMessage2.apiKey()); + } + if (!topLevelMessage1.type().equals(topLevelMessage2.type())) { + throw new EvolutionException("Initial type " + topLevelMessage1.type() + + " does not match final type " + topLevelMessage2.type()); + } + if (!topLevelMessage2.flexibleVersions().contains(topLevelMessage1.flexibleVersions())) { + throw new EvolutionException("Initial flexibleVersions " + topLevelMessage1.flexibleVersions() + + " must be a subset of final flexibleVersions " + topLevelMessage2.flexibleVersions()); + } + if (topLevelMessage2.validVersions().highest() < topLevelMessage1.validVersions().highest()) { + throw new EvolutionException("Initial maximum valid version " + + topLevelMessage1.validVersions().highest() + " must not be higher than final " + + "maximum valid version " + topLevelMessage2.validVersions().highest()); + } + if (topLevelMessage2.validVersions().lowest() < topLevelMessage1.validVersions().lowest()) { + throw new EvolutionException("Initial minimum valid version " + + topLevelMessage1.validVersions().lowest() + " must not be higher than final " + + "minimum valid version " + topLevelMessage2.validVersions().lowest()); + } + } + + static void verifyVersionsMatchTopLevelMessage( + String what, + MessageSpec topLevelMessage + ) { + for (FieldSpec field : topLevelMessage.fields()) { + verifyVersionsMatchTopLevelMessage(what, topLevelMessage, field); + } + for (StructSpec struct : topLevelMessage.commonStructs()) { + for (FieldSpec field : topLevelMessage.fields()) { + verifyVersionsMatchTopLevelMessage(what, topLevelMessage, field); + } + } + } + + static void verifyVersionsMatchTopLevelMessage( + String what, + MessageSpec topLevelMessage, + FieldSpec field + ) { + if (topLevelMessage.validVersions().intersect(field.versions()).empty()) { + throw new EvolutionException("Field " + field.name() + " in " + what + " has versions " + + field.versions() + ", but the message versions are only " + + topLevelMessage.validVersions() + "."); + } + if (!field.nullableVersions().empty()) { + if (topLevelMessage.validVersions().intersect(field.nullableVersions()).empty()) { + throw new EvolutionException("Field " + field.name() + " in " + what + + " has nullableVersions " + field.nullableVersions() + ", but the message " + + "versions are only " + topLevelMessage.validVersions() + "."); + } + } + if (field.tag().isPresent()) { + if (topLevelMessage.validVersions().intersect(field.taggedVersions()).empty()) { + throw new EvolutionException("Field " + field.name() + " in " + what + + " has taggedVersions " + field.taggedVersions() + ", but the message " + + "versions are only " + topLevelMessage.validVersions() + "."); + } + } + field.flexibleVersions().ifPresent(v -> { + if (topLevelMessage.validVersions().intersect(v).empty()) { + throw new EvolutionException("Field " + field.name() + " in " + what + + " has flexibleVersions " + v + ", but the message versions are only " + + topLevelMessage.validVersions() + "."); + } + + }); + for (FieldSpec child : field.fields()) { + verifyVersionsMatchTopLevelMessage(what, topLevelMessage, child); + } + } +} diff --git a/generator/src/main/java/org/apache/kafka/message/checker/FieldDomain.java b/generator/src/main/java/org/apache/kafka/message/checker/FieldDomain.java new file mode 100644 index 00000000000..08016c6e63c --- /dev/null +++ b/generator/src/main/java/org/apache/kafka/message/checker/FieldDomain.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message.checker; + +import org.apache.kafka.message.FieldSpec; +import org.apache.kafka.message.Versions; + +/** + * FieldDomain represents whether a field should appear in message1, message2, both, or neither. + * + * Note that this class does not handle tagged fields. Tagged fields can appear in any version, + * provided that the version is a flexibleVersion. In other words, adding a tagged field to an + * existing version is not an incompatible change. (However, reusing a tag index certainly is.) + */ +enum FieldDomain { + MESSAGE1_ONLY, + BOTH, + MESSAGE2_ONLY, + NEITHER; + + static FieldDomain of( + FieldSpec fieldSpec, + Versions versions1, + Versions versions2 + ) { + Versions intersection1 = versions1.intersect(fieldSpec.versions()); + Versions intersection2 = versions2.intersect(fieldSpec.versions()); + if (intersection1.empty()) { + if (intersection2.empty()) { + return NEITHER; + } else { + return MESSAGE2_ONLY; + } + } else if (intersection2.empty()) { + return MESSAGE1_ONLY; + } else { + return BOTH; + } + } + + void validate( + String what, + FieldSpec field, + boolean present1, + boolean present2 + ) { + switch (this) { + case MESSAGE1_ONLY: + if (present2) { + throw new UnificationException(what + " " + field.name() + " is present in " + + "message2, but should not be, based on its versions."); + } + if (!present1) { + throw new UnificationException(what + " " + field.name() + " is not present in " + + "message1, but should be, based on its versions."); + } + break; + case BOTH: + if (!present1) { + throw new UnificationException(what + " " + field.name() + " is not present in " + + "message1, but should be, based on its versions."); + } + if (!present2) { + throw new UnificationException(what + " " + field.name() + " is not present in " + + "message2, but should be, based on its versions."); + } + break; + case MESSAGE2_ONLY: + if (present1) { + throw new UnificationException(what + " " + field.name() + " is present in " + + "message1, but should not be, based on its versions."); + } + if (!present2) { + throw new UnificationException(what + " " + field.name() + " is not present in " + + "message2, but should be, based on its versions."); + } + break; + case NEITHER: + if (present1) { + throw new UnificationException(what + " " + field.name() + " is present in " + + "message1, but should not be, based on its versions."); + } + if (present2) { + throw new UnificationException(what + " " + field.name() + " is present in " + + "message2, but should not be, based on its versions."); + } + break; + } + } +} diff --git a/generator/src/main/java/org/apache/kafka/message/checker/FieldSpecPair.java b/generator/src/main/java/org/apache/kafka/message/checker/FieldSpecPair.java new file mode 100644 index 00000000000..e4867a54690 --- /dev/null +++ b/generator/src/main/java/org/apache/kafka/message/checker/FieldSpecPair.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message.checker; + +import org.apache.kafka.message.FieldSpec; + +class FieldSpecPair { + private final FieldSpec field1; + private final FieldSpec field2; + + FieldSpecPair( + FieldSpec field1, + FieldSpec field2 + ) { + this.field1 = field1; + this.field2 = field2; + } + + FieldSpec field1() { + return field1; + } + + FieldSpec field2() { + return field2; + } + + @Override + public String toString() { + return "FieldSpecPair(field1=" + field1.name() + + ", field2=" + field2.name() + + ")"; + } +} diff --git a/generator/src/main/java/org/apache/kafka/message/checker/FieldSpecPairIterator.java b/generator/src/main/java/org/apache/kafka/message/checker/FieldSpecPairIterator.java new file mode 100644 index 00000000000..80a90bf3a1d --- /dev/null +++ b/generator/src/main/java/org/apache/kafka/message/checker/FieldSpecPairIterator.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message.checker; + +import org.apache.kafka.message.FieldSpec; +import org.apache.kafka.message.Versions; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +class FieldSpecPairIterator implements Iterator { + private final Iterator iterator1; + private final Iterator iterator2; + private final Versions topLevelVersions1; + private final Versions topLevelVersions2; + private FieldSpecPair next; + + FieldSpecPairIterator( + Iterator iterator1, + Iterator iterator2, + Versions topLevelVersions1, + Versions topLevelVersions2 + ) { + this.iterator1 = iterator1; + this.iterator2 = iterator2; + this.topLevelVersions1 = topLevelVersions1; + this.topLevelVersions2 = topLevelVersions2; + this.next = null; + } + + @Override + public boolean hasNext() { + if (next != null) { + return true; + } + FieldSpec field1 = iterator1.hasNext() ? iterator1.next() : null; + while (field1 != null) { + FieldDomain domain1 = FieldDomain.of(field1, topLevelVersions1, topLevelVersions2); + switch (domain1) { + case MESSAGE1_ONLY: + field1 = iterator1.hasNext() ? iterator1.next() : null; + break; + case BOTH: { + FieldSpec field2 = iterator2.hasNext() ? iterator2.next() : null; + while (field2 != null) { + FieldDomain domain2 = FieldDomain.of(field2, topLevelVersions1, topLevelVersions2); + switch (domain2) { + case MESSAGE2_ONLY: + field2 = iterator2.hasNext() ? iterator2.next() : null; + break; + case BOTH: + next = new FieldSpecPair(field1, field2); + return true; + case MESSAGE1_ONLY: + case NEITHER: + throw new UnificationException("field2 " + field2.name() + " is present in " + + "message2, but should not be, based on its versions."); + } + } + break; + } + case MESSAGE2_ONLY: + case NEITHER: + throw new UnificationException("field1 " + field1.name() + " is present in " + + "message1, but should not be, based on its versions."); + } + } + FieldSpec field2 = iterator2.hasNext() ? iterator2.next() : null; + while (field2 != null) { + FieldDomain domain2 = FieldDomain.of(field2, topLevelVersions1, topLevelVersions2); + switch (domain2) { + case MESSAGE1_ONLY: + case NEITHER: + throw new UnificationException("field2 " + field2.name() + " is present in " + + "message2, but should not be, based on its versions."); + case BOTH: + throw new UnificationException("field2 " + field2.name() + " should be present " + + "in message1, but is not, based on its versions."); + case MESSAGE2_ONLY: + field2 = iterator2.hasNext() ? iterator2.next() : null; + break; + } + } + return false; + } + + @Override + public FieldSpecPair next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + FieldSpecPair result = next; + next = null; + return result; + } +} diff --git a/generator/src/main/java/org/apache/kafka/message/checker/MetadataSchemaCheckerTool.java b/generator/src/main/java/org/apache/kafka/message/checker/MetadataSchemaCheckerTool.java new file mode 100644 index 00000000000..93cc343a827 --- /dev/null +++ b/generator/src/main/java/org/apache/kafka/message/checker/MetadataSchemaCheckerTool.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message.checker; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import net.sourceforge.argparse4j.inf.Subparsers; +import net.sourceforge.argparse4j.internal.HelpScreenException; + +import java.io.PrintStream; + +public class MetadataSchemaCheckerTool { + public static void main(String[] args) throws Exception { + try { + run(args, System.out); + } catch (HelpScreenException e) { + } + } + + public static void run( + String[] args, + PrintStream writer + ) throws Exception { + ArgumentParser argumentParser = ArgumentParsers. + newArgumentParser("metadata-schema-checker"). + defaultHelp(true). + description("The Kafka metadata schema checker tool."); + Subparsers subparsers = argumentParser.addSubparsers().dest("command"); + Subparser parseParser = subparsers.addParser("parse"). + help("Verify that a JSON file can be parsed as a MessageSpec."); + parseParser.addArgument("--path", "-p"). + required(true). + help("The path to a schema JSON file."); + Subparser evolutionVerifierParser = subparsers.addParser("verify-evolution"). + help("Verify that an evolution of a JSON file is valid."); + evolutionVerifierParser.addArgument("--path1", "-1"). + required(true). + help("The initial schema JSON path."); + evolutionVerifierParser.addArgument("--path2", "-2"). + required(true). + help("The final schema JSON path."); + Namespace namespace; + if (args.length == 0) { + namespace = argumentParser.parseArgs(new String[] {"--help"}); + } else { + namespace = argumentParser.parseArgs(args); + } + String command = namespace.getString("command"); + switch (command) { + case "parse": { + String path = namespace.getString("path"); + CheckerUtils.readMessageSpecFromFile(path); + writer.println("Successfully parsed file as MessageSpec: " + path); + break; + } + case "verify-evolution": { + String path1 = namespace.getString("path1"); + String path2 = namespace.getString("path2"); + EvolutionVerifier verifier = new EvolutionVerifier( + CheckerUtils.readMessageSpecFromFile(path1), + CheckerUtils.readMessageSpecFromFile(path2)); + verifier.verify(); + writer.println("Successfully verified evolution of path1: " + path1 + + ", and path2: " + path2); + break; + } + default: + throw new RuntimeException("Unknown command " + command); + } + } +} diff --git a/generator/src/main/java/org/apache/kafka/message/checker/UnificationException.java b/generator/src/main/java/org/apache/kafka/message/checker/UnificationException.java new file mode 100644 index 00000000000..984c3240f8e --- /dev/null +++ b/generator/src/main/java/org/apache/kafka/message/checker/UnificationException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message.checker; + +public class UnificationException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public UnificationException(String message, Throwable cause) { + super(message, cause); + } + + public UnificationException(String message) { + this(message, null); + } +} diff --git a/generator/src/main/java/org/apache/kafka/message/checker/Unifier.java b/generator/src/main/java/org/apache/kafka/message/checker/Unifier.java new file mode 100644 index 00000000000..2435de74e1b --- /dev/null +++ b/generator/src/main/java/org/apache/kafka/message/checker/Unifier.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message.checker; + +import org.apache.kafka.message.EntityType; +import org.apache.kafka.message.FieldSpec; +import org.apache.kafka.message.FieldType; +import org.apache.kafka.message.MessageGenerator; +import org.apache.kafka.message.MessageSpec; +import org.apache.kafka.message.StructRegistry; +import org.apache.kafka.message.StructSpec; +import org.apache.kafka.message.Versions; + +import java.util.List; + +import static org.apache.kafka.message.checker.CheckerUtils.max; +import static org.apache.kafka.message.checker.CheckerUtils.min; + +/** + * The unifier attempts to unify the types of two KRPC messages. In other words, to see them as + * two different points in the evolution of a single message + * specification. + */ +class Unifier { + private final MessageSpec topLevelMessage1; + private final StructRegistry structRegistry1; + private final MessageSpec topLevelMessage2; + private final StructRegistry structRegistry2; + + Unifier( + MessageSpec topLevelMessage1, + MessageSpec topLevelMessage2 + ) throws Exception { + this.topLevelMessage1 = topLevelMessage1; + this.structRegistry1 = new StructRegistry(); + this.structRegistry1.register(topLevelMessage1); + this.topLevelMessage2 = topLevelMessage2; + this.structRegistry2 = new StructRegistry(); + this.structRegistry1.register(topLevelMessage2); + } + + static FieldSpec structSpecToFieldSpec(StructSpec structSpec) { + return new FieldSpec(structSpec.name(), + structSpec.versions().toString(), + structSpec.fields(), + MessageGenerator.capitalizeFirst(structSpec.name()), + false, + "", + "", + false, + EntityType.UNKNOWN, + "Top level StructSpec", + "", + null, + null, + false); + } + + void unify() { + unify(structSpecToFieldSpec(topLevelMessage1.struct()), + structSpecToFieldSpec(topLevelMessage2.struct())); + } + + void unify(FieldSpec field1, FieldSpec field2) { + // If the types don't match, then these fields cannot be unified. Of course, even if the + // types do match, we might be looking at two different structs, or something like that. + if (!field2.type().toString().equals(field1.type().toString())) { + throw new UnificationException("Field type for field2 " + field2.name() + " is " + + field2.type() + ", but field type for field1 " + field1.name() + " is " + + field1.type()); + } + + // The maximum supported version in field2 must be not be lower than the maximum supported + // version in field1. + short f1Highest = min(field1.versions().highest(), topLevelMessage1.validVersions().highest()); + short f2Highest = min(field2.versions().highest(), topLevelMessage2.validVersions().highest()); + if (f2Highest < f1Highest) { + throw new UnificationException("Maximum effective valid version for field2 " + + field2.name() + ", '" + f2Highest + "' cannot be lower than the " + + "maximum effective valid version for field1 " + field1.name() + ", '" + f1Highest + "'"); + } + // The minimum supported version in field2 must not be different from the minimum supported + // version in field1. + short f1Lowest = max(field1.versions().lowest(), topLevelMessage2.validVersions().lowest()); + short f2Lowest = max(field2.versions().lowest(), topLevelMessage2.validVersions().lowest()); + if (f2Lowest != f1Lowest) { + throw new UnificationException("Minimum effective valid version for field2 " + + field2.name() + ", '" + f2Lowest + "' cannot be different than the " + + "minimum effective valid version for field1 " + field1.name() + ", '" + + f1Lowest + "'"); + } + // The maximum nullable version in field2 must not be lower than the maximum nullable + // version in field1. + short f1HighestNull = min(f1Highest, field2.nullableVersions().highest()); + short f2HighestNull = min(f2Highest, field1.nullableVersions().highest()); + if (f2HighestNull < f1HighestNull) { + throw new UnificationException("Maximum effective nullable version for field2 " + + field2.name() + ", '" + f2HighestNull + "' cannot be lower than the " + + "minimum effective nullable version for field1 " + field1.name() + ", '" + + f1HighestNull + "'"); + } + // The minimum nullable version in field2 must not be different from the minimum nullable + // version in field1. + short f1LowestNull = max(field1.nullableVersions().lowest(), topLevelMessage2.validVersions().lowest()); + short f2LowestNull = max(field2.nullableVersions().lowest(), topLevelMessage2.validVersions().lowest()); + if (f2LowestNull != f1LowestNull) { + throw new UnificationException("Minimum effective nullable version for field2 " + + field2.name() + ", '" + f2LowestNull + "' cannot be different than the " + + "minimum effective nullable version for field1 " + field1.name() + ", '" + + f1LowestNull + "'"); + } + // Check that the flexibleVersions match exactly. Currently, there is only one case where + // flexibleVersions is set on a FieldSpec object: the FieldSpec is the ClientId string + // used in RequestHeader.json In every other case, FieldSpec.flexibleVersions() will be + // Optional.empty. + Versions field2EffectiveFlexibleVersions = field2.flexibleVersions(). + orElseGet(() -> topLevelMessage2.flexibleVersions()); + Versions field1EffectiveFlexibleVersions = field1.flexibleVersions(). + orElseGet(() -> topLevelMessage1.flexibleVersions()); + if (!field2EffectiveFlexibleVersions.contains(field1EffectiveFlexibleVersions)) { + throw new UnificationException("Flexible versions for field2 " + field2.name() + + " is " + field2.flexibleVersions().orElseGet(() -> Versions.NONE) + + ", but flexible versions for field1 is " + + field1.flexibleVersions().orElseGet(() -> Versions.NONE)); + } + // Check that defaults match exactly. + if (!field2.defaultString().equals(field1.defaultString())) { + throw new UnificationException("Default for field2 " + field2.name() + " is '" + + field2.defaultString() + "', but default for field1 " + field1.name() + " is '" + + field1.defaultString() + "'"); + } + // Recursive step. + if (field1.type().isStruct()) { + unifyStructs(field1.name(), + field1.fields(), + field2.name(), + field2.fields()); + } else if (field2.type().isStructArray()) { + unifyStructs(((FieldType.ArrayType) field1.type()).elementName(), + field1.fields(), + ((FieldType.ArrayType) field2.type()).elementName(), + field2.fields()); + } + } + + void unifyStructs( + String struct1Name, + List struct1Fields, + String struct2Name, + List struct2Fields + ) { + // By convention, structure names are always uppercase. + struct1Name = MessageGenerator.capitalizeFirst(struct1Name); + struct2Name = MessageGenerator.capitalizeFirst(struct2Name); + // If the list of struct fields is empty, it is assumed that the structure is defined in + // commonStructs. We have to look it up there in order to find its fields. + if (struct1Fields.isEmpty()) { + struct1Fields = lookupCommonStructFields(struct1Name, structRegistry1); + } + if (struct2Fields.isEmpty()) { + struct2Fields = lookupCommonStructFields(struct2Name, structRegistry2); + } + for (FieldSpec field1 : struct1Fields) { + CheckerUtils.validateTaggedVersions("field1", field1, topLevelMessage1.flexibleVersions()); + } + for (FieldSpec field2 : struct2Fields) { + CheckerUtils.validateTaggedVersions("field2", field2, topLevelMessage2.flexibleVersions()); + } + // Iterate over fields1 and fields2. + FieldSpecPairIterator iterator = new FieldSpecPairIterator(struct1Fields.iterator(), + struct2Fields.iterator(), + topLevelMessage1.validVersions(), + topLevelMessage2.validVersions()); + while (iterator.hasNext()) { + FieldSpecPair pair = iterator.next(); + unify(pair.field1(), pair.field2()); + } + } + + List lookupCommonStructFields( + String structName, + StructRegistry structRegistry + ) { + StructSpec struct = structRegistry.findStruct(structName); + // TODO: we should probably validate the versions, etc. settings of the common struct. + // Maybe force them to always be 0+ since there it makes more sense to restrict them + // at the point of usage, not definition. + return struct.fields(); + } +} diff --git a/generator/src/test/java/org/apache/kafka/message/StructSpecTest.java b/generator/src/test/java/org/apache/kafka/message/StructSpecTest.java new file mode 100644 index 00000000000..c56e227033b --- /dev/null +++ b/generator/src/test/java/org/apache/kafka/message/StructSpecTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message; + +import com.fasterxml.jackson.databind.exc.ValueInstantiationException; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@Timeout(120) +public class StructSpecTest { + @Test + public void testNamesMustBeUnique() { + assertEquals("In LeaderAndIsrRequest, field field1 has a duplicate name field1. All field names must be unique.", + assertThrows(ValueInstantiationException.class, + () -> MessageGenerator.JSON_SERDE.readValue(String.join("", Arrays.asList( + "{", + " \"type\": \"request\",", + " \"name\": \"LeaderAndIsrRequest\",", + " \"validVersions\": \"0-4\",", + " \"deprecatedVersions\": \"0-1\",", + " \"flexibleVersions\": \"0+\",", + " \"fields\": [", + " { \"name\": \"field1\", \"type\": \"int32\", \"versions\": \"0+\" },", + " { \"name\": \"field1\", \"type\": \"[]int64\", \"versions\": \"1+\" }", + " ]", + "}")), MessageSpec.class)).getCause().getMessage()); + } + + @Test + public void testTagsMustBeUnique() { + assertEquals("In LeaderAndIsrRequest, field field2 has a duplicate tag ID 0. All tags IDs must be unique.", + assertThrows(ValueInstantiationException.class, + () -> MessageGenerator.JSON_SERDE.readValue(String.join("", Arrays.asList( + "{", + " \"type\": \"request\",", + " \"name\": \"LeaderAndIsrRequest\",", + " \"validVersions\": \"0-4\",", + " \"deprecatedVersions\": \"0-1\",", + " \"flexibleVersions\": \"0+\",", + " \"fields\": [", + " { \"name\": \"field1\", \"type\": \"int32\", \"versions\": \"0+\", ", + " \"taggedVersions\": \"0+\", \"tag\": 0},", + " { \"name\": \"field2\", \"type\": \"[]int64\", \"versions\": \"0+\", ", + " \"taggedVersions\": \"0+\", \"tag\": 0 }", + " ]", + "}")), MessageSpec.class)).getCause().getMessage()); + } +} diff --git a/generator/src/test/java/org/apache/kafka/message/checker/CheckerTestUtils.java b/generator/src/test/java/org/apache/kafka/message/checker/CheckerTestUtils.java new file mode 100644 index 00000000000..3db9982c106 --- /dev/null +++ b/generator/src/test/java/org/apache/kafka/message/checker/CheckerTestUtils.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message.checker; + +import org.apache.kafka.message.FieldSpec; +import org.apache.kafka.message.MessageGenerator; +import org.apache.kafka.message.MessageSpec; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.OptionalInt; + +public class CheckerTestUtils { + static String singleQuotesToDoubleQuotes(String input) { + return input.replaceAll("'", "\""); + } + + static MessageSpec toMessage(String input) throws Exception { + return MessageGenerator.JSON_SERDE. + readValue(singleQuotesToDoubleQuotes(input), MessageSpec.class); + } + + static FieldSpec field( + String name, + String versions, + String type + ) { + return new FieldSpec(name, + versions, + null, + type, + false, + null, + null, + false, + null, + "", + null, + null, + null, + false); + } + + static FieldSpec fieldWithTag( + String name, + OptionalInt tag + ) { + return new FieldSpec(name, + "0+", + null, + "int8", + false, + null, + null, + false, + null, + null, + tag.isPresent() ? "0+" : "", + null, + tag.isPresent() ? tag.getAsInt() : null, + false); + } + + static FieldSpec fieldWithTag( + String name, + int tag, + String validVersions, + String taggedVersions + ) { + return new FieldSpec(name, + validVersions, + null, + "int8", + false, + null, + null, + false, + null, + null, + taggedVersions, + null, + tag, + false); + } + + static FieldSpec fieldWithNulls( + String name, + String versions, + String type, + String nullableVersions + ) { + return new FieldSpec(name, + versions, + null, + type, + false, + nullableVersions, + null, + false, + null, + "", + null, + null, + null, + false); + } + + static FieldSpec fieldWithDefaults( + String name, + String versions, + String fieldDefault, + String flexibleVersions + ) { + return new FieldSpec(name, + versions, + null, + "string", + false, + null, + fieldDefault, + false, + null, + "", + null, + flexibleVersions, + null, + false); + } + + static String messageSpecStringToTempFile(String input) throws IOException { + File file = Files.createTempFile("MetadataSchemaCheckerToolTest", null).toFile(); + file.deleteOnExit(); + MessageSpec messageSpec = MessageGenerator.JSON_SERDE. + readValue(input.replaceAll("'", "\""), MessageSpec.class); + MessageGenerator.JSON_SERDE.writeValue(file, messageSpec); + return file.getAbsolutePath(); + } +} diff --git a/generator/src/test/java/org/apache/kafka/message/checker/CheckerUtilsTest.java b/generator/src/test/java/org/apache/kafka/message/checker/CheckerUtilsTest.java new file mode 100644 index 00000000000..8e79070bb52 --- /dev/null +++ b/generator/src/test/java/org/apache/kafka/message/checker/CheckerUtilsTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message.checker; + +import org.apache.kafka.message.Versions; + +import org.junit.jupiter.api.Test; + +import static org.apache.kafka.message.checker.CheckerTestUtils.field; +import static org.apache.kafka.message.checker.CheckerTestUtils.fieldWithTag; +import static org.apache.kafka.message.checker.CheckerTestUtils.messageSpecStringToTempFile; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class CheckerUtilsTest { + @Test + public void testMin0and1() { + assertEquals((short) 0, CheckerUtils.min((short) 0, (short) 1)); + } + + @Test + public void testMin1and0() { + assertEquals((short) 0, CheckerUtils.min((short) 1, (short) 0)); + } + + @Test + public void testMin5and5() { + assertEquals((short) 5, CheckerUtils.min((short) 5, (short) 5)); + } + + @Test + public void testMax0and1() { + assertEquals((short) 1, CheckerUtils.max((short) 0, (short) 1)); + } + + @Test + public void testMax1and0() { + assertEquals((short) 1, CheckerUtils.max((short) 1, (short) 0)); + } + + @Test + public void testMax5and5() { + assertEquals((short) 5, CheckerUtils.max((short) 5, (short) 5)); + } + + @Test + public void testValidateTaggedVersionsOnNontaggedField() { + CheckerUtils.validateTaggedVersions("field1", + field("foo", "0+", "int64"), + Versions.parse("5+", Versions.NONE)); + } + + @Test + public void testValidateTaggedVersionsOnTaggedField() { + CheckerUtils.validateTaggedVersions("field1", + fieldWithTag("foo", 123, "1+", "1+"), + Versions.parse("1+", Versions.NONE)); + } + + @Test + public void testValidateTaggedVersionsOnTaggedFieldWithError() { + assertThrows(RuntimeException.class, + () -> CheckerUtils.validateTaggedVersions("field1", + fieldWithTag("foo", 123, "1+", "1+"), + Versions.parse("2+", Versions.NONE))); + } + + @Test + public void testReadMessageSpecFromFile() throws Exception { + CheckerUtils.readMessageSpecFromFile(messageSpecStringToTempFile( + "{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '0-2', 'flexibleVersions': '0+', " + + "'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}")); + } +} diff --git a/generator/src/test/java/org/apache/kafka/message/checker/EvolutionVerifierTest.java b/generator/src/test/java/org/apache/kafka/message/checker/EvolutionVerifierTest.java new file mode 100644 index 00000000000..3e1ecf6e06b --- /dev/null +++ b/generator/src/test/java/org/apache/kafka/message/checker/EvolutionVerifierTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message.checker; + +import org.apache.kafka.message.MessageGenerator; +import org.apache.kafka.message.MessageSpec; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; + +import static org.apache.kafka.message.checker.CheckerTestUtils.toMessage; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@Timeout(40) +public class EvolutionVerifierTest { + @Test + public void testTopLevelMessageApiKeysDoNotMatch() throws Exception { + assertEquals("Initial apiKey Optional[62] does not match final apiKey Optional[63]", + assertThrows(EvolutionException.class, + () -> EvolutionVerifier.verifyTopLevelMessages( + toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '0-2', 'flexibleVersions': '0+', " + + "'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"), + toMessage("{'apiKey':63, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '0-2', 'flexibleVersions': '0+', " + + "'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"))). + getMessage()); + } + + @Test + public void testTopLevelMessageTypesDoNotMatch() throws Exception { + assertEquals("Initial type REQUEST does not match final type RESPONSE", + assertThrows(EvolutionException.class, + () -> EvolutionVerifier.verifyTopLevelMessages( + toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '0-2', 'flexibleVersions': '0+', " + + "'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"), + toMessage("{'apiKey':62, 'type': 'response', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '0-2', 'flexibleVersions': '0+', " + + "'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"))). + getMessage()); + } + + @Test + public void testFlexibleVersionsIsNotASubset() throws Exception { + assertEquals("Initial flexibleVersions 0+ must be a subset of final flexibleVersions 1+", + assertThrows(EvolutionException.class, + () -> EvolutionVerifier.verifyTopLevelMessages( + toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '0-2', 'flexibleVersions': '0+', " + + "'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"), + toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '0-2', 'flexibleVersions': '1+', " + + "'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"))). + getMessage()); + } + + @Test + public void testMaximumVersionOfInitialMessageIsHigher() throws Exception { + assertEquals("Initial maximum valid version 2 must not be higher than final maximum valid version 1", + assertThrows(EvolutionException.class, + () -> EvolutionVerifier.verifyTopLevelMessages( + toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '0-2', 'flexibleVersions': '0+', " + + "'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"), + toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '0-1', 'flexibleVersions': '0+', " + + "'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"))). + getMessage()); + } + + @Test + public void testMinimumVersionOfInitialMessageIsHigher() throws Exception { + assertEquals("Initial minimum valid version 1 must not be higher than final minimum valid version 0", + assertThrows(EvolutionException.class, + () -> EvolutionVerifier.verifyTopLevelMessages( + toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '1-2', 'flexibleVersions': '0+', " + + "'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"), + toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '0-2', 'flexibleVersions': '0+', " + + "'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"))). + getMessage()); + } + + @Test + public void testIncompatibleFieldTypeChange() throws Exception { + assertEquals("Field type for field2 UserId is int32, but field type for field1 UserId is int64", + assertThrows(UnificationException.class, + () -> new EvolutionVerifier( + toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '1-2', 'flexibleVersions': '0+', " + + "'fields': [" + + "{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}," + + "{'name': 'ControllerId', 'type': 'int32', 'versions': '1+'}," + + "{'name': 'UserId', 'type': 'int64', 'versions': '2+'}" + + "]}"), + toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '1-2', 'flexibleVersions': '0+', " + + "'fields': [" + + "{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}," + + "{'name': 'ControllerId', 'type': 'int32', 'versions': '1+'}," + + "{'name': 'UserId', 'type': 'int32', 'versions': '2+'}" + + "]}")). + verify()). + getMessage()); + } + + @Test + public void testNewFieldAddition() throws Exception { + new EvolutionVerifier( + toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '1-2', 'flexibleVersions': '0+', " + + "'fields': [" + + "{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}," + + "{'name': 'ControllerId', 'type': 'int32', 'versions': '1+'}," + + "{'name': 'UserId', 'type': 'int64', 'versions': '2+'}" + + "]}"), + toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '1-3', 'flexibleVersions': '0+', " + + "'fields': [" + + "{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}," + + "{'name': 'ControllerId', 'type': 'int32', 'versions': '1+'}," + + "{'name': 'NewId', 'type': 'int64', 'versions': '3+'}," + + "{'name': 'UserId', 'type': 'int64', 'versions': '2+'}" + + "]}")). + verify(); + } + + + @Test + public void testFieldVersionsMustBeInsideTopLevelVersion() { + assertEquals("Field field2 in message1 has versions 1+, but the message versions are only 0.", + assertThrows(EvolutionException.class, + () -> EvolutionVerifier.verifyVersionsMatchTopLevelMessage("message1", + MessageGenerator.JSON_SERDE.readValue(String.join("", Arrays.asList( + "{", + " \"type\": \"request\",", + " \"name\": \"LeaderAndIsrRequest\",", + " \"validVersions\": \"0\",", + " \"flexibleVersions\": \"0+\",", + " \"fields\": [", + " { \"name\": \"field1\", \"type\": \"int32\", \"versions\": \"0+\" },", + " { \"name\": \"field2\", \"type\": \"[]int64\", \"versions\": \"1+\" }", + " ]", + "}")), MessageSpec.class))).getMessage()); + } + + @Test + public void testFieldNullableVersionsMustBeInsideTopLevelVersion() { + assertEquals("Field field1 in message1 has nullableVersions 1+, but the message versions are only 0.", + assertThrows(EvolutionException.class, + () -> EvolutionVerifier.verifyVersionsMatchTopLevelMessage("message1", + MessageGenerator.JSON_SERDE.readValue(String.join("", Arrays.asList( + "{", + " \"type\": \"request\",", + " \"name\": \"LeaderAndIsrRequest\",", + " \"validVersions\": \"0\",", + " \"flexibleVersions\": \"0+\",", + " \"fields\": [", + " { \"name\": \"field1\", \"type\": \"string\", \"versions\": \"0+\", \"nullableVersions\": \"1+\"},", + " { \"name\": \"field2\", \"type\": \"[]int64\", \"versions\": \"0+\" }", + " ]", + "}")), MessageSpec.class))).getMessage()); + } + + @Test + public void testFieldTaggedVersionsMustBeInsideTopLevelVersion() { + assertEquals("Field field1 in message1 has taggedVersions 1+, but the message versions are only 0.", + assertThrows(EvolutionException.class, + () -> EvolutionVerifier.verifyVersionsMatchTopLevelMessage("message1", + MessageGenerator.JSON_SERDE.readValue(String.join("", Arrays.asList( + "{", + " \"type\": \"request\",", + " \"name\": \"LeaderAndIsrRequest\",", + " \"validVersions\": \"0\",", + " \"flexibleVersions\": \"0+\",", + " \"fields\": [", + " { \"name\": \"field1\", \"type\": \"string\", \"versions\": \"0+\", \"taggedVersions\": \"1+\", \"tag\": 0},", + " { \"name\": \"field2\", \"type\": \"[]int64\", \"versions\": \"0+\" }", + " ]", + "}")), MessageSpec.class))).getMessage()); + } +} diff --git a/generator/src/test/java/org/apache/kafka/message/checker/FieldDomainTest.java b/generator/src/test/java/org/apache/kafka/message/checker/FieldDomainTest.java new file mode 100644 index 00000000000..5d715c9883a --- /dev/null +++ b/generator/src/test/java/org/apache/kafka/message/checker/FieldDomainTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message.checker; + +import org.apache.kafka.message.Versions; + +import org.junit.jupiter.api.Test; + +import static org.apache.kafka.message.checker.CheckerTestUtils.field; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class FieldDomainTest { + @Test + public void testMessage1Only() { + assertEquals(FieldDomain.MESSAGE1_ONLY, + FieldDomain.of(field("bar", "1-1", "string"), + new Versions((short) 0, (short) 1), + new Versions((short) 2, (short) 5))); + } + + @Test + public void testBoth() { + assertEquals(FieldDomain.BOTH, + FieldDomain.of(field("bar", "1+", "string"), + new Versions((short) 0, (short) 1), + new Versions((short) 0, (short) 3))); + } + + @Test + public void testMessage2Only() { + assertEquals(FieldDomain.MESSAGE2_ONLY, + FieldDomain.of(field("bar", "1+", "string"), + new Versions((short) 0, (short) 0), + new Versions((short) 0, (short) 1))); + } + + @Test + public void testNeither() { + assertEquals(FieldDomain.NEITHER, + FieldDomain.of(field("bar", "2+", "string"), + new Versions((short) 0, (short) 0), + new Versions((short) 0, (short) 1))); + } +} diff --git a/generator/src/test/java/org/apache/kafka/message/checker/MetadataSchemaCheckerToolTest.java b/generator/src/test/java/org/apache/kafka/message/checker/MetadataSchemaCheckerToolTest.java new file mode 100644 index 00000000000..6cedfdbc112 --- /dev/null +++ b/generator/src/test/java/org/apache/kafka/message/checker/MetadataSchemaCheckerToolTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message.checker; + +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; + +import static org.apache.kafka.message.checker.CheckerTestUtils.messageSpecStringToTempFile; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class MetadataSchemaCheckerToolTest { + @Test + public void testSuccessfulParse() throws Exception { + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + String path = messageSpecStringToTempFile( + "{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '0-2', 'flexibleVersions': '0+', " + + "'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"); + MetadataSchemaCheckerTool.run(new String[] {"parse", "--path", path}, new PrintStream(stream)); + assertEquals("Successfully parsed file as MessageSpec: " + path, stream.toString().trim()); + } + } + + @Test + public void testSuccessfulVerifyEvolution() throws Exception { + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + String path = messageSpecStringToTempFile( + "{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '0-2', 'flexibleVersions': '0+', " + + "'fields': [{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}]}"); + MetadataSchemaCheckerTool.run(new String[] {"verify-evolution", + "--path1", path, "--path2", path}, new PrintStream(stream)); + assertEquals("Successfully verified evolution of path1: " + path + ", and path2: " + path, + stream.toString().trim()); + } + } +} diff --git a/generator/src/test/java/org/apache/kafka/message/checker/UnifierTest.java b/generator/src/test/java/org/apache/kafka/message/checker/UnifierTest.java new file mode 100644 index 00000000000..cc82835a807 --- /dev/null +++ b/generator/src/test/java/org/apache/kafka/message/checker/UnifierTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message.checker; + +import org.apache.kafka.message.MessageSpec; +import org.apache.kafka.message.MessageSpecType; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Collections; + +import static org.apache.kafka.message.checker.CheckerTestUtils.field; +import static org.apache.kafka.message.checker.CheckerTestUtils.fieldWithDefaults; +import static org.apache.kafka.message.checker.CheckerTestUtils.fieldWithNulls; +import static org.apache.kafka.message.checker.CheckerTestUtils.toMessage; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@Timeout(40) +public class UnifierTest { + @Test + public void testAddNewField() throws Exception { + new Unifier(toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '0', 'flexibleVersions': '0+', " + + "'fields': [" + + "{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}" + + "]}"), + toMessage("{'apiKey':62, 'type': 'request', 'name': 'BrokerRegistrationRequest', " + + "'validVersions': '0-1', 'flexibleVersions': '0+', " + + "'fields': [" + + "{'name': 'BrokerId', 'type': 'int32', 'versions': '0+'}," + + "{'name': 'ControlerId', 'type': 'int32', 'versions': '1+'}" + + "]}")).unify(); + } + + static final MessageSpec TOP_LEVEL_MESSAGE_1 = new MessageSpec("TopLevelMessage", + "0-2", null, null, null, MessageSpecType.DATA, Collections.emptyList(), + "0+", Collections.emptyList(), false); + + static final MessageSpec TOP_LEVEL_MESSAGE_2 = new MessageSpec("TopLevelMessage", + "0-4", null, null, null, MessageSpecType.DATA, Collections.emptyList(), + "0+", Collections.emptyList(), false); + + static final MessageSpec TOP_LEVEL_MESSAGE_2_DROPPING_V0 = new MessageSpec("TopLevelMessage", + "1-4", null, null, null, MessageSpecType.DATA, Collections.emptyList(), + "0+", Collections.emptyList(), false); + + @Test + public void testFieldTypesDoNotMatch() throws Exception { + assertEquals("Field type for field2 foo is int8, but field type for field1 foo is int16", + assertThrows(UnificationException.class, + () -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + field("foo", "0+", "int16"), + field("foo", "0+", "int8"))).getMessage()); + } + + @Test + public void testFieldTypesMatch() throws Exception { + new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + field("foo", "0+", "int16"), + field("foo", "0+", "int16")); + } + + @Test + public void testArrayElementTypesDoNotMatch() throws Exception { + assertEquals("Field type for field2 foo is []int8, but field type for field1 foo is []int16", + assertThrows(UnificationException.class, + () -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + field("foo", "0+", "[]int16"), + field("foo", "0+", "[]int8"))).getMessage()); + } + + @Test + public void testArrayFieldTypesMatch() throws Exception { + new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + field("foo", "0+", "[]int16"), + field("foo", "0+", "[]int16")); + } + + @Test + public void testMaximumValidVersionForField2IsLowerThanField1() throws Exception { + assertEquals("Maximum effective valid version for field2 foo, '1' cannot be lower than the " + + "maximum effective valid version for field1 foo, '2'", + assertThrows(UnificationException.class, + () -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + field("foo", "0-2", "int64"), + field("foo", "0-1", "int64"))).getMessage()); + } + + @Test + public void testMaximumValidVersionForIsReasonable1() throws Exception { + new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + field("foo", "0-1", "int64"), + field("foo", "0-2", "int64")); + } + + @Test + public void testMaximumValidVersionForIsReasonable2() throws Exception { + new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + field("foo", "0+", "int64"), // effective max is 2, because TOP_LEVEL_MESSAGE_1 supports 0-2 + field("foo", "0-3", "int64")); + } + + @Test + public void testMinimumValidVersionForField2IsLowerThanField1() throws Exception { + assertEquals("Minimum effective valid version for field2 foo, '0' cannot be different than the " + + "minimum effective valid version for field1 foo, '1'", + assertThrows(UnificationException.class, + () -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + field("foo", "1+", "int64"), + field("foo", "0+", "int64"))).getMessage()); + } + + @Test + public void testMinimumValidVersionForField2IsLowerThanField1Again() throws Exception { + assertEquals("Minimum effective valid version for field2 foo, '0' cannot be different than the " + + "minimum effective valid version for field1 foo, '1'", + assertThrows(UnificationException.class, + () -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + field("foo", "1-2", "int64"), + field("foo", "0-2", "int64"))).getMessage()); + } + + @Test + public void testMinimumValidVersionForField2IsHigherThanField1() throws Exception { + assertEquals("Minimum effective valid version for field2 foo, '1' cannot be different than the " + + "minimum effective valid version for field1 foo, '0'", + assertThrows(UnificationException.class, + () -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + field("foo", "0-2", "int64"), + field("foo", "1-2", "int64"))).getMessage()); + } + + @Test + public void testNullableVersionsCheckPasses1() throws Exception { + new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2_DROPPING_V0).unify( + fieldWithNulls("foo", "0-2", "string", "0+"), + fieldWithNulls("foo", "1-2", "string", "1-2")); + } + + @Test + public void testNullableVersionsCheckPasses2() throws Exception { + new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + fieldWithNulls("foo", "0+", "string", "0+"), // effectively 0-2 because of max valid version + fieldWithNulls("foo", "0+", "string", "0-2")); + } + + @Test + public void testNullableVersionsCheckFails1() throws Exception { + assertEquals("Minimum effective nullable version for field2 foo, '1' cannot be different than " + + "the minimum effective nullable version for field1 foo, '0'", + assertThrows(UnificationException.class, + () -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + fieldWithNulls("foo", "0-2", "string", "0-2"), + fieldWithNulls("foo", "0-2", "string", "1-2"))).getMessage()); + } + + @Test + public void testNullableVersionsCheckFails2() throws Exception { + assertEquals("Minimum effective nullable version for field2 foo, '1' cannot be different than " + + "the minimum effective nullable version for field1 foo, '0'", + assertThrows(UnificationException.class, + () -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + fieldWithNulls("foo", "0+", "string", "0+"), + fieldWithNulls("foo", "0+", "string", "1-2"))).getMessage()); + } + + @Test + public void testFlexibleVersionsChangedCausesFailure1() throws Exception { + assertEquals("Flexible versions for field2 foo is 2+, but flexible versions for field1 is 1+", + assertThrows(UnificationException.class, + () -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + fieldWithDefaults("foo", "0+", null, "1+"), + fieldWithDefaults("foo", "0+", null, "2+"))).getMessage()); + } + + @Test + public void testFlexibleVersionsChangedCausesFailure2() throws Exception { + assertEquals("Flexible versions for field2 foo is 2+, but flexible versions for field1 is none", + assertThrows(UnificationException.class, + () -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + fieldWithDefaults("foo", "0+", null, ""), + fieldWithDefaults("foo", "0+", null, "2+"))).getMessage()); + } + + @Test + public void testFlexibleVersionsCheckPasses() throws Exception { + new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + fieldWithDefaults("foo", "0+", null, "1+"), + fieldWithDefaults("foo", "0+", null, "1+")); + } + + @Test + public void testDefaultsChangedCausesFailure() throws Exception { + assertEquals("Default for field2 foo is 'newDefault', but default for field1 foo is 'oldDefault'", + assertThrows(UnificationException.class, + () -> new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + fieldWithDefaults("foo", "0+", "oldDefault", null), + fieldWithDefaults("foo", "0+", "newDefault", null))).getMessage()); + } + + @Test + public void testDefaultsCheckPasses() throws Exception { + new Unifier(TOP_LEVEL_MESSAGE_1, TOP_LEVEL_MESSAGE_2).unify( + fieldWithDefaults("foo", "0+", "oldDefault", null), + fieldWithDefaults("foo", "0+", "oldDefault", null)); + } +}