mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-8523 Enabling InsertField transform to be used with tombstone events (#6914)
* KAFKA-8523 Avoiding raw type usage * KAFKA-8523 Gracefully handling tombstone events in InsertField SMT
This commit is contained in:
		
							parent
							
								
									c550eba0a2
								
							
						
					
					
						commit
						c2af356724
					
				| 
						 | 
					@ -127,13 +127,19 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public R apply(R record) {
 | 
					    public R apply(R record) {
 | 
				
			||||||
        if (operatingSchema(record) == null) {
 | 
					        if (isTombstoneRecord(record)) {
 | 
				
			||||||
 | 
					            return record;
 | 
				
			||||||
 | 
					        } else if (operatingSchema(record) == null) {
 | 
				
			||||||
            return applySchemaless(record);
 | 
					            return applySchemaless(record);
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            return applyWithSchema(record);
 | 
					            return applyWithSchema(record);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private boolean isTombstoneRecord(R record) {
 | 
				
			||||||
 | 
					        return record.value() == null;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private R applySchemaless(R record) {
 | 
					    private R applySchemaless(R record) {
 | 
				
			||||||
        final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
 | 
					        final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -104,11 +104,53 @@ public class InsertFieldTest {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        final SourceRecord transformedRecord = xform.apply(record);
 | 
					        final SourceRecord transformedRecord = xform.apply(record);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        assertEquals(42L, ((Map) transformedRecord.value()).get("magic"));
 | 
					        assertEquals(42L, ((Map<?, ?>) transformedRecord.value()).get("magic"));
 | 
				
			||||||
        assertEquals("test", ((Map) transformedRecord.value()).get("topic_field"));
 | 
					        assertEquals("test", ((Map<?, ?>) transformedRecord.value()).get("topic_field"));
 | 
				
			||||||
        assertEquals(0, ((Map) transformedRecord.value()).get("partition_field"));
 | 
					        assertEquals(0, ((Map<?, ?>) transformedRecord.value()).get("partition_field"));
 | 
				
			||||||
        assertEquals(null, ((Map) transformedRecord.value()).get("timestamp_field"));
 | 
					        assertEquals(null, ((Map<?, ?>) transformedRecord.value()).get("timestamp_field"));
 | 
				
			||||||
        assertEquals("my-instance-id", ((Map) transformedRecord.value()).get("instance_id"));
 | 
					        assertEquals("my-instance-id", ((Map<?, ?>) transformedRecord.value()).get("instance_id"));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Test
 | 
				
			||||||
 | 
					    public void insertConfiguredFieldsIntoTombstoneEventWithoutSchemaLeavesValueUnchanged() {
 | 
				
			||||||
 | 
					        final Map<String, Object> props = new HashMap<>();
 | 
				
			||||||
 | 
					        props.put("topic.field", "topic_field!");
 | 
				
			||||||
 | 
					        props.put("partition.field", "partition_field");
 | 
				
			||||||
 | 
					        props.put("timestamp.field", "timestamp_field?");
 | 
				
			||||||
 | 
					        props.put("static.field", "instance_id");
 | 
				
			||||||
 | 
					        props.put("static.value", "my-instance-id");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        xform.configure(props);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        final SourceRecord record = new SourceRecord(null, null, "test", 0,
 | 
				
			||||||
 | 
					                null, null);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        final SourceRecord transformedRecord = xform.apply(record);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        assertEquals(null, transformedRecord.value());
 | 
				
			||||||
 | 
					        assertEquals(null, transformedRecord.valueSchema());
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Test
 | 
				
			||||||
 | 
					    public void insertConfiguredFieldsIntoTombstoneEventWithSchemaLeavesValueUnchanged() {
 | 
				
			||||||
 | 
					        final Map<String, Object> props = new HashMap<>();
 | 
				
			||||||
 | 
					        props.put("topic.field", "topic_field!");
 | 
				
			||||||
 | 
					        props.put("partition.field", "partition_field");
 | 
				
			||||||
 | 
					        props.put("timestamp.field", "timestamp_field?");
 | 
				
			||||||
 | 
					        props.put("static.field", "instance_id");
 | 
				
			||||||
 | 
					        props.put("static.value", "my-instance-id");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        xform.configure(props);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        final Schema simpleStructSchema = SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic", Schema.OPTIONAL_INT64_SCHEMA).build();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        final SourceRecord record = new SourceRecord(null, null, "test", 0,
 | 
				
			||||||
 | 
					                simpleStructSchema, null);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        final SourceRecord transformedRecord = xform.apply(record);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        assertEquals(null, transformedRecord.value());
 | 
				
			||||||
 | 
					        assertEquals(simpleStructSchema, transformedRecord.valueSchema());
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue