KAFKA-16913: Support external schemas in JSONConverter (#19449)

When using a connector that requires a schema, such as the JDBC connectors,
with JSON messages, the current JSONConverter requires the schema to be
embedded in every message. To address this, we are introducing a new
parameter, schema.content, which allows the schema to be provided
externally. This not only reduces the size of the messages but also makes
it easier to work with more complex schemas.
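
For illustration, a minimal sketch of the new behavior, mirroring the unit tests added below (the class name ExternalSchemaDemo and the topic name are hypothetical): with schemas.enable left at its default of true and schema.content set, a bare JSON payload is decoded against the externally supplied schema, with no per-message envelope.

    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.json.JsonConverter;

    public class ExternalSchemaDemo {
        public static void main(String[] args) {
            JsonConverter converter = new JsonConverter();
            // schemas.enable defaults to true; spelled out here for clarity.
            converter.configure(Map.of(
                    "schemas.enable", "true",
                    "schema.content", "{ \"type\": \"string\" }"),
                false); // false = configure as a value (not key) converter

            // The message body is plain JSON: no "schema"/"payload" envelope.
            SchemaAndValue result = converter.toConnectData(
                "my-topic", "\"foo-bar-baz\"".getBytes(StandardCharsets.UTF_8));
            System.out.println(result.value()); // prints foo-bar-baz
        }
    }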

KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverter

Reviewers: Mickael Maison <mickael.maison@gmail.com>, TengYao Chi <frankvicky@apache.org>, Edoardo Comar <ECOMAR@uk.ibm.com>
Priyanka K U 2025-08-05 13:30:14 +05:30
parent e78977e505
commit eb9f5189f5
3 changed files with 98 additions and 6 deletions

JsonConverter.java

@@ -224,6 +224,7 @@ public class JsonConverter implements Converter, HeaderConverter, Versioned {
     private JsonConverterConfig config;
     private Cache<Schema, ObjectNode> fromConnectSchemaCache;
     private Cache<JsonNode, Schema> toConnectSchemaCache;
+    private Schema schema = null; // if a schema is provided in config, this schema will be used for all messages
 
     private final JsonSerializer serializer;
     private final JsonDeserializer deserializer;
@@ -286,6 +287,16 @@ public class JsonConverter implements Converter, HeaderConverter, Versioned {
         fromConnectSchemaCache = new SynchronizedCache<>(new LRUCache<>(config.schemaCacheSize()));
         toConnectSchemaCache = new SynchronizedCache<>(new LRUCache<>(config.schemaCacheSize()));
+
+        try {
+            final byte[] schemaContent = config.schemaContent();
+            if (schemaContent != null) {
+                final JsonNode schemaNode = deserializer.deserialize("", schemaContent);
+                this.schema = asConnectSchema(schemaNode);
+            }
+        } catch (SerializationException e) {
+            throw new DataException("Failed to parse schema in converter config due to serialization error: ", e);
+        }
     }
 
     @Override
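
Because the external schema is parsed once in configure(), a malformed schema.content is rejected when the converter starts rather than on every record. A hedged sketch of that failure mode, mirroring the negative test added below (the class name is hypothetical):

    import java.util.Map;

    import org.apache.kafka.connect.errors.DataException;
    import org.apache.kafka.connect.json.JsonConverter;

    public class SchemaContentFailFastDemo {
        public static void main(String[] args) {
            JsonConverter converter = new JsonConverter();
            try {
                // "{ \"string\" }" is not valid JSON, so deserialization fails.
                converter.configure(Map.of("schema.content", "{ \"string\" }"), false);
            } catch (DataException e) {
                // Raised by the configure-time parse shown in the hunk above.
                System.out.println("rejected at configure(): " + e.getMessage());
            }
        }
    }
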
@@ -340,13 +351,16 @@ public class JsonConverter implements Converter, HeaderConverter, Versioned {
             throw new DataException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e);
         }
 
-        if (config.schemasEnabled() && (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)))
-            throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." +
-                    " If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.");
-
-        // The deserialized data should either be an envelope object containing the schema and the payload or the schema
-        // was stripped during serialization and we need to fill in an all-encompassing schema.
-        if (!config.schemasEnabled()) {
+        if (config.schemasEnabled()) {
+            if (schema != null) {
+                return new SchemaAndValue(schema, convertToConnect(schema, jsonValue, config));
+            } else if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) {
+                throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." +
+                        " If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.");
+            }
+        } else {
+            // The deserialized data should either be an envelope object containing the schema and the payload or the schema
+            // was stripped during serialization and we need to fill in an all-encompassing schema.
             ObjectNode envelope = JSON_NODE_FACTORY.objectNode();
             envelope.set(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME, null);
             envelope.set(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME, jsonValue);
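
Note the precedence this hunk establishes: when schemas are enabled and schema.content is set, every record is decoded directly against the configured schema and the envelope check is skipped entirely. A sketch of the consequence, mirroring testSchemaContentLooksLikeSchema below (class and topic names are hypothetical): a message that happens to look like a schema/payload envelope is decoded literally, not unwrapped.

    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.json.JsonConverter;

    public class EnvelopePrecedenceDemo {
        public static void main(String[] args) {
            JsonConverter converter = new JsonConverter();
            // External schema: a struct that itself has "schema" and "payload" fields.
            converter.configure(Map.of("schema.content",
                "{ \"type\": \"struct\", \"fields\": ["
                + "{ \"field\": \"schema\", \"type\": \"struct\", \"fields\": ["
                + "{ \"field\": \"type\", \"type\": \"string\" }] }, "
                + "{ \"field\": \"payload\", \"type\": \"string\" }] }"), false);

            // Looks like an envelope, but is decoded against the configured schema.
            byte[] msg = "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }"
                .getBytes(StandardCharsets.UTF_8);
            SchemaAndValue sv = converter.toConnectData("my-topic", msg);
            System.out.println(((Struct) sv.value()).getString("payload")); // foo-bar-baz
        }
    }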

JsonConverterConfig.java

@@ -22,6 +22,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigDef.Width;
 import org.apache.kafka.connect.storage.ConverterConfig;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Locale;
 import java.util.Map;
@@ -35,6 +36,11 @@ public final class JsonConverterConfig extends ConverterConfig {
     private static final String SCHEMAS_ENABLE_DOC = "Include schemas within each of the serialized values and keys.";
     private static final String SCHEMAS_ENABLE_DISPLAY = "Enable Schemas";
 
+    public static final String SCHEMA_CONTENT_CONFIG = "schema.content";
+    public static final String SCHEMA_CONTENT_DEFAULT = null;
+    private static final String SCHEMA_CONTENT_DOC = "When set, this is used as the schema for all messages. Otherwise, the schema will be included in the content of each message.";
+    private static final String SCHEMA_CONTENT_DISPLAY = "Schema Content";
+
     public static final String SCHEMAS_CACHE_SIZE_CONFIG = "schemas.cache.size";
     public static final int SCHEMAS_CACHE_SIZE_DEFAULT = 1000;
     private static final String SCHEMAS_CACHE_SIZE_DOC = "The maximum number of schemas that can be cached in this converter instance.";
@@ -61,6 +67,8 @@ public final class JsonConverterConfig extends ConverterConfig {
                 orderInGroup++, Width.MEDIUM, SCHEMAS_ENABLE_DISPLAY);
         CONFIG.define(SCHEMAS_CACHE_SIZE_CONFIG, Type.INT, SCHEMAS_CACHE_SIZE_DEFAULT, Importance.HIGH, SCHEMAS_CACHE_SIZE_DOC, group,
                 orderInGroup++, Width.MEDIUM, SCHEMAS_CACHE_SIZE_DISPLAY);
+        CONFIG.define(SCHEMA_CONTENT_CONFIG, Type.STRING, SCHEMA_CONTENT_DEFAULT, Importance.HIGH, SCHEMA_CONTENT_DOC, group,
+                orderInGroup++, Width.MEDIUM, SCHEMA_CONTENT_DISPLAY);
 
         group = "Serialization";
         orderInGroup = 0;
@@ -86,6 +94,7 @@ public final class JsonConverterConfig extends ConverterConfig {
     private final int schemaCacheSize;
     private final DecimalFormat decimalFormat;
     private final boolean replaceNullWithDefault;
+    private final byte[] schemaContent;
 
     public JsonConverterConfig(Map<String, ?> props) {
         super(CONFIG, props);
@@ -93,6 +102,10 @@ public final class JsonConverterConfig extends ConverterConfig {
         this.schemaCacheSize = getInt(SCHEMAS_CACHE_SIZE_CONFIG);
         this.decimalFormat = DecimalFormat.valueOf(getString(DECIMAL_FORMAT_CONFIG).toUpperCase(Locale.ROOT));
         this.replaceNullWithDefault = getBoolean(REPLACE_NULL_WITH_DEFAULT_CONFIG);
+        String schemaContentStr = getString(SCHEMA_CONTENT_CONFIG);
+        this.schemaContent = (schemaContentStr == null || schemaContentStr.isEmpty())
+            ? null
+            : schemaContentStr.getBytes(StandardCharsets.UTF_8);
     }
 
     /**
@@ -130,4 +143,15 @@ public final class JsonConverterConfig extends ConverterConfig {
         return replaceNullWithDefault;
     }
 
+    /**
+     * If a default schema is provided in the converter config, this will be
+     * used for all messages.
+     *
+     * This is only relevant if schemas are enabled.
+     *
+     * @return Schema Contents, will return null if no value is provided
+     */
+    public byte[] schemaContent() {
+        return schemaContent;
+    }
 }
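
One subtlety worth calling out: the constructor above normalizes an empty schema.content to null, so setting the property to an empty string behaves exactly like leaving it unset, and the per-message envelope is still required. A sketch mirroring testSchemaContentIsEmptyString (class and topic names are hypothetical):

    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.json.JsonConverter;

    public class EmptySchemaContentDemo {
        public static void main(String[] args) {
            JsonConverter converter = new JsonConverter();
            // Empty schema.content is treated as unset (see the null/isEmpty
            // check above), so the envelope format is still expected.
            converter.configure(Map.of("schema.content", ""), false);

            byte[] envelope =
                "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }"
                    .getBytes(StandardCharsets.UTF_8);
            SchemaAndValue sv = converter.toConnectData("my-topic", envelope);
            System.out.println(sv.value()); // prints foo-bar-baz
        }
    }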

JsonConverterTest.java

@@ -36,6 +36,8 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -977,6 +979,58 @@ public class JsonConverterTest {
         assertEquals(AppInfoParser.getVersion(), converter.version());
     }
 
+    @Test
+    public void testSchemaContentIsNull() {
+        Map<String, Object> config = new HashMap<>();
+        config.put(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, null);
+        converter.configure(config, false);
+        byte[] jsonBytes = "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes();
+        SchemaAndValue result = converter.toConnectData(TOPIC, jsonBytes);
+        assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), result);
+    }
+
+    @Test
+    public void testSchemaContentIsEmptyString() {
+        converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, ""), false);
+        assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes()));
+    }
+
+    @Test
+    public void testSchemaContentValidSchema() {
+        converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"type\": \"string\" }"), false);
+        assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toConnectData(TOPIC, "\"foo-bar-baz\"".getBytes()));
+    }
+
+    @Test
+    public void testSchemaContentInValidSchema() {
+        assertThrows(
+            DataException.class,
+            () -> converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"string\" }"), false),
+            " Provided schema is invalid , please recheck the schema you have provided");
+    }
+
+    @Test
+    public void testSchemaContentLooksLikeSchema() {
+        converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"type\": \"struct\", \"fields\": [{\"field\": \"schema\", \"type\": \"struct\",\"fields\": [{\"field\": \"type\", \"type\": \"string\" }]}, {\"field\": \"payload\", \"type\": \"string\"}]}"), false);
+        SchemaAndValue connectData = converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes());
+        assertEquals("foo-bar-baz", ((Struct) connectData.value()).getString("payload"));
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {
+        "{ }",
+        "{ \"wrong\": \"schema\" }",
+        "{ \"schema\": { \"type\": \"string\" } }",
+        "{ \"payload\": \"foo-bar-baz\" }",
+        "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\", \"extra\": \"field\" }",
+    })
+    public void testNullSchemaContentWithWrongConnectDataValue(String value) {
+        converter.configure(Map.of(), false);
+        assertThrows(
+            DataException.class,
+            () -> converter.toConnectData(TOPIC, value.getBytes()));
+    }
+
     private JsonNode parse(byte[] json) {
         try {
             return objectMapper.readTree(json);
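
In deployments the converter is configured through worker or connector properties rather than in code. As a closing illustration, a hypothetical per-connector override (the value.converter.* prefix is the standard Connect convention for overriding the worker's converter; the struct schema and field names are made up for this sketch):

    import java.util.Map;

    public class ConnectorConfigSketch {
        public static void main(String[] args) {
            // Hypothetical converter override, expressed as the property map a
            // sink connector definition would carry.
            Map<String, String> props = Map.of(
                "value.converter", "org.apache.kafka.connect.json.JsonConverter",
                "value.converter.schemas.enable", "true",
                "value.converter.schema.content",
                    "{ \"type\": \"struct\", \"fields\": ["
                    + "{ \"field\": \"id\", \"type\": \"int64\" }, "
                    + "{ \"field\": \"name\", \"type\": \"string\" }] }");
            props.forEach((k, v) -> System.out.println(k + "=" + v));
        }
    }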