mirror of https://github.com/apache/kafka.git
KAFKA-12446: update change encoding to use varint (#13533)
KIP-904 had the goal in mind to save space when encoding the size on a byte array. However, using UINT32 does not achieve this goal. This PR changes the encoding to VARINT instead. Reviewers: Victoria Xia <victoria.xia@confluent.io>, Farooq Qaiser <fqaiser94@gmail.com>, Walker Carlson <wcarlson@confluent.io>
This commit is contained in:
parent
ab8f285097
commit
2557a4b842
|
@ -26,7 +26,7 @@ import java.nio.ByteBuffer;
|
|||
|
||||
public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, Void, T> {
|
||||
|
||||
private static final int NEW_OLD_FLAG_SIZE = 1;
|
||||
private static final int ENCODING_FLAG_SIZE = 1;
|
||||
private static final int IS_LATEST_FLAG_SIZE = 1;
|
||||
|
||||
private Deserializer<T> inner;
|
||||
|
@ -50,64 +50,76 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>>, Wrapping
|
|||
@Override
|
||||
public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) {
|
||||
// The format we need to deserialize is:
|
||||
// {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
|
||||
// {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
|
||||
// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
|
||||
// {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
|
||||
// {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
|
||||
// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
|
||||
// {BYTE_ARRAY oldValue}{BYTE encodingFlag=0}
|
||||
// {BYTE_ARRAY newValue}{BYTE encodingFlag=1}
|
||||
// {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE encodingFlag=2}
|
||||
// {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=3}
|
||||
// {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE encodingFlag=4}
|
||||
// {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=5}
|
||||
final ByteBuffer buffer = ByteBuffer.wrap(data);
|
||||
final byte newOldFlag = buffer.get(data.length - NEW_OLD_FLAG_SIZE);
|
||||
final byte encodingFlag = buffer.get(data.length - ENCODING_FLAG_SIZE);
|
||||
|
||||
final byte[] newData;
|
||||
final byte[] oldData;
|
||||
final boolean isLatest;
|
||||
if (newOldFlag == (byte) 0) {
|
||||
newData = null;
|
||||
final int oldDataLength = data.length - NEW_OLD_FLAG_SIZE;
|
||||
oldData = new byte[oldDataLength];
|
||||
buffer.get(oldData);
|
||||
isLatest = true;
|
||||
} else if (newOldFlag == (byte) 1) {
|
||||
oldData = null;
|
||||
final int newDataLength = data.length - NEW_OLD_FLAG_SIZE;
|
||||
newData = new byte[newDataLength];
|
||||
buffer.get(newData);
|
||||
isLatest = true;
|
||||
} else if (newOldFlag == (byte) 2) {
|
||||
final int newDataLength = Math.toIntExact(ByteUtils.readUnsignedInt(buffer));
|
||||
newData = new byte[newDataLength];
|
||||
switch (encodingFlag) {
|
||||
case (byte) 0: {
|
||||
newData = null;
|
||||
final int oldDataLength = data.length - ENCODING_FLAG_SIZE;
|
||||
oldData = new byte[oldDataLength];
|
||||
buffer.get(oldData);
|
||||
isLatest = true;
|
||||
break;
|
||||
}
|
||||
case (byte) 1: {
|
||||
oldData = null;
|
||||
final int newDataLength = data.length - ENCODING_FLAG_SIZE;
|
||||
newData = new byte[newDataLength];
|
||||
buffer.get(newData);
|
||||
isLatest = true;
|
||||
break;
|
||||
}
|
||||
case (byte) 2: {
|
||||
final int newDataLength = ByteUtils.readVarint(buffer);
|
||||
newData = new byte[newDataLength];
|
||||
buffer.get(newData);
|
||||
|
||||
final int oldDataLength = data.length - Integer.BYTES - newDataLength - NEW_OLD_FLAG_SIZE;
|
||||
oldData = new byte[oldDataLength];
|
||||
final int oldDataLength = buffer.capacity() - buffer.position() - ENCODING_FLAG_SIZE;
|
||||
oldData = new byte[oldDataLength];
|
||||
buffer.get(oldData);
|
||||
isLatest = true;
|
||||
break;
|
||||
}
|
||||
case (byte) 3: {
|
||||
newData = null;
|
||||
final int oldDataLength = data.length - IS_LATEST_FLAG_SIZE - ENCODING_FLAG_SIZE;
|
||||
oldData = new byte[oldDataLength];
|
||||
buffer.get(oldData);
|
||||
isLatest = readIsLatestFlag(buffer);
|
||||
break;
|
||||
}
|
||||
case (byte) 4: {
|
||||
oldData = null;
|
||||
final int newDataLength = data.length - IS_LATEST_FLAG_SIZE - ENCODING_FLAG_SIZE;
|
||||
newData = new byte[newDataLength];
|
||||
buffer.get(newData);
|
||||
isLatest = readIsLatestFlag(buffer);
|
||||
break;
|
||||
}
|
||||
case (byte) 5: {
|
||||
final int newDataLength = ByteUtils.readVarint(buffer);
|
||||
newData = new byte[newDataLength];
|
||||
buffer.get(newData);
|
||||
|
||||
buffer.get(newData);
|
||||
buffer.get(oldData);
|
||||
isLatest = true;
|
||||
} else if (newOldFlag == (byte) 3) {
|
||||
newData = null;
|
||||
final int oldDataLength = data.length - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
|
||||
oldData = new byte[oldDataLength];
|
||||
buffer.get(oldData);
|
||||
isLatest = readIsLatestFlag(buffer);
|
||||
} else if (newOldFlag == (byte) 4) {
|
||||
oldData = null;
|
||||
final int newDataLength = data.length - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
|
||||
newData = new byte[newDataLength];
|
||||
buffer.get(newData);
|
||||
isLatest = readIsLatestFlag(buffer);
|
||||
} else if (newOldFlag == (byte) 5) {
|
||||
final int newDataLength = Math.toIntExact(ByteUtils.readUnsignedInt(buffer));
|
||||
newData = new byte[newDataLength];
|
||||
final int oldDataLength = buffer.capacity() - buffer.position() - IS_LATEST_FLAG_SIZE - ENCODING_FLAG_SIZE;
|
||||
oldData = new byte[oldDataLength];
|
||||
buffer.get(oldData);
|
||||
|
||||
final int oldDataLength = data.length - Integer.BYTES - newDataLength - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
|
||||
oldData = new byte[oldDataLength];
|
||||
|
||||
buffer.get(newData);
|
||||
buffer.get(oldData);
|
||||
isLatest = readIsLatestFlag(buffer);
|
||||
} else {
|
||||
throw new StreamsException("Encountered unknown byte value `" + newOldFlag + "` for oldNewFlag in ChangedDeserializer.");
|
||||
isLatest = readIsLatestFlag(buffer);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
throw new StreamsException("Encountered unknown byte value `" + encodingFlag + "` for encodingFlag in ChangedDeserializer.");
|
||||
}
|
||||
|
||||
return new Change<>(
|
||||
|
@ -117,7 +129,7 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>>, Wrapping
|
|||
}
|
||||
|
||||
private boolean readIsLatestFlag(final ByteBuffer buffer) {
|
||||
final byte isLatestFlag = buffer.get(buffer.capacity() - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE);
|
||||
final byte isLatestFlag = buffer.get(buffer.capacity() - IS_LATEST_FLAG_SIZE - ENCODING_FLAG_SIZE);
|
||||
if (isLatestFlag == (byte) 1) {
|
||||
return true;
|
||||
} else if (isLatestFlag == (byte) 0) {
|
||||
|
|
|
@ -28,8 +28,8 @@ import java.util.Map;
|
|||
|
||||
public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNullableSerializer<Change<T>, Void, T> {
|
||||
|
||||
private static final int NEW_OLD_FLAG_SIZE = 1;
|
||||
private static final int UINT32_SIZE = 4;
|
||||
private static final int ENCODING_FLAG_SIZE = 1;
|
||||
private static final int MAX_VARINT_LENGTH = 5;
|
||||
private Serializer<T> inner;
|
||||
private boolean isUpgrade;
|
||||
|
||||
|
@ -104,33 +104,40 @@ public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNull
|
|||
final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;
|
||||
|
||||
// The serialization format is:
|
||||
// {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
|
||||
// {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
|
||||
// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
|
||||
final ByteBuffer buf;
|
||||
// {BYTE_ARRAY oldValue}{BYTE encodingFlag=0}
|
||||
// {BYTE_ARRAY newValue}{BYTE encodingFlag=1}
|
||||
// {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE encodingFlag=2}
|
||||
if (newValueIsNotNull && oldValueIsNotNull) {
|
||||
if (isUpgrade) {
|
||||
throw new StreamsException("Both old and new values are not null (" + data.oldValue
|
||||
+ " : " + data.newValue + ") in ChangeSerializer, which is not allowed unless upgrading.");
|
||||
} else {
|
||||
final int capacity = UINT32_SIZE + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE;
|
||||
buf = ByteBuffer.allocate(capacity);
|
||||
ByteUtils.writeUnsignedInt(buf, newDataLength);
|
||||
final int capacity = MAX_VARINT_LENGTH + newDataLength + oldDataLength + ENCODING_FLAG_SIZE;
|
||||
final ByteBuffer buf = ByteBuffer.allocate(capacity);
|
||||
ByteUtils.writeVarint(newDataLength, buf);
|
||||
buf.put(newData).put(oldData).put((byte) 2);
|
||||
|
||||
final byte[] serialized = new byte[buf.position()];
|
||||
buf.position(0);
|
||||
buf.get(serialized);
|
||||
|
||||
return serialized;
|
||||
}
|
||||
} else if (newValueIsNotNull) {
|
||||
final int capacity = newDataLength + NEW_OLD_FLAG_SIZE;
|
||||
buf = ByteBuffer.allocate(capacity);
|
||||
final int capacity = newDataLength + ENCODING_FLAG_SIZE;
|
||||
final ByteBuffer buf = ByteBuffer.allocate(capacity);
|
||||
buf.put(newData).put((byte) 1);
|
||||
|
||||
return buf.array();
|
||||
} else if (oldValueIsNotNull) {
|
||||
final int capacity = oldDataLength + NEW_OLD_FLAG_SIZE;
|
||||
buf = ByteBuffer.allocate(capacity);
|
||||
final int capacity = oldDataLength + ENCODING_FLAG_SIZE;
|
||||
final ByteBuffer buf = ByteBuffer.allocate(capacity);
|
||||
buf.put(oldData).put((byte) 0);
|
||||
|
||||
return buf.array();
|
||||
} else {
|
||||
throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
|
||||
}
|
||||
|
||||
return buf.array();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -43,9 +43,9 @@ public class ChangedSerdeTest {
|
|||
private static final ChangedDeserializer<String> CHANGED_STRING_DESERIALIZER =
|
||||
new ChangedDeserializer<>(Serdes.String().deserializer());
|
||||
|
||||
private static final int NEW_OLD_FLAG_SIZE = 1;
|
||||
private static final int ENCODING_FLAG_SIZE = 1;
|
||||
private static final int IS_LATEST_FLAG_SIZE = 1;
|
||||
private static final int UINT32_SIZE = 4;
|
||||
private static final int MAX_VARINT_LENGTH = 5;
|
||||
|
||||
final String nonNullNewValue = "hello";
|
||||
final String nonNullOldValue = "world";
|
||||
|
@ -141,29 +141,33 @@ public class ChangedSerdeTest {
|
|||
final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;
|
||||
|
||||
// The serialization format is:
|
||||
// {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
|
||||
// {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
|
||||
// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
|
||||
// {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=3}
|
||||
// {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE encodingFlag=4}
|
||||
// {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=5}
|
||||
final ByteBuffer buf;
|
||||
final byte isLatest = data.isLatest ? (byte) 1 : (byte) 0;
|
||||
if (newValueIsNotNull && oldValueIsNotNull) {
|
||||
final int capacity = UINT32_SIZE + newDataLength + oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
|
||||
final int capacity = MAX_VARINT_LENGTH + newDataLength + oldDataLength + IS_LATEST_FLAG_SIZE + ENCODING_FLAG_SIZE;
|
||||
buf = ByteBuffer.allocate(capacity);
|
||||
ByteUtils.writeUnsignedInt(buf, newDataLength);
|
||||
ByteUtils.writeVarint(newDataLength, buf);
|
||||
buf.put(newData).put(oldData).put(isLatest).put((byte) 5);
|
||||
} else if (newValueIsNotNull) {
|
||||
final int capacity = newDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
|
||||
final int capacity = newDataLength + IS_LATEST_FLAG_SIZE + ENCODING_FLAG_SIZE;
|
||||
buf = ByteBuffer.allocate(capacity);
|
||||
buf.put(newData).put(isLatest).put((byte) 4);
|
||||
} else if (oldValueIsNotNull) {
|
||||
final int capacity = oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
|
||||
final int capacity = oldDataLength + IS_LATEST_FLAG_SIZE + ENCODING_FLAG_SIZE;
|
||||
buf = ByteBuffer.allocate(capacity);
|
||||
buf.put(oldData).put(isLatest).put((byte) 3);
|
||||
} else {
|
||||
throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
|
||||
}
|
||||
|
||||
return buf.array();
|
||||
final byte[] serialized = new byte[buf.position()];
|
||||
buf.position(0);
|
||||
buf.get(serialized);
|
||||
|
||||
return serialized;
|
||||
}
|
||||
|
||||
private static void checkRoundTripForReservedVersion(final Change<String> data) {
|
||||
|
|
Loading…
Reference in New Issue