KAFKA-4173; SchemaProjector should successfully project missing Struct field when target field is optional

Author: Shikhar Bhushan <shikhar@confluent.io>

Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Jason Gustafson <jason@confluent.io>

Closes #1865 from shikhar/kafka-4173
This commit is contained in:
Shikhar Bhushan 2016-09-16 15:54:33 -07:00 committed by Jason Gustafson
parent 567cc3d787
commit d7bffebca0
2 changed files with 26 additions and 7 deletions

View File

@ -111,14 +111,12 @@ public class SchemaProjector {
} catch (SchemaProjectorException e) {
throw new SchemaProjectorException("Error projecting " + sourceField.name(), e);
}
} else if (targetField.schema().isOptional()) {
// Ignore missing field
} else if (targetField.schema().defaultValue() != null) {
targetStruct.put(fieldName, targetField.schema().defaultValue());
} else {
Object targetDefault;
if (targetField.schema().defaultValue() != null) {
targetDefault = targetField.schema().defaultValue();
} else {
throw new SchemaProjectorException("Cannot project " + source.schema() + " to " + target.schema());
}
targetStruct.put(fieldName, targetDefault);
throw new SchemaProjectorException("Required field `" + fieldName + "` is missing from source schema: " + source);
}
}
return targetStruct;

View File

@ -469,6 +469,27 @@ public class SchemaProjectorTest {
}
}
@Test
public void testProjectMissingDefaultValuedStructField() {
final Schema source = SchemaBuilder.struct().build();
final Schema target = SchemaBuilder.struct().field("id", SchemaBuilder.int64().defaultValue(42L).build()).build();
assertEquals(42L, (long) ((Struct) SchemaProjector.project(source, new Struct(source), target)).getInt64("id"));
}
@Test
public void testProjectMissingOptionalStructField() {
final Schema source = SchemaBuilder.struct().build();
final Schema target = SchemaBuilder.struct().field("id", SchemaBuilder.OPTIONAL_INT64_SCHEMA).build();
assertEquals(null, ((Struct) SchemaProjector.project(source, new Struct(source), target)).getInt64("id"));
}
@Test(expected = SchemaProjectorException.class)
public void testProjectMissingRequiredField() {
final Schema source = SchemaBuilder.struct().build();
final Schema target = SchemaBuilder.struct().field("id", SchemaBuilder.INT64_SCHEMA).build();
SchemaProjector.project(source, new Struct(source), target);
}
private void verifyOptionalProjection(Schema source, Type targetType, Object value, Object defaultValue, Object expectedProjected, boolean optional) {
Schema target;
assert source.isOptional();