From f1eb260fea75de9b861a35678e8c8233fb948e1a Mon Sep 17 00:00:00 2001
From: Victoria Xia
Date: Fri, 14 Apr 2023 00:56:36 -0400
Subject: [PATCH] KAFKA-14834: [10/N] Reserve repartition topic formats to include isLatest (#13566)

KIP-914 introduced a new boolean isLatest into Change to indicate whether a
change update represents the latest for the key. Even though Change is
serialized into the table repartition topic, the new boolean does not need to
be serialized in, because the table repartition map processor performs an
optimization to drop records for which isLatest = false. If not for this
optimization, the downstream table aggregate would have to drop such records
instead, and isLatest would need to be serialized into the repartition topic.

In light of the possibility that isLatest may need to be serialized into the
repartition topic in the future, e.g., if other downstream processors are
added which need to distinguish between records for which isLatest = true vs
isLatest = false, this PR reserves repartition topic formats which include
isLatest. Reserving these formats now comes at no additional cost to users
since a rolling bounce is already required for the upcoming release due to
KIP-904. If we don't reserve them now and instead have to add them later, then
another bounce would be required at that time. Reserving formats is cheap, so
we choose to do it now.

Reviewers: Matthias J. Sax
---
 docs/streams/developer-guide/dsl-api.html     |  4 +-
 .../internals/ChangedDeserializer.java        | 46 +++++++++++-
 .../kstream/internals/ChangedSerdeTest.java   | 70 ++++++++++++++++++-
 3 files changed, 114 insertions(+), 6 deletions(-)
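Note (illustration only, not part of the patch): the reserved formats 3-5 simply append an isLatest byte in front of the existing trailing newOldFlag byte. A minimal standalone sketch of the version-5 layout documented in the ChangedDeserializer comments below, assembling a payload for a ("hello", "world", isLatest = true) change, could look like the following; the class name and the use of plain ByteBuffer.putInt in place of ByteUtils.writeUnsignedInt are illustrative assumptions only.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class ReservedFormatSketch {
        public static void main(final String[] args) {
            final byte[] newValue = "hello".getBytes(StandardCharsets.UTF_8);
            final byte[] oldValue = "world".getBytes(StandardCharsets.UTF_8);
            final boolean isLatest = true;

            // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
            final ByteBuffer buf = ByteBuffer.allocate(
                Integer.BYTES + newValue.length + oldValue.length + 1 /* isLatest */ + 1 /* newOldFlag */);
            buf.putInt(newValue.length);               // stands in for ByteUtils.writeUnsignedInt
            buf.put(newValue);
            buf.put(oldValue);
            buf.put(isLatest ? (byte) 1 : (byte) 0);   // isLatest flag, read back by readIsLatestFlag
            buf.put((byte) 5);                         // last byte: newOldFlag, 5 = "new + old + isLatest"
            System.out.println("serialized " + buf.position() + " bytes for reserved format 5");
        }
    }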
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index adf0f33bb4b..5fe7af249d3 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -1026,7 +1026,7 @@ KTable<byte[], Long> aggregatedTable = groupedTable.aggregate(
   • When the first non-null value is received for a key (e.g., INSERT), then only the adder is called.
   • When subsequent non-null values are received for a key (e.g., UPDATE), then (1) the subtractor is called
     with the old value as stored in the table and (2) the adder is called with the new value of the
-    input record that was just received. The subtractor will be called before the adder if and only if the extracted grouping key of the old and new value is the same.
+    input record that was just received. The subtractor is guaranteed to be called before the adder if the extracted grouping key of the old and new value is the same. The detection of this case depends on the correct implementation of the equals() method of the extracted key type. Otherwise, the order of execution for the subtractor and adder is not defined.
   • When a tombstone record – i.e. a record with a null value – is received for a key (e.g., DELETE),
@@ -1276,7 +1276,7 @@ KTable<String, Long> aggregatedTable = groupedTable.reduce(
   • When the first non-null value is received for a key (e.g., INSERT), then only the adder is called.
   • When subsequent non-null values are received for a key (e.g., UPDATE), then (1) the subtractor is called
     with the old value as stored in the table and (2) the adder is called with the new value of the
-    input record that was just received. The subtractor will be called before the adder if and only if the extracted grouping key of the old and new value is the same.
+    input record that was just received. The subtractor is guaranteed to be called before the adder if the extracted grouping key of the old and new value is the same. The detection of this case depends on the correct implementation of the equals() method of the extracted key type. Otherwise, the order of execution for the subtractor and adder is not defined.
   • When a tombstone record – i.e. a record with a null value – is received for a key (e.g., DELETE),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index de5a1e33b89..b5b2c6aa3c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 
 public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, Void, T> {
 
     private static final int NEW_OLD_FLAG_SIZE = 1;
+    private static final int IS_LATEST_FLAG_SIZE = 1;
 
     private Deserializer<T> inner;
@@ -52,21 +53,27 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>>, Wrapping
         // {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
         // {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
         // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
+        // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
+        // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
+        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
         final ByteBuffer buffer = ByteBuffer.wrap(data);
         final byte newOldFlag = buffer.get(data.length - NEW_OLD_FLAG_SIZE);
 
         final byte[] newData;
         final byte[] oldData;
+        final boolean isLatest;
         if (newOldFlag == (byte) 0) {
             newData = null;
             final int oldDataLength = data.length - NEW_OLD_FLAG_SIZE;
             oldData = new byte[oldDataLength];
             buffer.get(oldData);
+            isLatest = true;
         } else if (newOldFlag == (byte) 1) {
             oldData = null;
             final int newDataLength = data.length - NEW_OLD_FLAG_SIZE;
             newData = new byte[newDataLength];
             buffer.get(newData);
+            isLatest = true;
         } else if (newOldFlag == (byte) 2) {
             final int newDataLength = Math.toIntExact(ByteUtils.readUnsignedInt(buffer));
             newData = new byte[newDataLength];
@@ -76,13 +83,48 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>>, Wrapping
 
             buffer.get(newData);
             buffer.get(oldData);
+            isLatest = true;
+        } else if (newOldFlag == (byte) 3) {
+            newData = null;
+            final int oldDataLength = data.length - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
+            oldData = new byte[oldDataLength];
+            buffer.get(oldData);
+            isLatest = readIsLatestFlag(buffer);
+        } else if (newOldFlag == (byte) 4) {
+            oldData = null;
+            final int newDataLength = data.length - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
+            newData = new byte[newDataLength];
+            buffer.get(newData);
+            isLatest = readIsLatestFlag(buffer);
+        } else if (newOldFlag == (byte) 5) {
+            final int newDataLength = Math.toIntExact(ByteUtils.readUnsignedInt(buffer));
+            newData = new byte[newDataLength];
+
+            final int oldDataLength = data.length - Integer.BYTES - newDataLength - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
+            oldData = new byte[oldDataLength];
+
+            buffer.get(newData);
+            buffer.get(oldData);
+            isLatest = readIsLatestFlag(buffer);
         } else {
             throw new StreamsException("Encountered unknown byte value `" + newOldFlag + "` for oldNewFlag in ChangedDeserializer.");
         }
 
         return new Change<>(
-            inner.deserialize(topic, headers, newData),
-            inner.deserialize(topic, headers, oldData));
+            inner.deserialize(topic, headers, newData),
+            inner.deserialize(topic, headers, oldData),
+            isLatest);
+    }
+
+    private boolean readIsLatestFlag(final ByteBuffer buffer) {
+        final byte isLatestFlag = buffer.get(buffer.capacity() - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE);
+        if (isLatestFlag == (byte) 1) {
+            return true;
+        } else if (isLatestFlag == (byte) 0) {
+            return false;
+        } else {
+            throw new StreamsException("Encountered unexpected byte value `" + isLatestFlag + "` for isLatestFlag in ChangedDeserializer.");
+        }
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
index 21dbdb950df..475aa53ef0b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.junit.Assert;
@@ -36,12 +37,16 @@ import static org.junit.Assert.assertThrows;
 
 public class ChangedSerdeTest {
     private static final String TOPIC = "some-topic";
 
+    private static final Serializer<String> STRING_SERIALIZER = Serdes.String().serializer();
     private static final ChangedSerializer<String> CHANGED_STRING_SERIALIZER =
-        new ChangedSerializer<>(Serdes.String().serializer());
-
+        new ChangedSerializer<>(STRING_SERIALIZER);
     private static final ChangedDeserializer<String> CHANGED_STRING_DESERIALIZER =
         new ChangedDeserializer<>(Serdes.String().deserializer());
 
+    private static final int NEW_OLD_FLAG_SIZE = 1;
+    private static final int IS_LATEST_FLAG_SIZE = 1;
+    private static final int UINT32_SIZE = 4;
+
     final String nonNullNewValue = "hello";
     final String nonNullOldValue = "world";
 
@@ -106,4 +111,65 @@ public class ChangedSerdeTest {
             StreamsException.class,
             () -> CHANGED_STRING_DESERIALIZER.deserialize(TOPIC, serialized));
     }
+
+    @Test
+    public void shouldDeserializeReservedVersions3Through5() {
+        // `isLatest = true`
+        checkRoundTripForReservedVersion(new Change<>(nonNullNewValue, null, true));
+        checkRoundTripForReservedVersion(new Change<>(null, nonNullOldValue, true));
+        checkRoundTripForReservedVersion(new Change<>(nonNullNewValue, nonNullOldValue, true));
+
+        // `isLatest = false`
+        checkRoundTripForReservedVersion(new Change<>(nonNullNewValue, null, false));
+        checkRoundTripForReservedVersion(new Change<>(null, nonNullOldValue, false));
+        checkRoundTripForReservedVersion(new Change<>(nonNullNewValue, nonNullOldValue, false));
+    }
+
+    // versions 3 through 5 are reserved in the deserializer in case we want to use them in the
+    // future (in which case we save users from needing to perform another rolling upgrade by
+    // introducing these reserved versions in the same AK release as version 2).
+    // so, this serialization code is not actually in the serializer itself, but only here for
+    // now for purposes of testing the deserializer.
+    private static byte[] serializeVersions3Through5(final String topic, final Change<String> data) {
+        final boolean oldValueIsNotNull = data.oldValue != null;
+        final boolean newValueIsNotNull = data.newValue != null;
+
+        final byte[] newData = STRING_SERIALIZER.serialize(topic, null, data.newValue);
+        final byte[] oldData = STRING_SERIALIZER.serialize(topic, null, data.oldValue);
+
+        final int newDataLength = newValueIsNotNull ? newData.length : 0;
+        final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;
+
+        // The serialization format is:
+        // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
+        // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
+        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
+        final ByteBuffer buf;
+        final byte isLatest = data.isLatest ? (byte) 1 : (byte) 0;
+        if (newValueIsNotNull && oldValueIsNotNull) {
+            final int capacity = UINT32_SIZE + newDataLength + oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
+            buf = ByteBuffer.allocate(capacity);
+            ByteUtils.writeUnsignedInt(buf, newDataLength);
+            buf.put(newData).put(oldData).put(isLatest).put((byte) 5);
+        } else if (newValueIsNotNull) {
+            final int capacity = newDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
+            buf = ByteBuffer.allocate(capacity);
+            buf.put(newData).put(isLatest).put((byte) 4);
+        } else if (oldValueIsNotNull) {
+            final int capacity = oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
+            buf = ByteBuffer.allocate(capacity);
+            buf.put(oldData).put(isLatest).put((byte) 3);
+        } else {
+            throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
+        }
+
+        return buf.array();
+    }
+
+    private static void checkRoundTripForReservedVersion(final Change<String> data) {
+        final byte[] serialized = serializeVersions3Through5(TOPIC, data);
+        assertThat(serialized, is(notNullValue()));
+        final Change<String> deserialized = CHANGED_STRING_DESERIALIZER.deserialize(TOPIC, serialized);
+        assertThat(deserialized, is(data));
+    }
 }
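
Note (illustration only, not part of the patch): the dsl-api.html wording changed above concerns the adder/subtractor pair passed to KGroupedTable#aggregate(). A minimal sketch of such an aggregation, with made-up topic and variable names, might look like the following; it only illustrates the documented contract that the subtractor is guaranteed to run before the adder when the re-grouped key of the old and new value is equal, which in turn relies on a correct equals() on the grouping key type.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KGroupedTable;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;

    public class RegionCountSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();

            // userId -> region (topic name is illustrative)
            final KTable<String, String> userRegions = builder.table(
                "user-regions-topic", Consumed.with(Serdes.String(), Serdes.String()));

            // re-group by region; the extracted grouping key type must implement equals() correctly
            final KGroupedTable<String, String> byRegion = userRegions.groupBy(
                (userId, region) -> KeyValue.pair(region, region),
                Grouped.with(Serdes.String(), Serdes.String()));

            // when a user's region changes, the subtractor is called for the old region and the
            // adder for the new one; subtractor-before-adder ordering is only guaranteed when the
            // old and new grouping keys are the same
            final KTable<String, Long> usersPerRegion = byRegion.aggregate(
                () -> 0L,                                        // initializer
                (region, newValue, aggregate) -> aggregate + 1,  // adder
                (region, oldValue, aggregate) -> aggregate - 1,  // subtractor
                Materialized.with(Serdes.String(), Serdes.Long()));

            System.out.println(builder.build().describe());      // print the resulting topology
        }
    }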