mirror of https://github.com/apache/kafka.git
KAFKA-18199; Fix size calculation for nullable tagged structs (#18127)
When a struct field is tagged and nullable, it is serialized as { varint tag; varint dataLength; nullable data }, where nullable is serialized as { varint isNotNull; if (isNotNull) struct s; }. The length field includes the is-not-null varint. This patch fixes a bug in serialization where the written value of the length field and the value used to compute the size of the length field differs by 1. In practice this has no impact unless the serialized length of the struct is 127 bytes, since the varint encodings of 127 and 128 have different lengths (0x7f vs 0x80 01). Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
parent
770d64d2cc
commit
b94defa189
|
@ -18,6 +18,7 @@ package org.apache.kafka.common.message;
|
||||||
|
|
||||||
import org.apache.kafka.common.protocol.ByteBufferAccessor;
|
import org.apache.kafka.common.protocol.ByteBufferAccessor;
|
||||||
import org.apache.kafka.common.protocol.MessageUtil;
|
import org.apache.kafka.common.protocol.MessageUtil;
|
||||||
|
import org.apache.kafka.common.protocol.ObjectSerializationCache;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
@ -98,6 +99,29 @@ public class NullableStructMessageTest {
|
||||||
message.toString();
|
message.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Regression test for KAFKA-18199. Tests that the size of the varint encoding a tagged nullable
|
||||||
|
* struct's size is calculated correctly.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testTaggedStructSize() {
|
||||||
|
NullableStructMessageData message = new NullableStructMessageData()
|
||||||
|
.setNullableStruct(null)
|
||||||
|
.setNullableStruct2(null)
|
||||||
|
.setNullableStruct3(null)
|
||||||
|
.setNullableStruct4(new NullableStructMessageData.MyStruct4()
|
||||||
|
.setMyInt(4)
|
||||||
|
.setMyString(new String(new char[121])));
|
||||||
|
|
||||||
|
// We want the struct to be 127 bytes long, so that the varint encoding of its size is one
|
||||||
|
// short of overflowing into a two-byte representation. An extra byte is added to the
|
||||||
|
// nullable struct size to account for the is-not-null flag.
|
||||||
|
assertEquals(127, message.nullableStruct4().size(new ObjectSerializationCache(), (short) 2));
|
||||||
|
|
||||||
|
NullableStructMessageData newMessage = roundTrip(message, (short) 2);
|
||||||
|
assertEquals(message, newMessage);
|
||||||
|
}
|
||||||
|
|
||||||
private NullableStructMessageData deserialize(ByteBuffer buf, short version) {
|
private NullableStructMessageData deserialize(ByteBuffer buf, short version) {
|
||||||
NullableStructMessageData message = new NullableStructMessageData();
|
NullableStructMessageData message = new NullableStructMessageData();
|
||||||
message.read(new ByteBufferAccessor(buf.duplicate()), version);
|
message.read(new ByteBufferAccessor(buf.duplicate()), version);
|
||||||
|
@ -110,6 +134,8 @@ public class NullableStructMessageTest {
|
||||||
|
|
||||||
private NullableStructMessageData roundTrip(NullableStructMessageData message, short version) {
|
private NullableStructMessageData roundTrip(NullableStructMessageData message, short version) {
|
||||||
ByteBuffer buffer = serialize(message, version);
|
ByteBuffer buffer = serialize(message, version);
|
||||||
|
// Check size calculation
|
||||||
|
assertEquals(buffer.remaining(), message.size(new ObjectSerializationCache(), version));
|
||||||
return deserialize(buffer.duplicate(), version);
|
return deserialize(buffer.duplicate(), version);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -360,6 +360,8 @@ public class SimpleExampleMessageTest {
|
||||||
short version
|
short version
|
||||||
) {
|
) {
|
||||||
ByteBuffer buf = MessageUtil.toByteBuffer(message, version);
|
ByteBuffer buf = MessageUtil.toByteBuffer(message, version);
|
||||||
|
// Check size calculation
|
||||||
|
assertEquals(buf.remaining(), message.size(new ObjectSerializationCache(), version));
|
||||||
return deserialize(buf.duplicate(), version);
|
return deserialize(buf.duplicate(), version);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1332,19 +1332,24 @@ public final class MessageDataGenerator implements MessageClassGenerator {
|
||||||
}).
|
}).
|
||||||
generate(buffer);
|
generate(buffer);
|
||||||
} else if (field.type().isStruct()) {
|
} else if (field.type().isStruct()) {
|
||||||
// Adding a byte if the field is nullable. A byte works for both regular and tagged struct fields.
|
|
||||||
VersionConditional.forVersions(field.nullableVersions(), possibleVersions).
|
|
||||||
ifMember(__ -> {
|
|
||||||
buffer.printf("_size.addBytes(1);%n");
|
|
||||||
}).
|
|
||||||
generate(buffer);
|
|
||||||
|
|
||||||
if (tagged) {
|
if (tagged) {
|
||||||
buffer.printf("int _sizeBeforeStruct = _size.totalSize();%n", field.camelCaseName());
|
buffer.printf("int _sizeBeforeStruct = _size.totalSize();%n", field.camelCaseName());
|
||||||
|
// Add a byte if the field is nullable.
|
||||||
|
VersionConditional.forVersions(field.nullableVersions(), possibleVersions).
|
||||||
|
ifMember(__ -> {
|
||||||
|
buffer.printf("_size.addBytes(1);%n");
|
||||||
|
}).
|
||||||
|
generate(buffer);
|
||||||
buffer.printf("this.%s.addSize(_size, _cache, _version);%n", field.camelCaseName());
|
buffer.printf("this.%s.addSize(_size, _cache, _version);%n", field.camelCaseName());
|
||||||
buffer.printf("int _structSize = _size.totalSize() - _sizeBeforeStruct;%n", field.camelCaseName());
|
buffer.printf("int _structSize = _size.totalSize() - _sizeBeforeStruct;%n", field.camelCaseName());
|
||||||
buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_structSize));%n");
|
buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_structSize));%n");
|
||||||
} else {
|
} else {
|
||||||
|
// Add a byte if the field is nullable.
|
||||||
|
VersionConditional.forVersions(field.nullableVersions(), possibleVersions).
|
||||||
|
ifMember(__ -> {
|
||||||
|
buffer.printf("_size.addBytes(1);%n");
|
||||||
|
}).
|
||||||
|
generate(buffer);
|
||||||
buffer.printf("this.%s.addSize(_size, _cache, _version);%n", field.camelCaseName());
|
buffer.printf("this.%s.addSize(_size, _cache, _version);%n", field.camelCaseName());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue