mirror of https://github.com/apache/kafka.git
KAFKA-15012: Allow leading zeros in numeric fields while deserializing JSON messages using the JsonConverter (#13800)
Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, Chris Egerton <chrise@aiven.io>
This commit is contained in:
parent
89581a738f
commit
02fb4b882b
|
@ -16,15 +16,17 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.connect.json;
|
package org.apache.kafka.connect.json;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.json.JsonReadFeature;
|
||||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
|
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Set;
|
|
||||||
import org.apache.kafka.common.errors.SerializationException;
|
import org.apache.kafka.common.errors.SerializationException;
|
||||||
import org.apache.kafka.common.serialization.Deserializer;
|
import org.apache.kafka.common.serialization.Deserializer;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* JSON deserializer for Jackson's {@link JsonNode} tree model. Using the tree model allows it to work with arbitrarily
|
* JSON deserializer for Jackson's {@link JsonNode} tree model. Using the tree model allows it to work with arbitrarily
|
||||||
* structured data without having associated Java classes. This deserializer also supports Connect schemas.
|
* structured data without having associated Java classes. This deserializer also supports Connect schemas.
|
||||||
|
@ -50,6 +52,7 @@ public class JsonDeserializer implements Deserializer<JsonNode> {
|
||||||
final Set<DeserializationFeature> deserializationFeatures,
|
final Set<DeserializationFeature> deserializationFeatures,
|
||||||
final JsonNodeFactory jsonNodeFactory
|
final JsonNodeFactory jsonNodeFactory
|
||||||
) {
|
) {
|
||||||
|
objectMapper.enable(JsonReadFeature.ALLOW_LEADING_ZEROS_FOR_NUMBERS.mappedFeature());
|
||||||
deserializationFeatures.forEach(objectMapper::enable);
|
deserializationFeatures.forEach(objectMapper::enable);
|
||||||
objectMapper.setNodeFactory(jsonNodeFactory);
|
objectMapper.setNodeFactory(jsonNodeFactory);
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,6 +115,14 @@ public class JsonConverterTest {
|
||||||
assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 4398046511104L), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, \"payload\": 4398046511104 }".getBytes()));
|
assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 4398046511104L), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, \"payload\": 4398046511104 }".getBytes()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void numberWithLeadingZerosToConnect() {
|
||||||
|
assertEquals(new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 12), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"int8\" }, \"payload\": 0012 }".getBytes()));
|
||||||
|
assertEquals(new SchemaAndValue(Schema.INT16_SCHEMA, (short) 123), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"int16\" }, \"payload\": 000123 }".getBytes()));
|
||||||
|
assertEquals(new SchemaAndValue(Schema.INT32_SCHEMA, 12345), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"int32\" }, \"payload\": 000012345 }".getBytes()));
|
||||||
|
assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 123456789L), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, \"payload\": 00000123456789 }".getBytes()));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void floatToConnect() {
|
public void floatToConnect() {
|
||||||
assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 12.34f), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }".getBytes()));
|
assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 12.34f), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }".getBytes()));
|
||||||
|
|
Loading…
Reference in New Issue