mirror of https://github.com/apache/kafka.git
MINOR: Allow a single struct to be a field in the protocol spec (#8413)
Remove the restriction in the protocol generation code that a structure field needs to be part of an array. Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
parent
20e4a74c35
commit
a276c54637
|
@ -56,6 +56,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
|
@ -607,6 +608,22 @@ public final class MessageTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleMessage() throws Exception {
|
||||
final SimpleExampleMessageData message = new SimpleExampleMessageData();
|
||||
message.setMyStruct(new SimpleExampleMessageData.MyStruct().setStructId(25).setArrayInStruct(
|
||||
Collections.singletonList(new SimpleExampleMessageData.StructArray().setArrayFieldId(20))
|
||||
));
|
||||
message.setMyTaggedStruct(new SimpleExampleMessageData.MyTaggedStruct().setStructId("abc"));
|
||||
|
||||
message.setProcessId(UUID.randomUUID());
|
||||
message.setMyNullableString("notNull");
|
||||
message.setMyInt16((short) 3);
|
||||
message.setMyString("test string");
|
||||
|
||||
testAllMessageRoundTripsFromVersion((short) 2, message);
|
||||
}
|
||||
|
||||
private void testAllMessageRoundTrips(Message message) throws Exception {
|
||||
testAllMessageRoundTripsFromVersion(message.lowestSupportedVersion(), message);
|
||||
}
|
||||
|
|
|
@ -281,10 +281,55 @@ public class SimpleExampleMessageTest {
|
|||
message.taggedLong()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMyStruct() {
|
||||
// Verify that we can set and retrieve a nullable struct object.
|
||||
SimpleExampleMessageData.MyStruct myStruct =
|
||||
new SimpleExampleMessageData.MyStruct().setStructId(10).setArrayInStruct(
|
||||
Collections.singletonList(new SimpleExampleMessageData.StructArray().setArrayFieldId(20))
|
||||
);
|
||||
testRoundTrip(new SimpleExampleMessageData().setMyStruct(myStruct),
|
||||
message -> assertEquals(myStruct, message.myStruct()), (short) 2);
|
||||
}
|
||||
|
||||
@Test(expected = UnsupportedVersionException.class)
|
||||
public void testMyStructUnsupportedVersion() {
|
||||
SimpleExampleMessageData.MyStruct myStruct =
|
||||
new SimpleExampleMessageData.MyStruct().setStructId(10);
|
||||
// Check serialization throws exception for unsupported version
|
||||
testRoundTrip(new SimpleExampleMessageData().setMyStruct(myStruct),
|
||||
__ -> { }, (short) 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check following cases:
|
||||
* 1. Tagged struct can be serialized/deserialized for version it is supported
|
||||
* 2. Tagged struct doesn't matter for versions it is not declared.
|
||||
*/
|
||||
@Test
|
||||
public void testMyTaggedStruct() {
|
||||
// Verify that we can set and retrieve a nullable struct object.
|
||||
SimpleExampleMessageData.MyTaggedStruct myStruct =
|
||||
new SimpleExampleMessageData.MyTaggedStruct().setStructId("abc");
|
||||
testRoundTrip(new SimpleExampleMessageData().setMyTaggedStruct(myStruct),
|
||||
message -> assertEquals(myStruct, message.myTaggedStruct()), (short) 2);
|
||||
|
||||
// Not setting field works for both version 1 and version 2 protocol
|
||||
testRoundTrip(new SimpleExampleMessageData().setMyString("abc"),
|
||||
message -> assertEquals("abc", message.myString()), (short) 1);
|
||||
testRoundTrip(new SimpleExampleMessageData().setMyString("abc"),
|
||||
message -> assertEquals("abc", message.myString()), (short) 2);
|
||||
}
|
||||
|
||||
private void testRoundTrip(SimpleExampleMessageData message,
|
||||
Consumer<SimpleExampleMessageData> validator) {
|
||||
testRoundTrip(message, validator, (short) 1);
|
||||
}
|
||||
|
||||
private void testRoundTrip(SimpleExampleMessageData message,
|
||||
Consumer<SimpleExampleMessageData> validator,
|
||||
short version) {
|
||||
validator.accept(message);
|
||||
short version = 1;
|
||||
ObjectSerializationCache cache = new ObjectSerializationCache();
|
||||
int size = message.size(cache, version);
|
||||
ByteBuffer buf = ByteBuffer.allocate(size);
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
{
|
||||
"name": "SimpleExampleMessage",
|
||||
"type": "header",
|
||||
"validVersions": "0-1",
|
||||
"validVersions": "0-2",
|
||||
"flexibleVersions": "1+",
|
||||
"fields": [
|
||||
{ "name": "processId", "versions": "1+", "type": "uuid" },
|
||||
|
@ -36,6 +36,19 @@
|
|||
"taggedVersions": "1+", "tag": 7 },
|
||||
{ "name": "zeroCopyByteBuffer", "versions": "1", "type": "bytes", "zeroCopy": true },
|
||||
{ "name": "nullableZeroCopyByteBuffer", "versions": "1", "nullableVersions": "0+",
|
||||
"type": "bytes", "zeroCopy": true }
|
||||
"type": "bytes", "zeroCopy": true },
|
||||
{ "name": "myStruct", "type": "MyStruct", "versions": "2+", "about": "Test Struct field",
|
||||
"fields": [
|
||||
{ "name": "structId", "type": "int32", "versions": "2+", "about": "Int field in struct"},
|
||||
{ "name": "arrayInStruct", "type": "[]StructArray", "versions": "2+",
|
||||
"fields": [
|
||||
{ "name": "arrayFieldId", "type": "int32", "versions": "2+"}
|
||||
]}
|
||||
]},
|
||||
{ "name": "myTaggedStruct", "type": "MyTaggedStruct", "versions": "2+", "about": "Test Tagged Struct field",
|
||||
"taggedVersions": "2+", "tag": 8,
|
||||
"fields": [
|
||||
{ "name": "structId", "type": "string", "versions": "2+", "about": "String field in struct"}
|
||||
]}
|
||||
]
|
||||
}
|
||||
|
|
|
@ -102,10 +102,11 @@ public final class FieldSpec {
|
|||
|
||||
this.about = about == null ? "" : about;
|
||||
if (!this.fields().isEmpty()) {
|
||||
if (!this.type.isArray()) {
|
||||
throw new RuntimeException("Non-array field " + name + " cannot have fields");
|
||||
if (!this.type.isArray() && !this.type.isStruct()) {
|
||||
throw new RuntimeException("Non-array or Struct field " + name + " cannot have fields");
|
||||
}
|
||||
}
|
||||
|
||||
if (flexibleVersions == null || flexibleVersions.isEmpty()) {
|
||||
this.flexibleVersions = Optional.empty();
|
||||
} else {
|
||||
|
|
|
@ -168,6 +168,13 @@ public final class MessageDataGenerator {
|
|||
structRegistry.findStruct(field),
|
||||
parentVersions.intersect(struct.versions()));
|
||||
}
|
||||
} else if (field.type().isStruct()) {
|
||||
if (!structRegistry.commonStructNames().contains(field.name())) {
|
||||
generateClass(Optional.empty(),
|
||||
field.type().toString(),
|
||||
structRegistry.findStruct(field),
|
||||
parentVersions.intersect(struct.versions()));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (isSetElement) {
|
||||
|
@ -477,7 +484,7 @@ public final class MessageDataGenerator {
|
|||
buffer.printf("this.%s = %s;%n", field.camelCaseName(), fieldDefault(field));
|
||||
}).
|
||||
ifMember(presentAndUntaggedVersions -> {
|
||||
if (field.type().isVariableLength()) {
|
||||
if (field.type().isVariableLength() && !field.type().isStruct()) {
|
||||
ClauseGenerator callGenerateVariableLengthReader = versions -> {
|
||||
generateVariableLengthReader(fieldFlexibleVersions(field),
|
||||
field.camelCaseName(),
|
||||
|
@ -531,7 +538,7 @@ public final class MessageDataGenerator {
|
|||
buffer.incrementIndent();
|
||||
VersionConditional.forVersions(validTaggedVersions, curFlexibleVersions).
|
||||
ifMember(presentAndTaggedVersions -> {
|
||||
if (field.type().isVariableLength()) {
|
||||
if (field.type().isVariableLength() && !field.type().isStruct()) {
|
||||
// All tagged fields are serialized using the new-style
|
||||
// flexible versions serialization.
|
||||
generateVariableLengthReader(fieldFlexibleVersions(field),
|
||||
|
@ -1180,7 +1187,8 @@ public final class MessageDataGenerator {
|
|||
return String.format("struct.getByteArray(\"%s\")", name);
|
||||
}
|
||||
} else if (type.isStruct()) {
|
||||
return String.format("new %s(struct, _version)", type.toString());
|
||||
return String.format("new %s((Struct) struct.get(\"%s\"), _version)",
|
||||
type.toString(), name);
|
||||
} else {
|
||||
throw new RuntimeException("Unsupported field type " + type);
|
||||
}
|
||||
|
@ -1220,7 +1228,7 @@ public final class MessageDataGenerator {
|
|||
ifMember(presentVersions -> {
|
||||
VersionConditional.forVersions(field.taggedVersions(), presentVersions).
|
||||
ifNotMember(presentAndUntaggedVersions -> {
|
||||
if (field.type().isVariableLength()) {
|
||||
if (field.type().isVariableLength() && !field.type().isStruct()) {
|
||||
ClauseGenerator callGenerateVariableLengthWriter = versions -> {
|
||||
generateVariableLengthWriter(fieldFlexibleVersions(field),
|
||||
field.camelCaseName(),
|
||||
|
@ -1324,6 +1332,11 @@ public final class MessageDataGenerator {
|
|||
presentAndTaggedVersions,
|
||||
Versions.NONE,
|
||||
field.zeroCopy());
|
||||
} else if (field.type().isStruct()) {
|
||||
buffer.printf("_writable.writeUnsignedVarint(this.%s.size(_cache, _version));%n",
|
||||
field.camelCaseName());
|
||||
buffer.printf("%s;%n",
|
||||
primitiveWriteExpression(field.type(), field.camelCaseName()));
|
||||
} else {
|
||||
buffer.printf("_writable.writeUnsignedVarint(%d);%n",
|
||||
field.type().fixedLength().get());
|
||||
|
@ -1522,7 +1535,7 @@ public final class MessageDataGenerator {
|
|||
field.camelCaseName(), field.camelCaseName());
|
||||
}
|
||||
}
|
||||
} else if (field.type().isString()) {
|
||||
} else if (field.type().isString() || field.type().isStruct()) {
|
||||
if (fieldDefault(field).equals("null")) {
|
||||
buffer.printf("if (%s != null) {%n", field.camelCaseName());
|
||||
} else if (nullableVersions.empty()) {
|
||||
|
@ -1633,6 +1646,9 @@ public final class MessageDataGenerator {
|
|||
buffer.printf("struct.set(\"%s\", (Object[]) _nestedObjects);%n",
|
||||
field.snakeCaseName());
|
||||
}).generate(buffer);
|
||||
} else if (field.type().isStruct()) {
|
||||
buffer.printf("struct.set(\"%s\", this.%s.toStruct(_version));%n",
|
||||
field.snakeCaseName(), field.camelCaseName());
|
||||
} else {
|
||||
throw new RuntimeException("Unsupported field type " + field.type());
|
||||
}
|
||||
|
@ -1651,6 +1667,7 @@ public final class MessageDataGenerator {
|
|||
(field.type() instanceof FieldType.Int64FieldType) ||
|
||||
(field.type() instanceof FieldType.UUIDFieldType) ||
|
||||
(field.type() instanceof FieldType.Float64FieldType) ||
|
||||
(field.type() instanceof FieldType.StructType) ||
|
||||
(field.type() instanceof FieldType.StringFieldType)) {
|
||||
buffer.printf("_taggedFields.put(%d, %s);%n",
|
||||
field.tag().get(), field.camelCaseName());
|
||||
|
@ -1957,6 +1974,12 @@ public final class MessageDataGenerator {
|
|||
} else {
|
||||
buffer.printf("_size += _bytesSize;%n");
|
||||
}
|
||||
} else if (field.type().isStruct()) {
|
||||
buffer.printf("int size = this.%s.size(_cache, _version);%n", field.camelCaseName());
|
||||
if (tagged) {
|
||||
buffer.printf("_size += ByteUtils.sizeOfUnsignedVarint(size);%n");
|
||||
}
|
||||
buffer.printf("_size += size;%n");
|
||||
} else {
|
||||
throw new RuntimeException("unhandled type " + field.type());
|
||||
}
|
||||
|
|
|
@ -59,21 +59,28 @@ final class StructRegistry {
|
|||
@SuppressWarnings("unchecked")
|
||||
private void addStructSpecs(List<FieldSpec> fields) {
|
||||
for (FieldSpec field : fields) {
|
||||
String elementName = null;
|
||||
if (field.type().isStructArray()) {
|
||||
FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
|
||||
if (commonStructNames.contains(arrayType.elementName())) {
|
||||
elementName = arrayType.elementName();
|
||||
} else if (field.type().isStruct()) {
|
||||
elementName = field.name();
|
||||
}
|
||||
|
||||
if (elementName != null) {
|
||||
if (commonStructNames.contains(elementName)) {
|
||||
// If we're using a common structure, we can't specify its fields.
|
||||
// The fields should be specified in the commonStructs area.
|
||||
if (!field.fields().isEmpty()) {
|
||||
throw new RuntimeException("Can't re-specify the common struct " +
|
||||
arrayType.elementName() + " as an inline struct.");
|
||||
elementName + " as an inline struct.");
|
||||
}
|
||||
} else if (structSpecs.put(arrayType.elementName(),
|
||||
new StructSpec(arrayType.elementName(),
|
||||
} else if (structSpecs.put(elementName,
|
||||
new StructSpec(elementName,
|
||||
field.versions().toString(),
|
||||
field.fields())) != null) {
|
||||
// Inline structures should only appear once.
|
||||
throw new RuntimeException("Struct " + arrayType.elementName() +
|
||||
throw new RuntimeException("Struct " + elementName +
|
||||
" was specified twice.");
|
||||
}
|
||||
addStructSpecs(field.fields());
|
||||
|
@ -86,15 +93,21 @@ final class StructRegistry {
|
|||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
StructSpec findStruct(FieldSpec field) {
|
||||
if ((!field.type().isArray()) && (field.type().isStruct())) {
|
||||
String structFieldName;
|
||||
if (field.type().isArray()) {
|
||||
FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
|
||||
structFieldName = arrayType.elementName();
|
||||
} else if (field.type().isStruct()) {
|
||||
structFieldName = field.name();
|
||||
} else {
|
||||
throw new RuntimeException("Field " + field.name() +
|
||||
" cannot be treated as a structure.");
|
||||
}
|
||||
FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
|
||||
StructSpec struct = structSpecs.get(arrayType.elementName());
|
||||
StructSpec struct = structSpecs.get(structFieldName);
|
||||
|
||||
if (struct == null) {
|
||||
throw new RuntimeException("Unable to locate a specification for the structure " +
|
||||
arrayType.elementName());
|
||||
structFieldName);
|
||||
}
|
||||
return struct;
|
||||
}
|
||||
|
|
|
@ -121,4 +121,30 @@ public class StructRegistryTest {
|
|||
assertTrue(e.getMessage().contains("Common struct TestCommonStruct was specified twice."));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleStruct() throws Exception {
|
||||
MessageSpec testMessageSpec = MessageGenerator.JSON_SERDE.readValue(String.join("", Arrays.asList(
|
||||
"{",
|
||||
" \"type\": \"request\",",
|
||||
" \"name\": \"LeaderAndIsrRequest\",",
|
||||
" \"validVersions\": \"0-2\",",
|
||||
" \"fields\": [",
|
||||
" { \"name\": \"field1\", \"type\": \"int32\", \"versions\": \"0+\" },",
|
||||
" { \"name\": \"field2\", \"type\": \"TestInlineStruct\", \"versions\": \"0+\", ",
|
||||
" \"fields\": [",
|
||||
" { \"name\": \"inlineField1\", \"type\": \"int64\", \"versions\": \"0+\" }",
|
||||
" ]}",
|
||||
" ]",
|
||||
"}")), MessageSpec.class);
|
||||
StructRegistry structRegistry = new StructRegistry();
|
||||
structRegistry.register(testMessageSpec);
|
||||
|
||||
FieldSpec field2 = testMessageSpec.fields().get(1);
|
||||
assertTrue(field2.type().isStruct());
|
||||
assertEquals("TestInlineStruct", field2.type().toString());
|
||||
|
||||
assertEquals("field2", structRegistry.findStruct(field2).name());
|
||||
assertFalse(structRegistry.isStructArrayWithKeys(field2));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue