diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
index 663ef4709a7..e1eae68e2ff 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.utils;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -25,6 +26,9 @@ import java.nio.ByteBuffer;
/**
* This classes exposes low-level methods for reading/writing from byte streams or buffers.
+ *
+ * The implementation of these methods has been tuned for JVM and the empirical calculations could be found
+ * using ByteUtilsBenchmark.java
*/
public final class ByteUtils {
@@ -144,47 +148,84 @@ public final class ByteUtils {
* Read an integer stored in variable-length format using unsigned decoding from
* Google Protocol Buffers.
*
+ * The implementation is based on Netty's decoding of varint.
+ * @see Netty's varint decoding
+ *
* @param buffer The buffer to read from
* @return The integer read
*
* @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
*/
public static int readUnsignedVarint(ByteBuffer buffer) {
- int value = 0;
- int i = 0;
- int b;
- while (((b = buffer.get()) & 0x80) != 0) {
- value |= (b & 0x7f) << i;
- i += 7;
- if (i > 28)
- throw illegalVarintException(value);
+ byte tmp = buffer.get();
+ if (tmp >= 0) {
+ return tmp;
+ } else {
+ int result = tmp & 127;
+ if ((tmp = buffer.get()) >= 0) {
+ result |= tmp << 7;
+ } else {
+ result |= (tmp & 127) << 7;
+ if ((tmp = buffer.get()) >= 0) {
+ result |= tmp << 14;
+ } else {
+ result |= (tmp & 127) << 14;
+ if ((tmp = buffer.get()) >= 0) {
+ result |= tmp << 21;
+ } else {
+ result |= (tmp & 127) << 21;
+ result |= (tmp = buffer.get()) << 28;
+ if (tmp < 0) {
+ throw illegalVarintException(result);
+ }
+ }
+ }
+ }
+ return result;
}
- value |= b << i;
- return value;
}
/**
* Read an integer stored in variable-length format using unsigned decoding from
* Google Protocol Buffers.
*
+ * The implementation is based on Netty's decoding of varint.
+ * @see Netty's varint decoding
+ *
* @param in The input to read from
* @return The integer read
*
* @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
* @throws IOException if {@link DataInput} throws {@link IOException}
+ * @throws EOFException if {@link DataInput} throws {@link EOFException}
*/
- public static int readUnsignedVarint(DataInput in) throws IOException {
- int value = 0;
- int i = 0;
- int b;
- while (((b = in.readByte()) & 0x80) != 0) {
- value |= (b & 0x7f) << i;
- i += 7;
- if (i > 28)
- throw illegalVarintException(value);
+ static int readUnsignedVarint(DataInput in) throws IOException {
+ byte tmp = in.readByte();
+ if (tmp >= 0) {
+ return tmp;
+ } else {
+ int result = tmp & 127;
+ if ((tmp = in.readByte()) >= 0) {
+ result |= tmp << 7;
+ } else {
+ result |= (tmp & 127) << 7;
+ if ((tmp = in.readByte()) >= 0) {
+ result |= tmp << 14;
+ } else {
+ result |= (tmp & 127) << 14;
+ if ((tmp = in.readByte()) >= 0) {
+ result |= tmp << 21;
+ } else {
+ result |= (tmp & 127) << 21;
+ result |= (tmp = in.readByte()) << 28;
+ if (tmp < 0) {
+ throw illegalVarintException(result);
+ }
+ }
+ }
+ }
+ return result;
}
- value |= b << i;
- return value;
}
/**
@@ -250,6 +291,12 @@ public final class ByteUtils {
* @throws IllegalArgumentException if variable-length value does not terminate after 10 bytes have been read
*/
public static long readVarlong(ByteBuffer buffer) {
+ long raw = readUnsignedVarlong(buffer);
+ return (raw >>> 1) ^ -(raw & 1);
+ }
+
+ // visible for testing
+ static long readUnsignedVarlong(ByteBuffer buffer) {
long value = 0L;
int i = 0;
long b;
@@ -260,7 +307,7 @@ public final class ByteUtils {
throw illegalVarlongException(value);
}
value |= b << i;
- return (value >>> 1) ^ -(value & 1);
+ return value;
}
/**
@@ -288,33 +335,68 @@ public final class ByteUtils {
* Google Protocol Buffers
* into the buffer.
*
+ * Implementation copied from https://github.com/astei/varint-writing-showdown/tree/dev (MIT License)
+ * @see Sample implementation
+ *
* @param value The value to write
* @param buffer The output to write to
*/
public static void writeUnsignedVarint(int value, ByteBuffer buffer) {
- while ((value & 0xffffff80) != 0L) {
- byte b = (byte) ((value & 0x7f) | 0x80);
- buffer.put(b);
- value >>>= 7;
+ if ((value & (0xFFFFFFFF << 7)) == 0) {
+ buffer.put((byte) value);
+ } else {
+ buffer.put((byte) (value & 0x7F | 0x80));
+ if ((value & (0xFFFFFFFF << 14)) == 0) {
+ buffer.put((byte) ((value >>> 7) & 0xFF));
+ } else {
+ buffer.put((byte) ((value >>> 7) & 0x7F | 0x80));
+ if ((value & (0xFFFFFFFF << 21)) == 0) {
+ buffer.put((byte) ((value >>> 14) & 0xFF));
+ } else {
+ buffer.put((byte) ((value >>> 14) & 0x7F | 0x80));
+ if ((value & (0xFFFFFFFF << 28)) == 0) {
+ buffer.put((byte) ((value >>> 21) & 0xFF));
+ } else {
+ buffer.put((byte) ((value >>> 21) & 0x7F | 0x80));
+ buffer.put((byte) ((value >>> 28) & 0xFF));
+ }
+ }
+ }
}
- buffer.put((byte) value);
}
/**
* Write the given integer following the variable-length unsigned encoding from
* Google Protocol Buffers
* into the buffer.
+ *
+ * For implementation notes, see {@link #writeUnsignedVarint(int, ByteBuffer)}
*
* @param value The value to write
* @param out The output to write to
*/
public static void writeUnsignedVarint(int value, DataOutput out) throws IOException {
- while ((value & 0xffffff80) != 0L) {
- byte b = (byte) ((value & 0x7f) | 0x80);
- out.writeByte(b);
- value >>>= 7;
+ if ((value & (0xFFFFFFFF << 7)) == 0) {
+ out.writeByte(value);
+ } else {
+ out.writeByte(value & 0x7F | 0x80);
+ if ((value & (0xFFFFFFFF << 14)) == 0) {
+ out.writeByte(value >>> 7);
+ } else {
+ out.writeByte((value >>> 7) & 0x7F | 0x80);
+ if ((value & (0xFFFFFFFF << 21)) == 0) {
+ out.writeByte(value >>> 14);
+ } else {
+ out.writeByte((byte) ((value >>> 14) & 0x7F | 0x80));
+ if ((value & (0xFFFFFFFF << 28)) == 0) {
+ out.writeByte(value >>> 21);
+ } else {
+ out.writeByte((value >>> 21) & 0x7F | 0x80);
+ out.writeByte(value >>> 28);
+ }
+ }
+ }
}
- out.writeByte((byte) value);
}
/**
@@ -368,6 +450,11 @@ public final class ByteUtils {
*/
public static void writeVarlong(long value, ByteBuffer buffer) {
long v = (value << 1) ^ (value >> 63);
+ writeUnsignedVarlong(v, buffer);
+ }
+
+ // visible for testing and benchmarking
+ public static void writeUnsignedVarlong(long v, ByteBuffer buffer) {
while ((v & 0xffffffffffffff80L) != 0L) {
byte b = (byte) ((v & 0x7f) | 0x80);
buffer.put(b);
@@ -437,8 +524,11 @@ public final class ByteUtils {
* @see #sizeOfUnsignedVarint(int)
*/
public static int sizeOfVarlong(long value) {
- long v = (value << 1) ^ (value >> 63);
+ return sizeOfUnsignedVarlong((value << 1) ^ (value >> 63));
+ }
+ // visible for benchmarking
+ public static int sizeOfUnsignedVarlong(long v) {
// For implementation notes @see #sizeOfUnsignedVarint(int)
// Similar logic is applied to allow for 64bit input -> 1-9byte output.
// return (70 - leadingZeros) / 7 + leadingZeros / 64;
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
index 6f3b6904feb..85c263a7625 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.utils;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
@@ -24,6 +25,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
@@ -32,6 +34,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ByteUtilsTest {
+ private static final int MAX_LENGTH_VARINT = 5;
+ private static final int MAX_LENGTH_VARLONG = 10;
private final byte x00 = 0x00;
private final byte x01 = 0x01;
private final byte x02 = 0x02;
@@ -241,6 +245,121 @@ public class ByteUtilsTest {
assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L);
}
+ @Test
+ @Disabled // Enable this when we change the implementation of UnsignedVarlong
+ public void testCorrectnessWriteUnsignedVarlong() {
+ // The old well-known implementation for writeVarlong.
+ LongFunction simpleImplementation = (long value) -> {
+ ByteBuffer buffer = ByteBuffer.allocate(MAX_LENGTH_VARLONG);
+ while ((value & 0xffffffffffffff80L) != 0L) {
+ byte b = (byte) ((value & 0x7f) | 0x80);
+ buffer.put(b);
+ value >>>= 7;
+ }
+ buffer.put((byte) value);
+
+ return buffer;
+ };
+
+ // compare the full range of values
+ final ByteBuffer actual = ByteBuffer.allocate(MAX_LENGTH_VARLONG);
+ for (long i = 1; i < Long.MAX_VALUE && i >= 0; i = i << 1) {
+ ByteUtils.writeUnsignedVarlong(i, actual);
+ final ByteBuffer expected = simpleImplementation.apply(i);
+ assertArrayEquals(expected.array(), actual.array(), "Implementations do not match for number=" + i);
+ actual.clear();
+ }
+ }
+
+ @Test
+ public void testCorrectnessWriteUnsignedVarint() {
+ // The old well-known implementation for writeUnsignedVarint.
+ IntFunction simpleImplementation = (int value) -> {
+ ByteBuffer buffer = ByteBuffer.allocate(MAX_LENGTH_VARINT);
+ while (true) {
+ if ((value & ~0x7F) == 0) {
+ buffer.put((byte) value);
+ break;
+ } else {
+ buffer.put((byte) ((value & 0x7F) | 0x80));
+ value >>>= 7;
+ }
+ }
+
+ return buffer;
+ };
+
+ // compare the full range of values
+ final ByteBuffer actual = ByteBuffer.allocate(MAX_LENGTH_VARINT);
+ for (int i = 0; i < Integer.MAX_VALUE && i >= 0; i += 13) {
+ ByteUtils.writeUnsignedVarint(i, actual);
+ final ByteBuffer expected = simpleImplementation.apply(i);
+ assertArrayEquals(expected.array(), actual.array(), "Implementations do not match for integer=" + i);
+ actual.clear();
+ }
+ }
+
+ @Test
+ public void testCorrectnessReadUnsignedVarint() {
+ // The old well-known implementation for readUnsignedVarint
+ Function simpleImplementation = (ByteBuffer buffer) -> {
+ int value = 0;
+ int i = 0;
+ int b;
+ while (((b = buffer.get()) & 0x80) != 0) {
+ value |= (b & 0x7f) << i;
+ i += 7;
+ if (i > 28)
+ throw new IllegalArgumentException("Invalid varint");
+ }
+ value |= b << i;
+ return value;
+ };
+
+ // compare the full range of values
+ final ByteBuffer testData = ByteBuffer.allocate(MAX_LENGTH_VARINT);
+ for (int i = 0; i < Integer.MAX_VALUE && i >= 0; i += 13) {
+ ByteUtils.writeUnsignedVarint(i, testData);
+ // prepare buffer for reading
+ testData.flip();
+ final int actual = ByteUtils.readUnsignedVarint(testData.duplicate());
+ final int expected = simpleImplementation.apply(testData);
+ assertEquals(expected, actual);
+ testData.clear();
+ }
+ }
+
+ @Test
+ @Disabled // Enable this when we change the implementation of UnsignedVarlong
+ public void testCorrectnessReadUnsignedVarlong() {
+ // The old well-known implementation for readUnsignedVarlong
+ Function simpleImplementation = (ByteBuffer buffer) -> {
+ long value = 0L;
+ int i = 0;
+ long b;
+ while (((b = buffer.get()) & 0x80) != 0) {
+ value |= (b & 0x7f) << i;
+ i += 7;
+ if (i > 63)
+ throw new IllegalArgumentException();
+ }
+ value |= b << i;
+ return value;
+ };
+
+ // compare the full range of values
+ final ByteBuffer testData = ByteBuffer.allocate(MAX_LENGTH_VARLONG);
+ for (long i = 1; i < Long.MAX_VALUE && i >= 0; i = i << 1) {
+ ByteUtils.writeUnsignedVarlong(i, testData);
+ // prepare buffer for reading
+ testData.flip();
+ final long actual = ByteUtils.readUnsignedVarlong(testData.duplicate());
+ final long expected = simpleImplementation.apply(testData);
+ assertEquals(expected, actual);
+ testData.clear();
+ }
+ }
+
@Test
public void testSizeOfUnsignedVarint() {
// The old well-known implementation for sizeOfUnsignedVarint
@@ -298,7 +417,7 @@ public class ByteUtilsTest {
}
private void assertUnsignedVarintSerde(int value, byte[] expectedEncoding) throws IOException {
- ByteBuffer buf = ByteBuffer.allocate(32);
+ ByteBuffer buf = ByteBuffer.allocate(MAX_LENGTH_VARINT);
ByteUtils.writeUnsignedVarint(value, buf);
buf.flip();
assertArrayEquals(expectedEncoding, Utils.toArray(buf));
@@ -314,7 +433,7 @@ public class ByteUtilsTest {
}
private void assertVarintSerde(int value, byte[] expectedEncoding) throws IOException {
- ByteBuffer buf = ByteBuffer.allocate(32);
+ ByteBuffer buf = ByteBuffer.allocate(MAX_LENGTH_VARINT);
ByteUtils.writeVarint(value, buf);
buf.flip();
assertArrayEquals(expectedEncoding, Utils.toArray(buf));
@@ -330,7 +449,7 @@ public class ByteUtilsTest {
}
private void assertVarlongSerde(long value, byte[] expectedEncoding) throws IOException {
- ByteBuffer buf = ByteBuffer.allocate(32);
+ ByteBuffer buf = ByteBuffer.allocate(MAX_LENGTH_VARLONG);
ByteUtils.writeVarlong(value, buf);
buf.flip();
assertEquals(value, ByteUtils.readVarlong(buf.duplicate()));
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index a659240da41..e231696a3c2 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -523,4 +523,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
+
+
+
+
+
+
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
index bee8c16a260..29b99d542c4 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
@@ -17,68 +17,270 @@
package org.apache.kafka.jmh.util;
-import java.util.concurrent.ThreadLocalRandom;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.ByteUtils;
import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.CompilerControl;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
-@State(Scope.Benchmark)
-@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(3)
-@Warmup(iterations = 5, time = 1)
-@Measurement(iterations = 10, time = 1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+/**
+ * This benchmark calculates the empirical evidence of different implementation for encoding/decoding a protobuf
+ * VarInt and VarLong.
+ *
+ * The benchmark uses JMH and calculates results for different sizes of variable length integer. We expect most of the
+ * usage in Kafka code base to be 1 or 2 byte integers.
+ */
public class ByteUtilsBenchmark {
- private int inputInt;
- private long inputLong;
- @Setup(Level.Iteration)
- public void setUp() {
- inputInt = ThreadLocalRandom.current().nextInt();
- inputLong = ThreadLocalRandom.current().nextLong();
- }
+ private static final int DATA_SET_SAMPLE_SIZE = 16384;
- @Benchmark
- public int testSizeOfUnsignedVarint() {
- return ByteUtils.sizeOfUnsignedVarint(inputInt);
- }
-
- @Benchmark
- public int testSizeOfUnsignedVarintSimple() {
- int value = inputInt;
- int bytes = 1;
- while ((value & 0xffffff80) != 0L) {
- bytes += 1;
- value >>>= 7;
+ @State(Scope.Benchmark)
+ public static class BaseBenchmarkState {
+ private ByteBuffer testBuffer;
+ private SecureRandom random;
+ @Setup(Level.Trial)
+ public void setUpBenchmarkLevel() {
+ // Initialize the random number generator with a seed so that for each benchmark it produces the same sequence
+ // of random numbers. Note that it is important to initialize it again with the seed before every benchmark.
+ random = new SecureRandom();
+ random.setSeed(133713371337L);
}
- return bytes;
- }
- @Benchmark
- public int testSizeOfVarlong() {
- return ByteUtils.sizeOfVarlong(inputLong);
- }
-
- @Benchmark
- public int testSizeOfVarlongSimple() {
- long v = (inputLong << 1) ^ (inputLong >> 63);
- int bytes = 1;
- while ((v & 0xffffffffffffff80L) != 0L) {
- bytes += 1;
- v >>>= 7;
+ @Setup(Level.Invocation)
+ public void setUpInvocationBuffer() {
+ testBuffer = ByteBuffer.allocate(10);
+ }
+
+ /**
+ * Generates a random int64 number which occupies exactly bytesSet in the variable length encoding for int64
+ *
+ * Upper bound is set by finding the largest number that can be represented by N bits. This number is found
+ * by bit shifting by N and subtracting decimal 1. For example, for 2 bytes = 16 bits, we calculate the
+ * upper bound by:
+ * 1. 0001 0000 0000 0000 0000 // bit shift by 16 = 65536 decimal
+ * 2. Subtract 1 from 65536 = 65535
+ * 3. 65535 is the upper bound which is represented in binary as 1111 1111 1111 1111 i.e. largest number
+ * that could be represented by 2 bytes.
+ *
+ * Hence, range of random number of different byte length is:
+ * 1 byte - [1, 255)
+ * 2 byte - [256, 65535)
+ * ... and so on.
+ */
+ long generateRandomLongWithExactBytesSet(int bytesSet) {
+ long lowerBound = 1L << ((bytesSet - 1) * 8);
+ long upperBound = (1L << (bytesSet * 8)) - 1;
+ if (lowerBound >= upperBound) {
+ throw new IllegalArgumentException();
+ }
+ return lowerBound +
+ random.longs(lowerBound, upperBound).findFirst()
+ .orElseThrow(() -> new IllegalStateException("Unable to create a random long in the range=[" + lowerBound + ", " + upperBound + "]"));
+ }
+
+ /**
+ * Generates a random int32 number which occupies exactly bytesSet in the variable length encoding for int32
+ *
+ * @see {@link #generateRandomLongWithExactBytesSet(int)} for implementation details.
+ *
+ */
+ int generateRandomIntWithExactBytesSet(int bytesSet) {
+ int lowerBound = 1 << ((bytesSet - 1) * 8);
+ int upperBound = (1 << (bytesSet * 8)) - 1;
+ if (lowerBound >= upperBound) {
+ throw new IllegalArgumentException();
+ }
+ return lowerBound + random.nextInt(upperBound - lowerBound);
+ }
+
+ public ByteBuffer getTestBuffer() {
+ return testBuffer;
+ }
+ }
+
+ @State(Scope.Benchmark)
+ public static class IterationStateForLong extends BaseBenchmarkState {
+ @Param({"1", "3", "5", "7"})
+ int numNonZeroBytes;
+
+ private long[] randomLongs;
+
+ @Setup(Level.Iteration)
+ public void setup() {
+ randomLongs = new long[DATA_SET_SAMPLE_SIZE];
+ for (int i = 0; i < DATA_SET_SAMPLE_SIZE; i++) {
+ this.randomLongs[i] = generateRandomLongWithExactBytesSet(numNonZeroBytes);
+ }
+ }
+
+ public long[] getRandomValues() {
+ return randomLongs;
+ }
+ }
+
+ @State(Scope.Benchmark)
+ public static class IterationStateForInt extends BaseBenchmarkState {
+ @Param({"1", "2", "3"})
+ int numNonZeroBytes;
+ private int[] randomInts;
+
+ @Setup(Level.Iteration)
+ public void setup() {
+ randomInts = new int[DATA_SET_SAMPLE_SIZE];
+ for (int i = 0; i < DATA_SET_SAMPLE_SIZE; i++) {
+ this.randomInts[i] = generateRandomIntWithExactBytesSet(numNonZeroBytes);
+ }
+ }
+
+ public int[] getRandomValues() {
+ return randomInts;
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testUnsignedReadVarintNetty(IterationStateForInt state, Blackhole bk) {
+ for (int randomValue : state.getRandomValues()) {
+ ByteUtils.writeUnsignedVarint(randomValue, state.getTestBuffer());
+ // prepare for reading
+ state.getTestBuffer().flip();
+ bk.consume(ByteUtilsBenchmark.readUnsignedVarintNetty(state.getTestBuffer()));
+ state.getTestBuffer().clear();
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testUnsignedReadVarintLegacy(IterationStateForInt state, Blackhole bk) {
+ for (int randomValue : state.getRandomValues()) {
+ ByteUtils.writeUnsignedVarint(randomValue, state.getTestBuffer());
+ // prepare for reading
+ state.getTestBuffer().flip();
+ bk.consume(ByteUtilsBenchmark.readUnsignedVarintLegacy(state.getTestBuffer()));
+ state.getTestBuffer().clear();
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testUnsignedReadVarintProtobuf(IterationStateForInt state, Blackhole bk) {
+ for (int randomValue : state.getRandomValues()) {
+ ByteUtils.writeUnsignedVarint(randomValue, state.getTestBuffer());
+ // prepare for reading
+ state.getTestBuffer().flip();
+ bk.consume(ByteUtilsBenchmark.readUnsignedVarintProtoBuf(state.getTestBuffer()));
+ state.getTestBuffer().clear();
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testUnsignedReadVarlongUnrolled(IterationStateForLong state, Blackhole bk) throws IOException {
+ for (long randomValue : state.getRandomValues()) {
+ ByteUtils.writeUnsignedVarlong(randomValue, state.getTestBuffer());
+ // prepare for reading
+ state.getTestBuffer().flip();
+ bk.consume(ByteUtilsBenchmark.readUnsignedVarlongNetty(state.getTestBuffer()));
+ state.getTestBuffer().clear();
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testUnsignedReadVarlongLegacy(IterationStateForLong state, Blackhole bk) {
+ for (long randomValue : state.getRandomValues()) {
+ ByteUtils.writeUnsignedVarlong(randomValue, state.getTestBuffer());
+ // prepare for reading
+ state.getTestBuffer().flip();
+ bk.consume(ByteUtilsBenchmark.readUnsignedVarlongLegacy(state.getTestBuffer()));
+ state.getTestBuffer().clear();
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testUnsignedWriteVarintUnrolled(IterationStateForInt state) {
+ for (int randomValue : state.getRandomValues()) {
+ ByteUtilsBenchmark.writeUnsignedVarintUnrolled(randomValue, state.getTestBuffer());
+ state.getTestBuffer().clear();
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testUnsignedWriteVarintLegacy(IterationStateForInt state) {
+ for (int randomValue : state.getRandomValues()) {
+ ByteUtilsBenchmark.writeUnsignedVarintLegacy(randomValue, state.getTestBuffer());
+ state.getTestBuffer().clear();
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testUnsignedWriteVarlongUnrolled(IterationStateForLong state) {
+ for (long randomValue : state.getRandomValues()) {
+ ByteUtilsBenchmark.writeUnsignedVarlongUnrolled(randomValue, state.getTestBuffer());
+ state.getTestBuffer().clear();
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testUnsignedWriteVarlongLegacy(IterationStateForLong state) {
+ for (long randomValue : state.getRandomValues()) {
+ ByteUtilsBenchmark.writeUnsignedVarlongLegacy(randomValue, state.getTestBuffer());
+ state.getTestBuffer().clear();
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testSizeOfUnsignedVarint(IterationStateForInt state, Blackhole bk) {
+ for (int randomValue : state.getRandomValues()) {
+ bk.consume(ByteUtils.sizeOfUnsignedVarint(randomValue));
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testSizeOfUnsignedVarintSimple(IterationStateForInt state, Blackhole bk) {
+ for (int randomValue : state.getRandomValues()) {
+ int value = randomValue;
+ int bytes = 1;
+ while ((value & 0xffffff80) != 0L) {
+ bytes += 1;
+ value >>>= 7;
+ }
+ bk.consume(bytes);
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testSizeOfUnsignedVarlong(IterationStateForLong state, Blackhole bk) {
+ for (long randomValue : state.getRandomValues()) {
+ bk.consume(ByteUtils.sizeOfUnsignedVarlong(randomValue));
}
- return bytes;
}
public static void main(String[] args) throws RunnerException {
@@ -89,4 +291,293 @@ public class ByteUtilsBenchmark {
new Runner(opt).run();
}
+
+
+ /******************* Implementations **********************/
+
+ /*
+ * Implementation in Trunk as of Apr 2023 / v3.4
+ */
+ private static int readUnsignedVarintLegacy(ByteBuffer buffer) {
+ int value = 0;
+ int i = 0;
+ int b;
+ while (((b = buffer.get()) & 0x80) != 0) {
+ value |= (b & 0x7f) << i;
+ i += 7;
+ if (i > 28)
+ throw new IllegalArgumentException();
+ }
+ value |= b << i;
+ return value;
+ }
+
+ /*
+ * Implementation in Trunk as of Apr 2023 / v3.4
+ */
+ private static long readUnsignedVarlongLegacy(ByteBuffer buffer) {
+ long value = 0L;
+ int i = 0;
+ long b;
+ while (((b = buffer.get()) & 0x80) != 0) {
+ value |= (b & 0x7f) << i;
+ i += 7;
+ if (i > 63)
+ throw new IllegalArgumentException();
+ }
+ value |= b << i;
+ return value;
+ }
+
+ /**
+ * Implementation copied from Protobuf's implementation.
+ * see: https://github.com/protocolbuffers/protobuf/blob/f1c7820c9bd0e31f8b7d091092851441ad2716b6/java/core/src/main/java/com/google/protobuf/CodedInputStream.java#L1048
+ */
+ private static int readUnsignedVarintProtoBuf(ByteBuffer buf) {
+ fastpath:
+ {
+ int tempPos = buf.position();
+ int limit = buf.limit();
+
+ if (limit == tempPos) {
+ break fastpath;
+ }
+
+ final byte[] buffer = buf.array();
+ int x;
+ if ((x = buffer[tempPos++]) >= 0) {
+ buf.position(tempPos);
+ return x;
+ } else if (limit - tempPos < 9) {
+ break fastpath;
+ } else if ((x ^= buffer[tempPos++] << 7) < 0) {
+ x ^= ~0 << 7;
+ } else if ((x ^= buffer[tempPos++] << 14) >= 0) {
+ x ^= (~0 << 7) ^ (~0 << 14);
+ } else if ((x ^= buffer[tempPos++] << 21) < 0) {
+ x ^= (~0 << 7) ^ (~0 << 14) ^ (~0 << 21);
+ } else {
+ int y = buffer[tempPos++];
+ x ^= y << 28;
+ x ^= (~0 << 7) ^ (~0 << 14) ^ (~0 << 21) ^ (~0 << 28);
+ if (y < 0
+ && buffer[tempPos++] < 0
+ && buffer[tempPos++] < 0
+ && buffer[tempPos++] < 0
+ && buffer[tempPos++] < 0
+ && buffer[tempPos++] < 0) {
+ break fastpath; // Will throw malformedVarint()
+ }
+ }
+ buf.position(tempPos);
+ return x;
+ }
+ return readUnsignedVarintLegacy(buf);
+ }
+
+ /**
+ * Implementation copied from Netty
+ * see: https://github.com/netty/netty/blob/59aa6e635b9996cf21cd946e64353270679adc73/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java#L73
+ */
+ private static int readUnsignedVarintNetty(ByteBuffer buffer) {
+ byte tmp = buffer.get();
+ if (tmp >= 0) {
+ return tmp;
+ } else {
+ int result = tmp & 127;
+ if ((tmp = buffer.get()) >= 0) {
+ result |= tmp << 7;
+ } else {
+ result |= (tmp & 127) << 7;
+ if ((tmp = buffer.get()) >= 0) {
+ result |= tmp << 14;
+ } else {
+ result |= (tmp & 127) << 14;
+ if ((tmp = buffer.get()) >= 0) {
+ result |= tmp << 21;
+ } else {
+ result |= (tmp & 127) << 21;
+ result |= (tmp = buffer.get()) << 28;
+ if (tmp < 0) {
+ throw new IllegalArgumentException();
+ }
+ }
+ }
+ }
+ return result;
+ }
+ }
+
+ /**
+ * Implementation extended from Int implementation from Netty
+ * see: https://github.com/netty/netty/blob/59aa6e635b9996cf21cd946e64353270679adc73/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java#L73
+ */
+ private static long readUnsignedVarlongNetty(ByteBuffer buffer) {
+ byte tmp = buffer.get();
+ if (tmp >= 0) {
+ return tmp;
+ } else {
+ long result = tmp & 0x7f;
+ if ((tmp = buffer.get()) >= 0) {
+ result |= tmp << 7;
+ } else {
+ result |= (tmp & 0x7f) << 7;
+ if ((tmp = buffer.get()) >= 0) {
+ result |= tmp << 14;
+ } else {
+ result |= (tmp & 0x7f) << 14;
+ if ((tmp = buffer.get()) >= 0) {
+ result |= tmp << 21;
+ } else {
+ result |= (tmp & 0x7f) << 21;
+ if ((tmp = buffer.get()) >= 0) {
+ result |= (long) tmp << 28;
+ } else {
+ result |= (long) (tmp & 0x7f) << 28;
+ result = innerUnsignedReadVarLong(buffer, result);
+ }
+ }
+ }
+ }
+ return result;
+ }
+ }
+
+ private static long innerUnsignedReadVarLong(ByteBuffer buffer, long result) {
+ byte tmp;
+ if ((tmp = buffer.get()) >= 0) {
+ result |= (long) tmp << 35;
+ } else {
+ result |= (long) (tmp & 0x7f) << 35;
+ if ((tmp = buffer.get()) >= 0) {
+ result |= (long) tmp << 42;
+ } else {
+ result |= (long) (tmp & 0x7f) << 42;
+ if ((tmp = buffer.get()) >= 0) {
+ result |= (long) tmp << 49;
+ } else {
+ result |= (long) (tmp & 0x7f) << 49;
+ if ((tmp = buffer.get()) >= 0) {
+ result |= (long) tmp << 56;
+ } else {
+ result |= (long) (tmp & 0x7f) << 56;
+ result |= (long) (tmp = buffer.get()) << 63;
+ if (tmp < 0) {
+ throw new IllegalArgumentException();
+ }
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ /*
+ * Implementation in Trunk as of Apr 2023 / v3.4
+ */
+ private static void writeUnsignedVarintLegacy(int value, ByteBuffer buffer) {
+ while ((value & 0xffffff80) != 0L) {
+ byte b = (byte) ((value & 0x7f) | 0x80);
+ buffer.put(b);
+ value >>>= 7;
+ }
+ buffer.put((byte) value);
+ }
+
+ /*
+ * Implementation in Trunk as of Apr 2023 / v3.4
+ */
+ private static void writeUnsignedVarlongLegacy(long v, ByteBuffer buffer) {
+ while ((v & 0xffffffffffffff80L) != 0L) {
+ byte b = (byte) ((v & 0x7f) | 0x80);
+ buffer.put(b);
+ v >>>= 7;
+ }
+ buffer.put((byte) v);
+ }
+
+ /*
+ * Implementation extended for Long from the Int implementation at https://github.com/astei/varint-writing-showdown/tree/dev (MIT License)
+ * see: https://github.com/astei/varint-writing-showdown/blob/6b1a4baec4b1f0ce65fa40cf0b282ec775fdf43e/src/jmh/java/me/steinborn/varintshowdown/res/SmartNoDataDependencyUnrolledVarIntWriter.java#L8
+ */
+ private static void writeUnsignedVarlongUnrolled(long value, ByteBuffer buffer) {
+ if ((value & (0xFFFFFFFF << 7)) == 0) {
+ buffer.put((byte) value);
+ } else {
+ buffer.put((byte) (value & 0x7F | 0x80));
+ if ((value & (0xFFFFFFFF << 14)) == 0) {
+ buffer.put((byte) (value >>> 7));
+ } else {
+ buffer.put((byte) ((value >>> 7) & 0x7F | 0x80));
+ if ((value & (0xFFFFFFFF << 21)) == 0) {
+ buffer.put((byte) (value >>> 14));
+ } else {
+ buffer.put((byte) ((value >>> 14) & 0x7F | 0x80));
+ if ((value & (0xFFFFFFFF << 28)) == 0) {
+ buffer.put((byte) (value >>> 21));
+ } else {
+ buffer.put((byte) ((value >>> 21) & 0x7F | 0x80));
+ innerWriteUnsignedVarlongUnrolled(value, buffer);
+ }
+ }
+ }
+ }
+ }
+
+ private static void innerWriteUnsignedVarlongUnrolled(long value, ByteBuffer buffer) {
+ if ((value & (0xFFFFFFFFFFFFFFFFL << 35)) == 0) {
+ buffer.put((byte) (value >>> 28));
+ } else {
+ buffer.put((byte) ((value >>> 28) & 0x7FL | 0x80));
+ if ((value & (0xFFFFFFFFFFFFFFFFL << 42)) == 0) {
+ buffer.put((byte) (value >>> 35));
+ } else {
+ buffer.put((byte) ((value >>> 35) & 0x7FL | 0x80));
+ if ((value & (0xFFFFFFFFFFFFFFFFL << 49)) == 0) {
+ buffer.put((byte) (value >>> 42));
+ } else {
+ buffer.put((byte) ((value >>> 42) & 0x7FL | 0x80));
+ if ((value & (0xFFFFFFFFFFFFFFFFL << 56)) == 0) {
+ buffer.put((byte) (value >>> 49));
+ } else {
+ buffer.put((byte) ((value >>> 49) & 0x7FL | 0x80));
+ if ((value & (0xFFFFFFFFFFFFFFFFL << 63)) == 0) {
+ buffer.put((byte) (value >>> 56));
+ } else {
+ buffer.put((byte) ((value >>> 56) & 0x7FL | 0x80));
+ buffer.put((byte) (value >>> 63));
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /*
+ * Implementation copied from https://github.com/astei/varint-writing-showdown/tree/dev (MIT License)
+ * see: https://github.com/astei/varint-writing-showdown/blob/6b1a4baec4b1f0ce65fa40cf0b282ec775fdf43e/src/jmh/java/me/steinborn/varintshowdown/res/SmartNoDataDependencyUnrolledVarIntWriter.java#L8
+ */
+ private static void writeUnsignedVarintUnrolled(int value, ByteBuffer buffer) {
+ if ((value & (0xFFFFFFFF << 7)) == 0) {
+ buffer.put((byte) value);
+ } else {
+ buffer.put((byte) (value & 0x7F | 0x80));
+ if ((value & (0xFFFFFFFF << 14)) == 0) {
+ buffer.put((byte) ((value >>> 7) & 0xFF));
+ } else {
+ buffer.put((byte) ((value >>> 7) & 0x7F | 0x80));
+ if ((value & (0xFFFFFFFF << 21)) == 0) {
+ buffer.put((byte) ((value >>> 14) & 0xFF));
+ } else {
+ buffer.put((byte) ((value >>> 14) & 0x7F | 0x80));
+ if ((value & (0xFFFFFFFF << 28)) == 0) {
+ buffer.put((byte) ((value >>> 21) & 0xFF));
+ } else {
+ buffer.put((byte) ((value >>> 21) & 0x7F | 0x80));
+ buffer.put((byte) ((value >>> 28) & 0xFF));
+ }
+ }
+ }
+ }
+ }
}