diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index c5cd99fdaa5..10af4e66c28 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -25,7 +25,7 @@
+ files="(SchemaGenerator|MessageDataGenerator|FieldSpec|FieldType).java"/>
65535) {
+ if (value < 0 || value > UNSIGNED_SHORT_MAX) {
throw new RuntimeException(about + ": value " + value +
" does not fit in a 16-bit unsigned integer.");
}
return value;
}
+ public static long jsonNodeToUnsignedInt(JsonNode node, String about) {
+ long value = jsonNodeToLong(node, about);
+ if (value < 0 || value > UNSIGNED_INT_MAX) {
+ throw new RuntimeException(about + ": value " + value +
+ " does not fit in a 32-bit unsigned integer.");
+ }
+ return value;
+ }
+
public static int jsonNodeToInt(JsonNode node, String about) {
if (node.isInt()) {
return node.asInt();
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java b/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
index 9c9e461ca80..561696827df 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
@@ -75,4 +75,8 @@ public interface Readable {
default int readUnsignedShort() {
return Short.toUnsignedInt(readShort());
}
+
+ default long readUnsignedInt() {
+ return Integer.toUnsignedLong(readInt());
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Writable.java b/clients/src/main/java/org/apache/kafka/common/protocol/Writable.java
index 8dbec871342..0677340af4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Writable.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Writable.java
@@ -54,4 +54,8 @@ public interface Writable {
// ints outside the valid range of a short.
writeShort((short) i);
}
+
+ default void writeUnsignedInt(long i) {
+ writeInt((int) i);
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
index 44726f8240c..f030387b6fc 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
@@ -97,6 +97,12 @@ public class Field {
}
}
+ public static class Uint32 extends Field {
+ public Uint32(String name, String docString) {
+ super(name, Type.UNSIGNED_INT32, docString, false, null);
+ }
+ }
+
public static class Float64 extends Field {
public Float64(String name, String docString) {
super(name, Type.FLOAT64, docString, false, null);
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 9b9b5e66b66..e39a84137bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -23,6 +23,9 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
+import static org.apache.kafka.common.protocol.MessageUtil.UNSIGNED_INT_MAX;
+import static org.apache.kafka.common.protocol.MessageUtil.UNSIGNED_SHORT_MAX;
+
/**
* A record that can be serialized and deserialized according to a pre-defined schema
*/
@@ -97,6 +100,10 @@ public class Struct {
return getInt(field.name);
}
+ public Long get(Field.Uint32 field) {
+ return getLong(field.name);
+ }
+
public Short get(Field.Int16 field) {
return getShort(field.name);
}
@@ -270,6 +277,10 @@ public class Struct {
return (Long) get(name);
}
+ public Long getUnsignedInt(BoundField field) {
+ return (Long) get(field);
+ }
+
public Long getLong(BoundField field) {
return (Long) get(field);
}
@@ -400,13 +411,21 @@ public class Struct {
}
public Struct set(Field.Uint16 def, int value) {
- if (value < 0 || value > 65535) {
+ if (value < 0 || value > UNSIGNED_SHORT_MAX) {
throw new RuntimeException("Invalid value for unsigned short for " +
def.name + ": " + value);
}
return set(def.name, value);
}
+ public Struct set(Field.Uint32 def, long value) {
+ if (value < 0 || value > UNSIGNED_INT_MAX) {
+ throw new RuntimeException("Invalid value for unsigned int for " +
+ def.name + ": " + value);
+ }
+ return set(def.name, value);
+ }
+
public Struct set(Field.Float64 def, double value) {
return set(def.name, value);
}
diff --git a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
index 1cdafcd0fdc..b904eed2721 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
@@ -30,6 +30,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.function.Consumer;
+import static org.apache.kafka.common.protocol.MessageUtil.UNSIGNED_INT_MAX;
+import static org.apache.kafka.common.protocol.MessageUtil.UNSIGNED_SHORT_MAX;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -179,6 +181,18 @@ public class SimpleExampleMessageTest {
message -> assertEquals((short) 456, message.myInt16()));
}
+ @Test
+ public void testMyUint32() {
+ // Verify that the uint16 field reads as 33000 when not set.
+ testRoundTrip(new SimpleExampleMessageData(),
+ message -> assertEquals(1234567, message.myUint32()));
+
+ testRoundTrip(new SimpleExampleMessageData().setMyUint32(123),
+ message -> assertEquals(123, message.myUint32()));
+ testRoundTrip(new SimpleExampleMessageData().setMyUint32(60000),
+ message -> assertEquals(60000, message.myUint32()));
+ }
+
@Test
public void testMyUint16() {
// Verify that the uint16 field reads as 33000 when not set.
@@ -206,7 +220,12 @@ public class SimpleExampleMessageTest {
assertThrows(RuntimeException.class,
() -> new SimpleExampleMessageData().setMyUint16(-1));
assertThrows(RuntimeException.class,
- () -> new SimpleExampleMessageData().setMyUint16(65536));
+ () -> new SimpleExampleMessageData().setMyUint16(UNSIGNED_SHORT_MAX + 1));
+
+ assertThrows(RuntimeException.class,
+ () -> new SimpleExampleMessageData().setMyUint32(-1));
+ assertThrows(RuntimeException.class,
+ () -> new SimpleExampleMessageData().setMyUint32(UNSIGNED_INT_MAX + 1));
// Verify that the tagged field reads as empty when not set.
testRoundTrip(new SimpleExampleMessageData(),
@@ -355,6 +374,7 @@ public class SimpleExampleMessageTest {
"myTaggedStruct=TaggedStruct(structId=''), " +
"myCommonStruct=TestCommonStruct(foo=123, bar=123), " +
"myOtherCommonStruct=TestCommonStruct(foo=123, bar=123), " +
- "myUint16=65535)", message.toString());
+ "myUint16=65535, " +
+ "myUint32=1234567)", message.toString());
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/MessageUtilTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/MessageUtilTest.java
index 33dcabb80a4..5195f551157 100755
--- a/clients/src/test/java/org/apache/kafka/common/protocol/MessageUtilTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/MessageUtilTest.java
@@ -77,4 +77,10 @@ public final class MessageUtilTest {
Arrays.asList(new RawTaggedField(1, new byte[] {1}),
new RawTaggedField(2, new byte[] {}))));
}
+
+ @Test
+ public void testConstants() {
+ assertEquals(MessageUtil.UNSIGNED_SHORT_MAX, 0xFFFF);
+ assertEquals(MessageUtil.UNSIGNED_INT_MAX, 0xFFFFFFFFL);
+ }
}
diff --git a/clients/src/test/resources/common/message/SimpleExampleMessage.json b/clients/src/test/resources/common/message/SimpleExampleMessage.json
index 342a9b994a8..9b9c049593a 100644
--- a/clients/src/test/resources/common/message/SimpleExampleMessage.json
+++ b/clients/src/test/resources/common/message/SimpleExampleMessage.json
@@ -50,9 +50,10 @@
"fields": [
{ "name": "structId", "type": "string", "versions": "2+", "about": "String field in struct"}
]},
- { "name": "myCommonStruct", "type": "TestCommonStruct", "versions": "0+"},
- { "name": "myOtherCommonStruct", "type": "TestCommonStruct", "versions": "0+"},
- { "name": "myUint16", "type": "uint16", "versions": "1+", "default": "33000" }
+ { "name": "myCommonStruct", "type": "TestCommonStruct", "versions": "0+"},
+ { "name": "myOtherCommonStruct", "type": "TestCommonStruct", "versions": "0+"},
+ { "name": "myUint16", "type": "uint16", "versions": "1+", "default": "33000" },
+ { "name": "myUint32", "type": "uint32", "versions": "1+", "default": "1234567" }
],
"commonStructs": [
{ "name": "TestCommonStruct", "versions": "0+", "fields": [
diff --git a/generator/src/main/java/org/apache/kafka/message/FieldSpec.java b/generator/src/main/java/org/apache/kafka/message/FieldSpec.java
index d15b03cdb95..1853458ee97 100644
--- a/generator/src/main/java/org/apache/kafka/message/FieldSpec.java
+++ b/generator/src/main/java/org/apache/kafka/message/FieldSpec.java
@@ -300,6 +300,7 @@ public final class FieldSpec {
} else if ((type instanceof FieldType.Int8FieldType) ||
(type instanceof FieldType.Int16FieldType) ||
(type instanceof FieldType.Uint16FieldType) ||
+ (type instanceof FieldType.Uint32FieldType) ||
(type instanceof FieldType.Int32FieldType) ||
(type instanceof FieldType.Int64FieldType)) {
int base = 10;
@@ -338,7 +339,7 @@ public final class FieldSpec {
} else {
try {
int value = Integer.valueOf(defaultString, base);
- if (value < 0 || value > 65535) {
+ if (value < 0 || value > MessageGenerator.UNSIGNED_SHORT_MAX) {
throw new RuntimeException("Invalid default for uint16 field " +
name + ": out of range.");
}
@@ -348,6 +349,22 @@ public final class FieldSpec {
}
return fieldDefault;
}
+ } else if (type instanceof FieldType.Uint32FieldType) {
+ if (defaultString.isEmpty()) {
+ return "0";
+ } else {
+ try {
+ long value = Long.valueOf(defaultString, base);
+ if (value < 0 || value > MessageGenerator.UNSIGNED_INT_MAX) {
+ throw new RuntimeException("Invalid default for uint32 field " +
+ name + ": out of range.");
+ }
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("Invalid default for uint32 field " +
+ name + ": " + defaultString, e);
+ }
+ return fieldDefault;
+ }
} else if (type instanceof FieldType.Int32FieldType) {
if (defaultString.isEmpty()) {
return "0";
@@ -476,6 +493,8 @@ public final class FieldSpec {
return "short";
} else if (type instanceof FieldType.Uint16FieldType) {
return "int";
+ } else if (type instanceof FieldType.Uint32FieldType) {
+ return "long";
} else if (type instanceof FieldType.Int32FieldType) {
return "int";
} else if (type instanceof FieldType.Int64FieldType) {
diff --git a/generator/src/main/java/org/apache/kafka/message/FieldType.java b/generator/src/main/java/org/apache/kafka/message/FieldType.java
index e0009c22eaf..24b79d47cba 100644
--- a/generator/src/main/java/org/apache/kafka/message/FieldType.java
+++ b/generator/src/main/java/org/apache/kafka/message/FieldType.java
@@ -122,6 +122,26 @@ public interface FieldType {
}
}
+ final class Uint32FieldType implements FieldType {
+ static final Uint32FieldType INSTANCE = new Uint32FieldType();
+ private static final String NAME = "uint32";
+
+ @Override
+ public String getBoxedJavaType(HeaderGenerator headerGenerator) {
+ return "Long";
+ }
+
+ @Override
+ public Optional fixedLength() {
+ return Optional.of(4);
+ }
+
+ @Override
+ public String toString() {
+ return NAME;
+ }
+ }
+
final class Int64FieldType implements FieldType {
static final Int64FieldType INSTANCE = new Int64FieldType();
private static final String NAME = "int64";
@@ -369,6 +389,8 @@ public interface FieldType {
return Int16FieldType.INSTANCE;
case Uint16FieldType.NAME:
return Uint16FieldType.INSTANCE;
+ case Uint32FieldType.NAME:
+ return Uint32FieldType.INSTANCE;
case Int32FieldType.NAME:
return Int32FieldType.INSTANCE;
case Int64FieldType.NAME:
diff --git a/generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java b/generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java
index 2df8170b053..8ce07b9275d 100644
--- a/generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java
@@ -164,6 +164,11 @@ public final class JsonConverterGenerator implements MessageClassGenerator {
buffer.printf("%s;%n", target.assignmentStatement(
String.format("MessageUtil.jsonNodeToUnsignedShort(%s, \"%s\")",
target.sourceVariable(), target.humanReadableName())));
+ } else if (target.field().type() instanceof FieldType.Uint32FieldType) {
+ headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
+ buffer.printf("%s;%n", target.assignmentStatement(
+ String.format("MessageUtil.jsonNodeToUnsignedInt(%s, \"%s\")",
+ target.sourceVariable(), target.humanReadableName())));
} else if (target.field().type() instanceof FieldType.Int32FieldType) {
headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
@@ -346,7 +351,8 @@ public final class JsonConverterGenerator implements MessageClassGenerator {
headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new IntNode(%s)", target.sourceVariable())));
- } else if (target.field().type() instanceof FieldType.Int64FieldType) {
+ } else if (target.field().type() instanceof FieldType.Int64FieldType ||
+ (target.field().type() instanceof FieldType.Uint32FieldType)) {
headerGenerator.addImport(MessageGenerator.LONG_NODE_CLASS);
buffer.printf("%s;%n", target.assignmentStatement(
String.format("new LongNode(%s)", target.sourceVariable())));
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index b9923ee5723..235667480cc 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -541,6 +541,8 @@ public final class MessageDataGenerator implements MessageClassGenerator {
return "_readable.readShort()";
} else if (type instanceof FieldType.Uint16FieldType) {
return "_readable.readUnsignedShort()";
+ } else if (type instanceof FieldType.Uint32FieldType) {
+ return "_readable.readUnsignedInt()";
} else if (type instanceof FieldType.Int32FieldType) {
return "_readable.readInt()";
} else if (type instanceof FieldType.Int64FieldType) {
@@ -848,6 +850,8 @@ public final class MessageDataGenerator implements MessageClassGenerator {
return String.format("_writable.writeShort(%s)", name);
} else if (type instanceof FieldType.Uint16FieldType) {
return String.format("_writable.writeUnsignedShort(%s)", name);
+ } else if (type instanceof FieldType.Uint32FieldType) {
+ return String.format("_writable.writeUnsignedInt(%s)", name);
} else if (type instanceof FieldType.Int32FieldType) {
return String.format("_writable.writeInt(%s)", name);
} else if (type instanceof FieldType.Int64FieldType) {
@@ -1372,7 +1376,8 @@ public final class MessageDataGenerator implements MessageClassGenerator {
(field.type() instanceof FieldType.Int32FieldType)) {
buffer.printf("hashCode = 31 * hashCode + %s;%n",
field.camelCaseName());
- } else if (field.type() instanceof FieldType.Int64FieldType) {
+ } else if (field.type() instanceof FieldType.Int64FieldType ||
+ (field.type() instanceof FieldType.Uint32FieldType)) {
buffer.printf("hashCode = 31 * hashCode + ((int) (%s >> 32) ^ (int) %s);%n",
field.camelCaseName(), field.camelCaseName());
} else if (field.type() instanceof FieldType.UUIDFieldType) {
@@ -1427,6 +1432,7 @@ public final class MessageDataGenerator implements MessageClassGenerator {
(field.type() instanceof FieldType.Int8FieldType) ||
(field.type() instanceof FieldType.Int16FieldType) ||
(field.type() instanceof FieldType.Uint16FieldType) ||
+ (field.type() instanceof FieldType.Uint32FieldType) ||
(field.type() instanceof FieldType.Int32FieldType) ||
(field.type() instanceof FieldType.Int64FieldType) ||
(field.type() instanceof FieldType.Float64FieldType) ||
@@ -1514,6 +1520,7 @@ public final class MessageDataGenerator implements MessageClassGenerator {
} else if ((field.type() instanceof FieldType.Int8FieldType) ||
(field.type() instanceof FieldType.Int16FieldType) ||
(field.type() instanceof FieldType.Uint16FieldType) ||
+ (field.type() instanceof FieldType.Uint32FieldType) ||
(field.type() instanceof FieldType.Int32FieldType) ||
(field.type() instanceof FieldType.Int64FieldType) ||
(field.type() instanceof FieldType.Float64FieldType)) {
@@ -1576,13 +1583,21 @@ public final class MessageDataGenerator implements MessageClassGenerator {
field.fieldAbstractJavaType(headerGenerator, structRegistry));
buffer.incrementIndent();
if (field.type() instanceof FieldType.Uint16FieldType) {
- buffer.printf("if (v < 0 || v > 65535) {%n");
+ buffer.printf("if (v < 0 || v > %d) {%n", MessageGenerator.UNSIGNED_SHORT_MAX);
buffer.incrementIndent();
buffer.printf("throw new RuntimeException(\"Invalid value \" + v + " +
"\" for unsigned short field.\");%n");
buffer.decrementIndent();
buffer.printf("}%n");
}
+ if (field.type() instanceof FieldType.Uint32FieldType) {
+ buffer.printf("if (v < 0 || v > %dL) {%n", MessageGenerator.UNSIGNED_INT_MAX);
+ buffer.incrementIndent();
+ buffer.printf("throw new RuntimeException(\"Invalid value \" + v + " +
+ "\" for unsigned int field.\");%n");
+ buffer.decrementIndent();
+ buffer.printf("}%n");
+ }
buffer.printf("this.%s = v;%n", field.camelCaseName());
buffer.printf("return this;%n");
buffer.decrementIndent();
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
index cfbeae84ecf..56f3f6ab0b2 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
@@ -156,6 +156,10 @@ public final class MessageGenerator {
static final String DOUBLE_NODE_CLASS = "com.fasterxml.jackson.databind.node.DoubleNode";
+ static final long UNSIGNED_INT_MAX = 4294967295L;
+
+ static final int UNSIGNED_SHORT_MAX = 65535;
+
/**
* The Jackson serializer we use for JSON objects.
*/
diff --git a/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java b/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
index 5ebd158839b..a5ae8300d53 100644
--- a/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java
@@ -250,6 +250,12 @@ final class SchemaGenerator {
throw new RuntimeException("Type " + type + " cannot be nullable.");
}
return "Type.UINT16";
+ } else if (type instanceof FieldType.Uint32FieldType) {
+ headerGenerator.addImport(MessageGenerator.TYPE_CLASS);
+ if (nullable) {
+ throw new RuntimeException("Type " + type + " cannot be nullable.");
+ }
+ return "Type.UNSIGNED_INT32";
} else if (type instanceof FieldType.Int32FieldType) {
headerGenerator.addImport(MessageGenerator.TYPE_CLASS);
if (nullable) {
diff --git a/generator/src/test/java/org/apache/kafka/message/MessageGeneratorTest.java b/generator/src/test/java/org/apache/kafka/message/MessageGeneratorTest.java
index 07766f23f51..8eb38e999c7 100644
--- a/generator/src/test/java/org/apache/kafka/message/MessageGeneratorTest.java
+++ b/generator/src/test/java/org/apache/kafka/message/MessageGeneratorTest.java
@@ -67,4 +67,10 @@ public class MessageGeneratorTest {
} catch (RuntimeException e) {
}
}
+
+ @Test
+ public void testConstants() {
+ assertEquals(MessageGenerator.UNSIGNED_SHORT_MAX, 0xFFFF);
+ assertEquals(MessageGenerator.UNSIGNED_INT_MAX, 0xFFFFFFFFL);
+ }
}