KAFKA-12902: Add unit32 type in generator (#10830)

Add uint32 support in the KRPC generator.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
dengziming 2022-05-26 07:25:16 +08:00 committed by GitHub
parent 76477ffd2d
commit c22d320a5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 164 additions and 12 deletions

View File

@ -25,7 +25,7 @@
<!-- Generator -->
<suppress checks="CyclomaticComplexity|BooleanExpressionComplexity"
files="(SchemaGenerator|MessageDataGenerator|FieldSpec).java"/>
files="(SchemaGenerator|MessageDataGenerator|FieldSpec|FieldType).java"/>
<suppress checks="NPathComplexity"
files="(MessageDataGenerator|FieldSpec|WorkerSinkTask).java"/>
<suppress checks="JavaNCSS"

View File

@ -29,6 +29,11 @@ import java.util.List;
public final class MessageUtil {
public static final long UNSIGNED_INT_MAX = 4294967295L;
public static final int UNSIGNED_SHORT_MAX = 65535;
/**
* Copy a byte buffer into an array. This will not affect the buffer's
* position or mark.
@ -87,13 +92,22 @@ public final class MessageUtil {
public static int jsonNodeToUnsignedShort(JsonNode node, String about) {
int value = jsonNodeToInt(node, about);
if (value < 0 || value > 65535) {
if (value < 0 || value > UNSIGNED_SHORT_MAX) {
throw new RuntimeException(about + ": value " + value +
" does not fit in a 16-bit unsigned integer.");
}
return value;
}
public static long jsonNodeToUnsignedInt(JsonNode node, String about) {
long value = jsonNodeToLong(node, about);
if (value < 0 || value > UNSIGNED_INT_MAX) {
throw new RuntimeException(about + ": value " + value +
" does not fit in a 32-bit unsigned integer.");
}
return value;
}
public static int jsonNodeToInt(JsonNode node, String about) {
if (node.isInt()) {
return node.asInt();

View File

@ -75,4 +75,8 @@ public interface Readable {
default int readUnsignedShort() {
return Short.toUnsignedInt(readShort());
}
default long readUnsignedInt() {
return Integer.toUnsignedLong(readInt());
}
}

View File

@ -54,4 +54,8 @@ public interface Writable {
// ints outside the valid range of a short.
writeShort((short) i);
}
default void writeUnsignedInt(long i) {
writeInt((int) i);
}
}

View File

@ -97,6 +97,12 @@ public class Field {
}
}
public static class Uint32 extends Field {
public Uint32(String name, String docString) {
super(name, Type.UNSIGNED_INT32, docString, false, null);
}
}
public static class Float64 extends Field {
public Float64(String name, String docString) {
super(name, Type.FLOAT64, docString, false, null);

View File

@ -23,6 +23,9 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
import static org.apache.kafka.common.protocol.MessageUtil.UNSIGNED_INT_MAX;
import static org.apache.kafka.common.protocol.MessageUtil.UNSIGNED_SHORT_MAX;
/**
* A record that can be serialized and deserialized according to a pre-defined schema
*/
@ -97,6 +100,10 @@ public class Struct {
return getInt(field.name);
}
public Long get(Field.Uint32 field) {
return getLong(field.name);
}
public Short get(Field.Int16 field) {
return getShort(field.name);
}
@ -270,6 +277,10 @@ public class Struct {
return (Long) get(name);
}
public Long getUnsignedInt(BoundField field) {
return (Long) get(field);
}
public Long getLong(BoundField field) {
return (Long) get(field);
}
@ -400,13 +411,21 @@ public class Struct {
}
public Struct set(Field.Uint16 def, int value) {
if (value < 0 || value > 65535) {
if (value < 0 || value > UNSIGNED_SHORT_MAX) {
throw new RuntimeException("Invalid value for unsigned short for " +
def.name + ": " + value);
}
return set(def.name, value);
}
public Struct set(Field.Uint32 def, long value) {
if (value < 0 || value > UNSIGNED_INT_MAX) {
throw new RuntimeException("Invalid value for unsigned int for " +
def.name + ": " + value);
}
return set(def.name, value);
}
public Struct set(Field.Float64 def, double value) {
return set(def.name, value);
}

View File

@ -30,6 +30,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.function.Consumer;
import static org.apache.kafka.common.protocol.MessageUtil.UNSIGNED_INT_MAX;
import static org.apache.kafka.common.protocol.MessageUtil.UNSIGNED_SHORT_MAX;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -179,6 +181,18 @@ public class SimpleExampleMessageTest {
message -> assertEquals((short) 456, message.myInt16()));
}
@Test
public void testMyUint32() {
// Verify that the uint16 field reads as 33000 when not set.
testRoundTrip(new SimpleExampleMessageData(),
message -> assertEquals(1234567, message.myUint32()));
testRoundTrip(new SimpleExampleMessageData().setMyUint32(123),
message -> assertEquals(123, message.myUint32()));
testRoundTrip(new SimpleExampleMessageData().setMyUint32(60000),
message -> assertEquals(60000, message.myUint32()));
}
@Test
public void testMyUint16() {
// Verify that the uint16 field reads as 33000 when not set.
@ -206,7 +220,12 @@ public class SimpleExampleMessageTest {
assertThrows(RuntimeException.class,
() -> new SimpleExampleMessageData().setMyUint16(-1));
assertThrows(RuntimeException.class,
() -> new SimpleExampleMessageData().setMyUint16(65536));
() -> new SimpleExampleMessageData().setMyUint16(UNSIGNED_SHORT_MAX + 1));
assertThrows(RuntimeException.class,
() -> new SimpleExampleMessageData().setMyUint32(-1));
assertThrows(RuntimeException.class,
() -> new SimpleExampleMessageData().setMyUint32(UNSIGNED_INT_MAX + 1));
// Verify that the tagged field reads as empty when not set.
testRoundTrip(new SimpleExampleMessageData(),
@ -355,6 +374,7 @@ public class SimpleExampleMessageTest {
"myTaggedStruct=TaggedStruct(structId=''), " +
"myCommonStruct=TestCommonStruct(foo=123, bar=123), " +
"myOtherCommonStruct=TestCommonStruct(foo=123, bar=123), " +
"myUint16=65535)", message.toString());
"myUint16=65535, " +
"myUint32=1234567)", message.toString());
}
}

View File

@ -77,4 +77,10 @@ public final class MessageUtilTest {
Arrays.asList(new RawTaggedField(1, new byte[] {1}),
new RawTaggedField(2, new byte[] {}))));
}
@Test
public void testConstants() {
assertEquals(MessageUtil.UNSIGNED_SHORT_MAX, 0xFFFF);
assertEquals(MessageUtil.UNSIGNED_INT_MAX, 0xFFFFFFFFL);
}
}

View File

@ -50,9 +50,10 @@
"fields": [
{ "name": "structId", "type": "string", "versions": "2+", "about": "String field in struct"}
]},
{ "name": "myCommonStruct", "type": "TestCommonStruct", "versions": "0+"},
{ "name": "myOtherCommonStruct", "type": "TestCommonStruct", "versions": "0+"},
{ "name": "myUint16", "type": "uint16", "versions": "1+", "default": "33000" }
{ "name": "myCommonStruct", "type": "TestCommonStruct", "versions": "0+"},
{ "name": "myOtherCommonStruct", "type": "TestCommonStruct", "versions": "0+"},
{ "name": "myUint16", "type": "uint16", "versions": "1+", "default": "33000" },
{ "name": "myUint32", "type": "uint32", "versions": "1+", "default": "1234567" }
],
"commonStructs": [
{ "name": "TestCommonStruct", "versions": "0+", "fields": [

View File

@ -300,6 +300,7 @@ public final class FieldSpec {
} else if ((type instanceof FieldType.Int8FieldType) ||
(type instanceof FieldType.Int16FieldType) ||
(type instanceof FieldType.Uint16FieldType) ||
(type instanceof FieldType.Uint32FieldType) ||
(type instanceof FieldType.Int32FieldType) ||
(type instanceof FieldType.Int64FieldType)) {
int base = 10;
@ -338,7 +339,7 @@ public final class FieldSpec {
} else {
try {
int value = Integer.valueOf(defaultString, base);
if (value < 0 || value > 65535) {
if (value < 0 || value > MessageGenerator.UNSIGNED_SHORT_MAX) {
throw new RuntimeException("Invalid default for uint16 field " +
name + ": out of range.");
}
@ -348,6 +349,22 @@ public final class FieldSpec {
}
return fieldDefault;
}
} else if (type instanceof FieldType.Uint32FieldType) {
if (defaultString.isEmpty()) {
return "0";
} else {
try {
long value = Long.valueOf(defaultString, base);
if (value < 0 || value > MessageGenerator.UNSIGNED_INT_MAX) {
throw new RuntimeException("Invalid default for uint32 field " +
name + ": out of range.");
}
} catch (NumberFormatException e) {
throw new RuntimeException("Invalid default for uint32 field " +
name + ": " + defaultString, e);
}
return fieldDefault;
}
} else if (type instanceof FieldType.Int32FieldType) {
if (defaultString.isEmpty()) {
return "0";
@ -476,6 +493,8 @@ public final class FieldSpec {
return "short";
} else if (type instanceof FieldType.Uint16FieldType) {
return "int";
} else if (type instanceof FieldType.Uint32FieldType) {
return "long";
} else if (type instanceof FieldType.Int32FieldType) {
return "int";
} else if (type instanceof FieldType.Int64FieldType) {

View File

@ -122,6 +122,26 @@ public interface FieldType {
}
}
final class Uint32FieldType implements FieldType {
static final Uint32FieldType INSTANCE = new Uint32FieldType();
private static final String NAME = "uint32";
@Override
public String getBoxedJavaType(HeaderGenerator headerGenerator) {
return "Long";
}
@Override
public Optional<Integer> fixedLength() {
return Optional.of(4);
}
@Override
public String toString() {
return NAME;
}
}
final class Int64FieldType implements FieldType {
static final Int64FieldType INSTANCE = new Int64FieldType();
private static final String NAME = "int64";
@ -369,6 +389,8 @@ public interface FieldType {
return Int16FieldType.INSTANCE;
case Uint16FieldType.NAME:
return Uint16FieldType.INSTANCE;
case Uint32FieldType.NAME:
return Uint32FieldType.INSTANCE;
case Int32FieldType.NAME:
return Int32FieldType.INSTANCE;
case Int64FieldType.NAME:

View File

@ -164,6 +164,11 @@ public final class JsonConverterGenerator implements MessageClassGenerator {
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToUnsignedShort(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
} else if (target.field().type() instanceof FieldType.Uint32FieldType) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToUnsignedInt(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
} else if (target.field().type() instanceof FieldType.Int32FieldType) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
@ -346,7 +351,8 @@ public final class JsonConverterGenerator implements MessageClassGenerator {
headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new IntNode(%s)", target.sourceVariable())));
} else if (target.field().type() instanceof FieldType.Int64FieldType) {
} else if (target.field().type() instanceof FieldType.Int64FieldType ||
(target.field().type() instanceof FieldType.Uint32FieldType)) {
headerGenerator.addImport(MessageGenerator.LONG_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new LongNode(%s)", target.sourceVariable())));

View File

@ -541,6 +541,8 @@ public final class MessageDataGenerator implements MessageClassGenerator {
return "_readable.readShort()";
} else if (type instanceof FieldType.Uint16FieldType) {
return "_readable.readUnsignedShort()";
} else if (type instanceof FieldType.Uint32FieldType) {
return "_readable.readUnsignedInt()";
} else if (type instanceof FieldType.Int32FieldType) {
return "_readable.readInt()";
} else if (type instanceof FieldType.Int64FieldType) {
@ -848,6 +850,8 @@ public final class MessageDataGenerator implements MessageClassGenerator {
return String.format("_writable.writeShort(%s)", name);
} else if (type instanceof FieldType.Uint16FieldType) {
return String.format("_writable.writeUnsignedShort(%s)", name);
} else if (type instanceof FieldType.Uint32FieldType) {
return String.format("_writable.writeUnsignedInt(%s)", name);
} else if (type instanceof FieldType.Int32FieldType) {
return String.format("_writable.writeInt(%s)", name);
} else if (type instanceof FieldType.Int64FieldType) {
@ -1372,7 +1376,8 @@ public final class MessageDataGenerator implements MessageClassGenerator {
(field.type() instanceof FieldType.Int32FieldType)) {
buffer.printf("hashCode = 31 * hashCode + %s;%n",
field.camelCaseName());
} else if (field.type() instanceof FieldType.Int64FieldType) {
} else if (field.type() instanceof FieldType.Int64FieldType ||
(field.type() instanceof FieldType.Uint32FieldType)) {
buffer.printf("hashCode = 31 * hashCode + ((int) (%s >> 32) ^ (int) %s);%n",
field.camelCaseName(), field.camelCaseName());
} else if (field.type() instanceof FieldType.UUIDFieldType) {
@ -1427,6 +1432,7 @@ public final class MessageDataGenerator implements MessageClassGenerator {
(field.type() instanceof FieldType.Int8FieldType) ||
(field.type() instanceof FieldType.Int16FieldType) ||
(field.type() instanceof FieldType.Uint16FieldType) ||
(field.type() instanceof FieldType.Uint32FieldType) ||
(field.type() instanceof FieldType.Int32FieldType) ||
(field.type() instanceof FieldType.Int64FieldType) ||
(field.type() instanceof FieldType.Float64FieldType) ||
@ -1514,6 +1520,7 @@ public final class MessageDataGenerator implements MessageClassGenerator {
} else if ((field.type() instanceof FieldType.Int8FieldType) ||
(field.type() instanceof FieldType.Int16FieldType) ||
(field.type() instanceof FieldType.Uint16FieldType) ||
(field.type() instanceof FieldType.Uint32FieldType) ||
(field.type() instanceof FieldType.Int32FieldType) ||
(field.type() instanceof FieldType.Int64FieldType) ||
(field.type() instanceof FieldType.Float64FieldType)) {
@ -1576,13 +1583,21 @@ public final class MessageDataGenerator implements MessageClassGenerator {
field.fieldAbstractJavaType(headerGenerator, structRegistry));
buffer.incrementIndent();
if (field.type() instanceof FieldType.Uint16FieldType) {
buffer.printf("if (v < 0 || v > 65535) {%n");
buffer.printf("if (v < 0 || v > %d) {%n", MessageGenerator.UNSIGNED_SHORT_MAX);
buffer.incrementIndent();
buffer.printf("throw new RuntimeException(\"Invalid value \" + v + " +
"\" for unsigned short field.\");%n");
buffer.decrementIndent();
buffer.printf("}%n");
}
if (field.type() instanceof FieldType.Uint32FieldType) {
buffer.printf("if (v < 0 || v > %dL) {%n", MessageGenerator.UNSIGNED_INT_MAX);
buffer.incrementIndent();
buffer.printf("throw new RuntimeException(\"Invalid value \" + v + " +
"\" for unsigned int field.\");%n");
buffer.decrementIndent();
buffer.printf("}%n");
}
buffer.printf("this.%s = v;%n", field.camelCaseName());
buffer.printf("return this;%n");
buffer.decrementIndent();

View File

@ -156,6 +156,10 @@ public final class MessageGenerator {
static final String DOUBLE_NODE_CLASS = "com.fasterxml.jackson.databind.node.DoubleNode";
static final long UNSIGNED_INT_MAX = 4294967295L;
static final int UNSIGNED_SHORT_MAX = 65535;
/**
* The Jackson serializer we use for JSON objects.
*/

View File

@ -250,6 +250,12 @@ final class SchemaGenerator {
throw new RuntimeException("Type " + type + " cannot be nullable.");
}
return "Type.UINT16";
} else if (type instanceof FieldType.Uint32FieldType) {
headerGenerator.addImport(MessageGenerator.TYPE_CLASS);
if (nullable) {
throw new RuntimeException("Type " + type + " cannot be nullable.");
}
return "Type.UNSIGNED_INT32";
} else if (type instanceof FieldType.Int32FieldType) {
headerGenerator.addImport(MessageGenerator.TYPE_CLASS);
if (nullable) {

View File

@ -67,4 +67,10 @@ public class MessageGeneratorTest {
} catch (RuntimeException e) {
}
}
@Test
public void testConstants() {
assertEquals(MessageGenerator.UNSIGNED_SHORT_MAX, 0xFFFF);
assertEquals(MessageGenerator.UNSIGNED_INT_MAX, 0xFFFFFFFFL);
}
}