diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java index ad0caf85f83..6277e44c4e0 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java @@ -111,14 +111,12 @@ public class SchemaProjector { } catch (SchemaProjectorException e) { throw new SchemaProjectorException("Error projecting " + sourceField.name(), e); } + } else if (targetField.schema().isOptional()) { + // Ignore missing field + } else if (targetField.schema().defaultValue() != null) { + targetStruct.put(fieldName, targetField.schema().defaultValue()); } else { - Object targetDefault; - if (targetField.schema().defaultValue() != null) { - targetDefault = targetField.schema().defaultValue(); - } else { - throw new SchemaProjectorException("Cannot project " + source.schema() + " to " + target.schema()); - } - targetStruct.put(fieldName, targetDefault); + throw new SchemaProjectorException("Required field `" + fieldName + "` is missing from source schema: " + source); } } return targetStruct; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java index 0b1760be7a2..101be043226 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java @@ -469,6 +469,27 @@ public class SchemaProjectorTest { } } + @Test + public void testProjectMissingDefaultValuedStructField() { + final Schema source = SchemaBuilder.struct().build(); + final Schema target = SchemaBuilder.struct().field("id", SchemaBuilder.int64().defaultValue(42L).build()).build(); + assertEquals(42L, (long) ((Struct) SchemaProjector.project(source, new Struct(source), target)).getInt64("id")); + } + + @Test + public void testProjectMissingOptionalStructField() { + final Schema source = SchemaBuilder.struct().build(); + final Schema target = SchemaBuilder.struct().field("id", SchemaBuilder.OPTIONAL_INT64_SCHEMA).build(); + assertEquals(null, ((Struct) SchemaProjector.project(source, new Struct(source), target)).getInt64("id")); + } + + @Test(expected = SchemaProjectorException.class) + public void testProjectMissingRequiredField() { + final Schema source = SchemaBuilder.struct().build(); + final Schema target = SchemaBuilder.struct().field("id", SchemaBuilder.INT64_SCHEMA).build(); + SchemaProjector.project(source, new Struct(source), target); + } + private void verifyOptionalProjection(Schema source, Type targetType, Object value, Object defaultValue, Object expectedProjected, boolean optional) { Schema target; assert source.isOptional();