From eb9f5189f5bf40b9f55a2550078b6a32024712cc Mon Sep 17 00:00:00 2001 From: Priyanka K U Date: Tue, 5 Aug 2025 13:30:14 +0530 Subject: [PATCH] KAFKA-16913: Support external schemas in JSONConverter (#19449) When using a connector that requires a schema, such as JDBC connectors, with JSON messages, the current JSONConverter necessitates including the schema within every message. To address this, we are introducing a new parameter, schema.content, which allows you to provide the schema externally. This approach not only reduces the size of the messages but also facilitates the use of more complex schemas. KIP : https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverter Reviewers: Mickael Maison , TengYao Chi , Edoardo Comar --- .../kafka/connect/json/JsonConverter.java | 26 ++++++--- .../connect/json/JsonConverterConfig.java | 24 +++++++++ .../kafka/connect/json/JsonConverterTest.java | 54 +++++++++++++++++++ 3 files changed, 98 insertions(+), 6 deletions(-) diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java index 8b3d60133a9..7fa5358f1c3 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java @@ -224,6 +224,7 @@ public class JsonConverter implements Converter, HeaderConverter, Versioned { private JsonConverterConfig config; private Cache fromConnectSchemaCache; private Cache toConnectSchemaCache; + private Schema schema = null; // if a schema is provided in config, this schema will be used for all messages private final JsonSerializer serializer; private final JsonDeserializer deserializer; @@ -286,6 +287,16 @@ public class JsonConverter implements Converter, HeaderConverter, Versioned { fromConnectSchemaCache = new SynchronizedCache<>(new LRUCache<>(config.schemaCacheSize())); toConnectSchemaCache = new SynchronizedCache<>(new LRUCache<>(config.schemaCacheSize())); + + try { + final byte[] schemaContent = config.schemaContent(); + if (schemaContent != null) { + final JsonNode schemaNode = deserializer.deserialize("", schemaContent); + this.schema = asConnectSchema(schemaNode); + } + } catch (SerializationException e) { + throw new DataException("Failed to parse schema in converter config due to serialization error: ", e); + } } @Override @@ -340,13 +351,16 @@ public class JsonConverter implements Converter, HeaderConverter, Versioned { throw new DataException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e); } - if (config.schemasEnabled() && (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))) - throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." + + if (config.schemasEnabled()) { + if (schema != null) { + return new SchemaAndValue(schema, convertToConnect(schema, jsonValue, config)); + } else if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) { + throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." + " If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration."); - - // The deserialized data should either be an envelope object containing the schema and the payload or the schema - // was stripped during serialization and we need to fill in an all-encompassing schema. - if (!config.schemasEnabled()) { + } + } else { + // The deserialized data should either be an envelope object containing the schema and the payload or the schema + // was stripped during serialization and we need to fill in an all-encompassing schema. ObjectNode envelope = JSON_NODE_FACTORY.objectNode(); envelope.set(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME, null); envelope.set(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME, jsonValue); diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java index f02d54ac263..4d148250114 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.connect.storage.ConverterConfig; +import java.nio.charset.StandardCharsets; import java.util.Locale; import java.util.Map; @@ -35,6 +36,11 @@ public final class JsonConverterConfig extends ConverterConfig { private static final String SCHEMAS_ENABLE_DOC = "Include schemas within each of the serialized values and keys."; private static final String SCHEMAS_ENABLE_DISPLAY = "Enable Schemas"; + public static final String SCHEMA_CONTENT_CONFIG = "schema.content"; + public static final String SCHEMA_CONTENT_DEFAULT = null; + private static final String SCHEMA_CONTENT_DOC = "When set, this is used as the schema for all messages. Otherwise, the schema will be included in the content of each message."; + private static final String SCHEMA_CONTENT_DISPLAY = "Schema Content"; + public static final String SCHEMAS_CACHE_SIZE_CONFIG = "schemas.cache.size"; public static final int SCHEMAS_CACHE_SIZE_DEFAULT = 1000; private static final String SCHEMAS_CACHE_SIZE_DOC = "The maximum number of schemas that can be cached in this converter instance."; @@ -61,6 +67,8 @@ public final class JsonConverterConfig extends ConverterConfig { orderInGroup++, Width.MEDIUM, SCHEMAS_ENABLE_DISPLAY); CONFIG.define(SCHEMAS_CACHE_SIZE_CONFIG, Type.INT, SCHEMAS_CACHE_SIZE_DEFAULT, Importance.HIGH, SCHEMAS_CACHE_SIZE_DOC, group, orderInGroup++, Width.MEDIUM, SCHEMAS_CACHE_SIZE_DISPLAY); + CONFIG.define(SCHEMA_CONTENT_CONFIG, Type.STRING, SCHEMA_CONTENT_DEFAULT, Importance.HIGH, SCHEMA_CONTENT_DOC, group, + orderInGroup++, Width.MEDIUM, SCHEMA_CONTENT_DISPLAY); group = "Serialization"; orderInGroup = 0; @@ -86,6 +94,7 @@ public final class JsonConverterConfig extends ConverterConfig { private final int schemaCacheSize; private final DecimalFormat decimalFormat; private final boolean replaceNullWithDefault; + private final byte[] schemaContent; public JsonConverterConfig(Map props) { super(CONFIG, props); @@ -93,6 +102,10 @@ public final class JsonConverterConfig extends ConverterConfig { this.schemaCacheSize = getInt(SCHEMAS_CACHE_SIZE_CONFIG); this.decimalFormat = DecimalFormat.valueOf(getString(DECIMAL_FORMAT_CONFIG).toUpperCase(Locale.ROOT)); this.replaceNullWithDefault = getBoolean(REPLACE_NULL_WITH_DEFAULT_CONFIG); + String schemaContentStr = getString(SCHEMA_CONTENT_CONFIG); + this.schemaContent = (schemaContentStr == null || schemaContentStr.isEmpty()) + ? null + : schemaContentStr.getBytes(StandardCharsets.UTF_8); } /** @@ -130,4 +143,15 @@ public final class JsonConverterConfig extends ConverterConfig { return replaceNullWithDefault; } + /** + * If a default schema is provided in the converter config, this will be + * used for all messages. + * + * This is only relevant if schemas are enabled. + * + * @return Schema Contents, will return null if no value is provided + */ + public byte[] schemaContent() { + return schemaContent; + } } diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java index 34010ddac05..200b33d1774 100644 --- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java +++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java @@ -36,6 +36,8 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.io.IOException; @@ -977,6 +979,58 @@ public class JsonConverterTest { assertEquals(AppInfoParser.getVersion(), converter.version()); } + @Test + public void testSchemaContentIsNull() { + Map config = new HashMap<>(); + config.put(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, null); + converter.configure(config, false); + byte[] jsonBytes = "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes(); + SchemaAndValue result = converter.toConnectData(TOPIC, jsonBytes); + assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), result); + } + + @Test + public void testSchemaContentIsEmptyString() { + converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, ""), false); + assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes())); + } + + @Test + public void testSchemaContentValidSchema() { + converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"type\": \"string\" }"), false); + assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toConnectData(TOPIC, "\"foo-bar-baz\"".getBytes())); + } + + @Test + public void testSchemaContentInValidSchema() { + assertThrows( + DataException.class, + () -> converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"string\" }"), false), + " Provided schema is invalid , please recheck the schema you have provided"); + } + + @Test + public void testSchemaContentLooksLikeSchema() { + converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"type\": \"struct\", \"fields\": [{\"field\": \"schema\", \"type\": \"struct\",\"fields\": [{\"field\": \"type\", \"type\": \"string\" }]}, {\"field\": \"payload\", \"type\": \"string\"}]}"), false); + SchemaAndValue connectData = converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes()); + assertEquals("foo-bar-baz", ((Struct) connectData.value()).getString("payload")); + } + + @ParameterizedTest + @ValueSource(strings = { + "{ }", + "{ \"wrong\": \"schema\" }", + "{ \"schema\": { \"type\": \"string\" } }", + "{ \"payload\": \"foo-bar-baz\" }", + "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\", \"extra\": \"field\" }", + }) + public void testNullSchemaContentWithWrongConnectDataValue(String value) { + converter.configure(Map.of(), false); + assertThrows( + DataException.class, + () -> converter.toConnectData(TOPIC, value.getBytes())); + } + private JsonNode parse(byte[] json) { try { return objectMapper.readTree(json);