diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java index b5b2c6aa3c4..3ae46843289 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java @@ -26,7 +26,7 @@ import java.nio.ByteBuffer; public class ChangedDeserializer implements Deserializer>, WrappingNullableDeserializer, Void, T> { - private static final int NEW_OLD_FLAG_SIZE = 1; + private static final int ENCODING_FLAG_SIZE = 1; private static final int IS_LATEST_FLAG_SIZE = 1; private Deserializer inner; @@ -50,64 +50,76 @@ public class ChangedDeserializer implements Deserializer>, Wrapping @Override public Change deserialize(final String topic, final Headers headers, final byte[] data) { // The format we need to deserialize is: - // {BYTE_ARRAY oldValue}{BYTE newOldFlag=0} - // {BYTE_ARRAY newValue}{BYTE newOldFlag=1} - // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2} - // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3} - // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4} - // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5} + // {BYTE_ARRAY oldValue}{BYTE encodingFlag=0} + // {BYTE_ARRAY newValue}{BYTE encodingFlag=1} + // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE encodingFlag=2} + // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=3} + // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE encodingFlag=4} + // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=5} final ByteBuffer buffer = ByteBuffer.wrap(data); - final byte newOldFlag = buffer.get(data.length - NEW_OLD_FLAG_SIZE); + final byte encodingFlag = buffer.get(data.length - ENCODING_FLAG_SIZE); final byte[] newData; final byte[] oldData; final boolean isLatest; - if (newOldFlag == (byte) 0) { - newData = null; - final int oldDataLength = data.length - NEW_OLD_FLAG_SIZE; - oldData = new byte[oldDataLength]; - buffer.get(oldData); - isLatest = true; - } else if (newOldFlag == (byte) 1) { - oldData = null; - final int newDataLength = data.length - NEW_OLD_FLAG_SIZE; - newData = new byte[newDataLength]; - buffer.get(newData); - isLatest = true; - } else if (newOldFlag == (byte) 2) { - final int newDataLength = Math.toIntExact(ByteUtils.readUnsignedInt(buffer)); - newData = new byte[newDataLength]; + switch (encodingFlag) { + case (byte) 0: { + newData = null; + final int oldDataLength = data.length - ENCODING_FLAG_SIZE; + oldData = new byte[oldDataLength]; + buffer.get(oldData); + isLatest = true; + break; + } + case (byte) 1: { + oldData = null; + final int newDataLength = data.length - ENCODING_FLAG_SIZE; + newData = new byte[newDataLength]; + buffer.get(newData); + isLatest = true; + break; + } + case (byte) 2: { + final int newDataLength = ByteUtils.readVarint(buffer); + newData = new byte[newDataLength]; + buffer.get(newData); - final int oldDataLength = data.length - Integer.BYTES - newDataLength - NEW_OLD_FLAG_SIZE; - oldData = new byte[oldDataLength]; + final int oldDataLength = buffer.capacity() - buffer.position() - ENCODING_FLAG_SIZE; + oldData = new byte[oldDataLength]; + buffer.get(oldData); + isLatest = true; + break; + } + case (byte) 3: { + newData = null; + final int oldDataLength = data.length - IS_LATEST_FLAG_SIZE - ENCODING_FLAG_SIZE; + oldData = new byte[oldDataLength]; + buffer.get(oldData); + isLatest = readIsLatestFlag(buffer); + break; + } + case (byte) 4: { + oldData = null; + final int newDataLength = data.length - IS_LATEST_FLAG_SIZE - ENCODING_FLAG_SIZE; + newData = new byte[newDataLength]; + buffer.get(newData); + isLatest = readIsLatestFlag(buffer); + break; + } + case (byte) 5: { + final int newDataLength = ByteUtils.readVarint(buffer); + newData = new byte[newDataLength]; + buffer.get(newData); - buffer.get(newData); - buffer.get(oldData); - isLatest = true; - } else if (newOldFlag == (byte) 3) { - newData = null; - final int oldDataLength = data.length - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE; - oldData = new byte[oldDataLength]; - buffer.get(oldData); - isLatest = readIsLatestFlag(buffer); - } else if (newOldFlag == (byte) 4) { - oldData = null; - final int newDataLength = data.length - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE; - newData = new byte[newDataLength]; - buffer.get(newData); - isLatest = readIsLatestFlag(buffer); - } else if (newOldFlag == (byte) 5) { - final int newDataLength = Math.toIntExact(ByteUtils.readUnsignedInt(buffer)); - newData = new byte[newDataLength]; + final int oldDataLength = buffer.capacity() - buffer.position() - IS_LATEST_FLAG_SIZE - ENCODING_FLAG_SIZE; + oldData = new byte[oldDataLength]; + buffer.get(oldData); - final int oldDataLength = data.length - Integer.BYTES - newDataLength - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE; - oldData = new byte[oldDataLength]; - - buffer.get(newData); - buffer.get(oldData); - isLatest = readIsLatestFlag(buffer); - } else { - throw new StreamsException("Encountered unknown byte value `" + newOldFlag + "` for oldNewFlag in ChangedDeserializer."); + isLatest = readIsLatestFlag(buffer); + break; + } + default: + throw new StreamsException("Encountered unknown byte value `" + encodingFlag + "` for encodingFlag in ChangedDeserializer."); } return new Change<>( @@ -117,7 +129,7 @@ public class ChangedDeserializer implements Deserializer>, Wrapping } private boolean readIsLatestFlag(final ByteBuffer buffer) { - final byte isLatestFlag = buffer.get(buffer.capacity() - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE); + final byte isLatestFlag = buffer.get(buffer.capacity() - IS_LATEST_FLAG_SIZE - ENCODING_FLAG_SIZE); if (isLatestFlag == (byte) 1) { return true; } else if (isLatestFlag == (byte) 0) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java index 15ba0bfcbd6..8a11575392f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java @@ -28,8 +28,8 @@ import java.util.Map; public class ChangedSerializer implements Serializer>, WrappingNullableSerializer, Void, T> { - private static final int NEW_OLD_FLAG_SIZE = 1; - private static final int UINT32_SIZE = 4; + private static final int ENCODING_FLAG_SIZE = 1; + private static final int MAX_VARINT_LENGTH = 5; private Serializer inner; private boolean isUpgrade; @@ -104,33 +104,40 @@ public class ChangedSerializer implements Serializer>, WrappingNull final int oldDataLength = oldValueIsNotNull ? oldData.length : 0; // The serialization format is: - // {BYTE_ARRAY oldValue}{BYTE newOldFlag=0} - // {BYTE_ARRAY newValue}{BYTE newOldFlag=1} - // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2} - final ByteBuffer buf; + // {BYTE_ARRAY oldValue}{BYTE encodingFlag=0} + // {BYTE_ARRAY newValue}{BYTE encodingFlag=1} + // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE encodingFlag=2} if (newValueIsNotNull && oldValueIsNotNull) { if (isUpgrade) { throw new StreamsException("Both old and new values are not null (" + data.oldValue + " : " + data.newValue + ") in ChangeSerializer, which is not allowed unless upgrading."); } else { - final int capacity = UINT32_SIZE + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE; - buf = ByteBuffer.allocate(capacity); - ByteUtils.writeUnsignedInt(buf, newDataLength); + final int capacity = MAX_VARINT_LENGTH + newDataLength + oldDataLength + ENCODING_FLAG_SIZE; + final ByteBuffer buf = ByteBuffer.allocate(capacity); + ByteUtils.writeVarint(newDataLength, buf); buf.put(newData).put(oldData).put((byte) 2); + + final byte[] serialized = new byte[buf.position()]; + buf.position(0); + buf.get(serialized); + + return serialized; } } else if (newValueIsNotNull) { - final int capacity = newDataLength + NEW_OLD_FLAG_SIZE; - buf = ByteBuffer.allocate(capacity); + final int capacity = newDataLength + ENCODING_FLAG_SIZE; + final ByteBuffer buf = ByteBuffer.allocate(capacity); buf.put(newData).put((byte) 1); + + return buf.array(); } else if (oldValueIsNotNull) { - final int capacity = oldDataLength + NEW_OLD_FLAG_SIZE; - buf = ByteBuffer.allocate(capacity); + final int capacity = oldDataLength + ENCODING_FLAG_SIZE; + final ByteBuffer buf = ByteBuffer.allocate(capacity); buf.put(oldData).put((byte) 0); + + return buf.array(); } else { throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed."); } - - return buf.array(); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java index 475aa53ef0b..e0c66abe5ef 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java @@ -43,9 +43,9 @@ public class ChangedSerdeTest { private static final ChangedDeserializer CHANGED_STRING_DESERIALIZER = new ChangedDeserializer<>(Serdes.String().deserializer()); - private static final int NEW_OLD_FLAG_SIZE = 1; + private static final int ENCODING_FLAG_SIZE = 1; private static final int IS_LATEST_FLAG_SIZE = 1; - private static final int UINT32_SIZE = 4; + private static final int MAX_VARINT_LENGTH = 5; final String nonNullNewValue = "hello"; final String nonNullOldValue = "world"; @@ -141,29 +141,33 @@ public class ChangedSerdeTest { final int oldDataLength = oldValueIsNotNull ? oldData.length : 0; // The serialization format is: - // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3} - // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4} - // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5} + // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=3} + // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE encodingFlag=4} + // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=5} final ByteBuffer buf; final byte isLatest = data.isLatest ? (byte) 1 : (byte) 0; if (newValueIsNotNull && oldValueIsNotNull) { - final int capacity = UINT32_SIZE + newDataLength + oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE; + final int capacity = MAX_VARINT_LENGTH + newDataLength + oldDataLength + IS_LATEST_FLAG_SIZE + ENCODING_FLAG_SIZE; buf = ByteBuffer.allocate(capacity); - ByteUtils.writeUnsignedInt(buf, newDataLength); + ByteUtils.writeVarint(newDataLength, buf); buf.put(newData).put(oldData).put(isLatest).put((byte) 5); } else if (newValueIsNotNull) { - final int capacity = newDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE; + final int capacity = newDataLength + IS_LATEST_FLAG_SIZE + ENCODING_FLAG_SIZE; buf = ByteBuffer.allocate(capacity); buf.put(newData).put(isLatest).put((byte) 4); } else if (oldValueIsNotNull) { - final int capacity = oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE; + final int capacity = oldDataLength + IS_LATEST_FLAG_SIZE + ENCODING_FLAG_SIZE; buf = ByteBuffer.allocate(capacity); buf.put(oldData).put(isLatest).put((byte) 3); } else { throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed."); } - return buf.array(); + final byte[] serialized = new byte[buf.position()]; + buf.position(0); + buf.get(serialized); + + return serialized; } private static void checkRoundTripForReservedVersion(final Change data) {