KAFKA-5218; New Short serializer, deserializer, serde

Author: Mario Molina <mmolimar@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Damian Guy <damian.guy@gmail.com>, Michael G. Noll <michael@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3017 from mmolimar/KAFKA-5218
This commit is contained in:
Mario Molina 2017-05-31 15:09:58 -07:00 committed by Guozhang Wang
parent d9479275fd
commit dc5bf4bd45
7 changed files with 175 additions and 184 deletions

View File

@ -52,7 +52,7 @@
files="AbstractRequest.java"/>
<suppress checks="NPathComplexity"
files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender).java"/>
files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender|Serdes).java"/>
<!-- clients tests -->
<suppress checks="ClassDataAbstractionCoupling"

View File

@ -30,8 +30,7 @@ public class IntegerDeserializer implements Deserializer<Integer> {
if (data == null)
return null;
if (data.length != 4) {
throw new SerializationException("Size of data received by IntegerDeserializer is " +
"not 4");
throw new SerializationException("Size of data received by IntegerDeserializer is not 4");
}
int value = 0;

View File

@ -30,8 +30,7 @@ public class LongDeserializer implements Deserializer<Long> {
if (data == null)
return null;
if (data.length != 8) {
throw new SerializationException("Size of data received by LongDeserializer is " +
"not 8");
throw new SerializationException("Size of data received by LongDeserializer is not 8");
}
long value = 0;

View File

@ -70,6 +70,12 @@ public class Serdes {
}
}
static public final class ShortSerde extends WrapperSerde<Short> {
public ShortSerde() {
super(new ShortSerializer(), new ShortDeserializer());
}
}
static public final class FloatSerde extends WrapperSerde<Float> {
public FloatSerde() {
super(new FloatSerializer(), new FloatDeserializer());
@ -112,6 +118,10 @@ public class Serdes {
return (Serde<T>) String();
}
if (Short.class.isAssignableFrom(type)) {
return (Serde<T>) Short();
}
if (Integer.class.isAssignableFrom(type)) {
return (Serde<T>) Integer();
}
@ -175,6 +185,13 @@ public class Serdes {
return new IntegerSerde();
}
/*
* A serde for nullable {@code Short} type.
*/
static public Serde<Short> Short() {
return new ShortSerde();
}
/*
* A serde for nullable {@code Float} type.
*/

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.serialization;
import org.apache.kafka.common.errors.SerializationException;
import java.util.Map;
public class ShortDeserializer implements Deserializer<Short> {
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
public Short deserialize(String topic, byte[] data) {
if (data == null)
return null;
if (data.length != 2) {
throw new SerializationException("Size of data received by ShortDeserializer is not 2");
}
short value = 0;
for (byte b : data) {
value <<= 8;
value |= b & 0xFF;
}
return value;
}
public void close() {
// nothing to do
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.serialization;
import java.util.Map;
public class ShortSerializer implements Serializer<Short> {
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
public byte[] serialize(String topic, Short data) {
if (data == null)
return null;
return new byte[] {
(byte) (data >>> 8),
data.byteValue()
};
}
public void close() {
// nothing to do
}
}

View File

@ -17,10 +17,11 @@
package org.apache.kafka.common.serialization;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.utils.Bytes;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -29,27 +30,50 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsNull.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class SerializationTest {
final private String topic = "testTopic";
final private Map<Class<Object>, List<Object>> testData = new HashMap() {
{
put(String.class, Arrays.asList("my string"));
put(Short.class, Arrays.asList((short) 32767, (short) -32768));
put(Integer.class, Arrays.asList((int) 423412424, (int) -41243432));
put(Long.class, Arrays.asList(922337203685477580L, -922337203685477581L));
put(Float.class, Arrays.asList(5678567.12312f, -5678567.12341f));
put(Double.class, Arrays.asList(5678567.12312d, -5678567.12341d));
put(byte[].class, Arrays.asList("my string".getBytes()));
put(ByteBuffer.class, Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes())));
put(Bytes.class, Arrays.asList(new Bytes("my string".getBytes())));
}
};
private class DummyClass {
}
@Test
public void testSerdeFrom() {
Serde<Long> thisSerde = Serdes.serdeFrom(Long.class);
Serde<Long> otherSerde = Serdes.Long();
public void allSerdesShouldRoundtripInput() {
for (Map.Entry<Class<Object>, List<Object>> test : testData.entrySet()) {
try (Serde<Object> serde = Serdes.serdeFrom(test.getKey())) {
for (Object value : test.getValue()) {
assertEquals("Should get the original " + test.getKey().getSimpleName() +
" after serialization and deserialization", value,
serde.deserializer().deserialize(topic, serde.serializer().serialize(topic, value)));
}
}
}
}
Long value = 423412424L;
assertEquals("Should get the original long after serialization and deserialization",
value, thisSerde.deserializer().deserialize(topic, otherSerde.serializer().serialize(topic, value)));
assertEquals("Should get the original long after serialization and deserialization",
value, otherSerde.deserializer().deserialize(topic, thisSerde.serializer().serialize(topic, value)));
@Test
public void allSerdesShouldSupportNull() {
for (Class<?> cls : testData.keySet()) {
try (Serde<?> serde = Serdes.serdeFrom(cls)) {
assertThat("Should support null in " + cls.getSimpleName() + " serialization",
serde.serializer().serialize(topic, null), nullValue());
assertThat("Should support null in " + cls.getSimpleName() + " deserialization",
serde.deserializer().deserialize(topic, null), nullValue());
}
}
}
@Test(expected = IllegalArgumentException.class)
@ -59,200 +83,65 @@ public class SerializationTest {
@Test(expected = IllegalArgumentException.class)
public void testSerdeFromNotNull() {
Serdes.serdeFrom(null, Serdes.Long().deserializer());
try (Serde<Long> serde = Serdes.Long()) {
Serdes.serdeFrom(null, serde.deserializer());
}
}
@Test
public void testStringSerializer() {
public void stringSerdeShouldSupportDifferentEncodings() {
String str = "my string";
List<String> encodings = new ArrayList<String>();
encodings.add("UTF8");
encodings.add("UTF-16");
List<String> encodings = Arrays.asList("UTF8", "UTF-16");
for (String encoding : encodings) {
Serde<String> serDeser = getStringSerde(encoding);
try (Serde<String> serDeser = getStringSerde(encoding)) {
Serializer<String> serializer = serDeser.serializer();
Deserializer<String> deserializer = serDeser.deserializer();
assertEquals("Should get the original string after serialization and deserialization with encoding " + encoding,
str, deserializer.deserialize(topic, serializer.serialize(topic, str)));
assertEquals("Should support null in serialization and deserialization with encoding " + encoding,
null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
}
}
}
@Test
public void testIntegerSerializer() {
Integer[] integers = new Integer[]{
423412424,
-41243432
};
Serializer<Integer> serializer = Serdes.Integer().serializer();
Deserializer<Integer> deserializer = Serdes.Integer().deserializer();
for (Integer integer : integers) {
assertEquals("Should get the original integer after serialization and deserialization",
integer, deserializer.deserialize(topic, serializer.serialize(topic, integer)));
}
assertEquals("Should support null in serialization and deserialization",
null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
serializer.close();
deserializer.close();
}
@Test
public void testLongSerializer() {
Long[] longs = new Long[]{
922337203685477580L,
-922337203685477581L
};
Serializer<Long> serializer = Serdes.Long().serializer();
Deserializer<Long> deserializer = Serdes.Long().deserializer();
for (Long value : longs) {
assertEquals("Should get the original long after serialization and deserialization",
value, deserializer.deserialize(topic, serializer.serialize(topic, value)));
}
assertEquals("Should support null in serialization and deserialization",
null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
serializer.close();
deserializer.close();
}
@Test
public void shouldSerializeDeserializeFloat() {
final Float[] floats = new Float[]{
5678567.12312f,
-5678567.12341f
};
final Serializer<Float> serializer = Serdes.Float().serializer();
final Deserializer<Float> deserializer = Serdes.Float().deserializer();
for (final Float value : floats) {
assertThat("Should round-trip a float",
value, equalTo(deserializer.deserialize(topic, serializer.serialize(topic, value))));
}
serializer.close();
deserializer.close();
}
@Test
public void floatSerializerShouldReturnNullForNull() {
final Serializer<Float> serializer = Serdes.Float().serializer();
assertThat(serializer.serialize(topic, null), nullValue());
serializer.close();
}
@Test
public void floatDeserializerShouldReturnNullForNull() {
final Deserializer<Float> deserializer = Serdes.Float().deserializer();
assertThat(deserializer.deserialize(topic, null), nullValue());
deserializer.close();
}
@Test
@Test(expected = SerializationException.class)
public void floatDeserializerShouldThrowSerializationExceptionOnZeroBytes() {
final Deserializer<Float> deserializer = Serdes.Float().deserializer();
try {
deserializer.deserialize(topic, new byte[0]);
fail("Should have thrown a SerializationException because of zero input bytes");
} catch (SerializationException e) {
// Ignore (there's no contract on the details of the exception)
try (Serde<Float> serde = Serdes.Float()) {
serde.deserializer().deserialize(topic, new byte[0]);
}
deserializer.close();
}
@Test
@Test(expected = SerializationException.class)
public void floatDeserializerShouldThrowSerializationExceptionOnTooFewBytes() {
final Deserializer<Float> deserializer = Serdes.Float().deserializer();
try {
deserializer.deserialize(topic, new byte[3]);
fail("Should have thrown a SerializationException because of too few input bytes");
} catch (SerializationException e) {
// Ignore (there's no contract on the details of the exception)
try (Serde<Float> serde = Serdes.Float()) {
serde.deserializer().deserialize(topic, new byte[3]);
}
deserializer.close();
}
@Test
@Test(expected = SerializationException.class)
public void floatDeserializerShouldThrowSerializationExceptionOnTooManyBytes() {
final Deserializer<Float> deserializer = Serdes.Float().deserializer();
try {
deserializer.deserialize(topic, new byte[5]);
fail("Should have thrown a SerializationException because of too many input bytes");
} catch (SerializationException e) {
// Ignore (there's no contract on the details of the exception)
try (Serde<Float> serde = Serdes.Float()) {
serde.deserializer().deserialize(topic, new byte[5]);
}
deserializer.close();
}
@Test
public void floatSerdeShouldPreserveNaNValues() {
final int someNaNAsIntBits = 0x7f800001;
final float someNaN = Float.intBitsToFloat(someNaNAsIntBits);
final int anotherNaNAsIntBits = 0x7f800002;
final float anotherNaN = Float.intBitsToFloat(anotherNaNAsIntBits);
int someNaNAsIntBits = 0x7f800001;
float someNaN = Float.intBitsToFloat(someNaNAsIntBits);
int anotherNaNAsIntBits = 0x7f800002;
float anotherNaN = Float.intBitsToFloat(anotherNaNAsIntBits);
final Serde<Float> serde = Serdes.Float();
try (Serde<Float> serde = Serdes.Float()) {
// Because of NaN semantics we must assert based on the raw int bits.
final Float roundtrip = serde.deserializer().deserialize(topic,
Float roundtrip = serde.deserializer().deserialize(topic,
serde.serializer().serialize(topic, someNaN));
assertThat(Float.floatToRawIntBits(roundtrip), equalTo(someNaNAsIntBits));
final Float otherRoundtrip = serde.deserializer().deserialize(topic,
Float otherRoundtrip = serde.deserializer().deserialize(topic,
serde.serializer().serialize(topic, anotherNaN));
assertThat(Float.floatToRawIntBits(otherRoundtrip), equalTo(anotherNaNAsIntBits));
serde.close();
}
@Test
public void testDoubleSerializer() {
Double[] doubles = new Double[]{
5678567.12312d,
-5678567.12341d
};
Serializer<Double> serializer = Serdes.Double().serializer();
Deserializer<Double> deserializer = Serdes.Double().deserializer();
for (Double value : doubles) {
assertEquals("Should get the original double after serialization and deserialization",
value, deserializer.deserialize(topic, serializer.serialize(topic, value)));
}
assertEquals("Should support null in serialization and deserialization",
null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
serializer.close();
deserializer.close();
}
@Test
public void testByteBufferSerializer() {
ByteBuffer buf = ByteBuffer.allocate(10);
buf.put("my string".getBytes());
Serializer<ByteBuffer> serializer = Serdes.ByteBuffer().serializer();
Deserializer<ByteBuffer> deserializer = Serdes.ByteBuffer().deserializer();
assertEquals("Should get the original ByteBuffer after serialization and deserialization",
buf, deserializer.deserialize(topic, serializer.serialize(topic, buf)));
assertEquals("Should support null in serialization and deserialization",
null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
serializer.close();
deserializer.close();
}
private Serde<String> getStringSerde(String encoder) {