KAFKA-10675: Add schema name to ConnectSchema.validateValue() error message (#9541)

The following error message
`org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.lang.Long for field: "moderate_time"`
can be confusing because java.lang.Long is acceptable type for schema INT64.

In fact, in this case `org.apache.kafka.connect.data.Timestamp` is used but this info is not logged.

Reviewers: Randall Hauch <rhauch@gmail.com>, Chris Egerton <chrise@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
This commit is contained in:
Alexander Iskuskov 2021-07-10 08:35:02 +03:00 committed by GitHub
parent d24094bb0a
commit efe6029f9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 13 deletions

View File

@ -222,12 +222,6 @@ public class ConnectSchema implements Schema {
}
List<Class<?>> expectedClasses = expectedClassesFor(schema);
if (expectedClasses == null)
throw new DataException("Invalid Java object for schema type " + schema.type()
+ ": " + value.getClass()
+ " for field: \"" + name + "\"");
boolean foundMatch = false;
for (Class<?> expectedClass : expectedClasses) {
if (expectedClass.isInstance(value)) {
@ -236,10 +230,17 @@ public class ConnectSchema implements Schema {
}
}
if (!foundMatch)
throw new DataException("Invalid Java object for schema type " + schema.type()
+ ": " + value.getClass()
+ " for field: \"" + name + "\"");
if (!foundMatch) {
StringBuilder exceptionMessage = new StringBuilder("Invalid Java object for schema");
if (schema.name() != null) {
exceptionMessage.append(" \"").append(schema.name()).append("\"");
}
exceptionMessage.append(" with type ").append(schema.type()).append(": ").append(value.getClass());
if (name != null) {
exceptionMessage.append(" for field: \"").append(name).append("\"");
}
throw new DataException(exceptionMessage.toString());
}
switch (schema.type()) {
case STRUCT:
@ -266,7 +267,7 @@ public class ConnectSchema implements Schema {
private static List<Class<?>> expectedClassesFor(Schema schema) {
List<Class<?>> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
if (expectedClasses == null)
expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
expectedClasses = SCHEMA_TYPE_CLASSES.getOrDefault(schema.type(), Collections.emptyList());
return expectedClasses;
}

View File

@ -311,13 +311,30 @@ public class StructTest {
Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName,
fakeSchema, new Object()));
assertEquals("Invalid Java object for schema type null: class java.lang.Object for field: \"field\"",
assertEquals("Invalid Java object for schema \"fake\" with type null: class java.lang.Object for field: \"field\"",
e.getMessage());
e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName,
Schema.INT8_SCHEMA, new Object()));
assertEquals("Invalid Java object for schema type INT8: class java.lang.Object for field: \"field\"",
assertEquals("Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\"",
e.getMessage());
e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(Schema.INT8_SCHEMA, new Object()));
assertEquals("Invalid Java object for schema with type INT8: class java.lang.Object", e.getMessage());
}
@Test
public void testValidateFieldWithInvalidValueMismatchTimestamp() {
String fieldName = "field";
long longValue = 1000L;
// Does not throw
ConnectSchema.validateValue(fieldName, Schema.INT64_SCHEMA, longValue);
Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName,
Timestamp.SCHEMA, longValue));
assertEquals("Invalid Java object for schema \"org.apache.kafka.connect.data.Timestamp\" " +
"with type INT64: class java.lang.Long for field: \"field\"", e.getMessage());
}
@Test