KAFKA-14834: [10/N] Reserve repartition topic formats to include isLatest (#13566)

KIP-914 introduced a new boolean isLatest into Change to indicate whether a change update represents the latest for the key. Even though Change is serialized into the table repartition topic, the new boolean does not need to be serialized in, because the table repartition map processor performs an optimization to drop records for which isLatest = false. If not for this optimization, the downstream table aggregate would have to drop such records instead, and isLatest would need to be serialized into the repartition topic.

In light of the possibility that isLatest may need to be serialized into the repartition topic in the future, e.g., if other downstream processors are added which need to distinguish between records for which isLatest = true vs isLatest = false, this PR reserves repartition topic formats which include isLatest. Reserving these formats now comes at no additional cost to users since a rolling bounce is already required for the upcoming release due to KIP-904. If we don't reserve them now and instead have to add them later, then another bounce would be required at that time. Reserving formats is cheap, so we choose to do it now.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Victoria Xia 2023-04-14 00:56:36 -04:00 committed by GitHub
parent a87edf13d5
commit f1eb260fea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 114 additions and 6 deletions

View File

@ -1026,7 +1026,7 @@ KTable&lt;byte[], Long&gt; aggregatedTable = groupedTable.aggregate(
<li>When the first non-<code class="docutils literal"><span class="pre">null</span></code> value is received for a key (e.g., INSERT), then only the adder is called.</li>
<li>When subsequent non-<code class="docutils literal"><span class="pre">null</span></code> values are received for a key (e.g., UPDATE), then (1) the subtractor is
called with the old value as stored in the table and (2) the adder is called with the new value of the
input record that was just received. The subtractor will be called before the adder if and only if the extracted grouping key of the old and new value is the same.
input record that was just received. The subtractor is guaranteed to be called before the adder if the extracted grouping key of the old and new value is the same.
The detection of this case depends on the correct implementation of the equals() method of the extracted key type. Otherwise, the order of execution for the subtractor
and adder is not defined.</li>
<li>When a tombstone record &#8211; i.e. a record with a <code class="docutils literal"><span class="pre">null</span></code> value &#8211; is received for a key (e.g., DELETE),
@ -1276,7 +1276,7 @@ KTable&lt;String, Long&gt; aggregatedTable = groupedTable.reduce(
<li>When the first non-<code class="docutils literal"><span class="pre">null</span></code> value is received for a key (e.g., INSERT), then only the adder is called.</li>
<li>When subsequent non-<code class="docutils literal"><span class="pre">null</span></code> values are received for a key (e.g., UPDATE), then (1) the subtractor is
called with the old value as stored in the table and (2) the adder is called with the new value of the
input record that was just received. The subtractor will be called before the adder if and only if the extracted grouping key of the old and new value is the same.
input record that was just received. The subtractor is guaranteed be called before the adder if the extracted grouping key of the old and new value is the same.
The detection of this case depends on the correct implementation of the equals() method of the extracted key type. Otherwise, the order of execution for the subtractor
and adder is not defined.</li>
<li>When a tombstone record &#8211; i.e. a record with a <code class="docutils literal"><span class="pre">null</span></code> value &#8211; is received for a key (e.g., DELETE),

View File

@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, Void, T> {
private static final int NEW_OLD_FLAG_SIZE = 1;
private static final int IS_LATEST_FLAG_SIZE = 1;
private Deserializer<T> inner;
@ -52,21 +53,27 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>>, Wrapping
// {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
// {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
// {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
// {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
final ByteBuffer buffer = ByteBuffer.wrap(data);
final byte newOldFlag = buffer.get(data.length - NEW_OLD_FLAG_SIZE);
final byte[] newData;
final byte[] oldData;
final boolean isLatest;
if (newOldFlag == (byte) 0) {
newData = null;
final int oldDataLength = data.length - NEW_OLD_FLAG_SIZE;
oldData = new byte[oldDataLength];
buffer.get(oldData);
isLatest = true;
} else if (newOldFlag == (byte) 1) {
oldData = null;
final int newDataLength = data.length - NEW_OLD_FLAG_SIZE;
newData = new byte[newDataLength];
buffer.get(newData);
isLatest = true;
} else if (newOldFlag == (byte) 2) {
final int newDataLength = Math.toIntExact(ByteUtils.readUnsignedInt(buffer));
newData = new byte[newDataLength];
@ -76,13 +83,48 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>>, Wrapping
buffer.get(newData);
buffer.get(oldData);
isLatest = true;
} else if (newOldFlag == (byte) 3) {
newData = null;
final int oldDataLength = data.length - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
oldData = new byte[oldDataLength];
buffer.get(oldData);
isLatest = readIsLatestFlag(buffer);
} else if (newOldFlag == (byte) 4) {
oldData = null;
final int newDataLength = data.length - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
newData = new byte[newDataLength];
buffer.get(newData);
isLatest = readIsLatestFlag(buffer);
} else if (newOldFlag == (byte) 5) {
final int newDataLength = Math.toIntExact(ByteUtils.readUnsignedInt(buffer));
newData = new byte[newDataLength];
final int oldDataLength = data.length - Integer.BYTES - newDataLength - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
oldData = new byte[oldDataLength];
buffer.get(newData);
buffer.get(oldData);
isLatest = readIsLatestFlag(buffer);
} else {
throw new StreamsException("Encountered unknown byte value `" + newOldFlag + "` for oldNewFlag in ChangedDeserializer.");
}
return new Change<>(
inner.deserialize(topic, headers, newData),
inner.deserialize(topic, headers, oldData));
inner.deserialize(topic, headers, newData),
inner.deserialize(topic, headers, oldData),
isLatest);
}
private boolean readIsLatestFlag(final ByteBuffer buffer) {
final byte isLatestFlag = buffer.get(buffer.capacity() - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE);
if (isLatestFlag == (byte) 1) {
return true;
} else if (isLatestFlag == (byte) 0) {
return false;
} else {
throw new StreamsException("Encountered unexpected byte value `" + isLatestFlag + "` for isLatestFlag in ChangedDeserializer.");
}
}
@Override

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.junit.Assert;
@ -36,12 +37,16 @@ import static org.junit.Assert.assertThrows;
public class ChangedSerdeTest {
private static final String TOPIC = "some-topic";
private static final Serializer<String> STRING_SERIALIZER = Serdes.String().serializer();
private static final ChangedSerializer<String> CHANGED_STRING_SERIALIZER =
new ChangedSerializer<>(Serdes.String().serializer());
new ChangedSerializer<>(STRING_SERIALIZER);
private static final ChangedDeserializer<String> CHANGED_STRING_DESERIALIZER =
new ChangedDeserializer<>(Serdes.String().deserializer());
private static final int NEW_OLD_FLAG_SIZE = 1;
private static final int IS_LATEST_FLAG_SIZE = 1;
private static final int UINT32_SIZE = 4;
final String nonNullNewValue = "hello";
final String nonNullOldValue = "world";
@ -106,4 +111,65 @@ public class ChangedSerdeTest {
StreamsException.class,
() -> CHANGED_STRING_DESERIALIZER.deserialize(TOPIC, serialized));
}
@Test
public void shouldDeserializeReservedVersions3Through5() {
// `isLatest = true`
checkRoundTripForReservedVersion(new Change<>(nonNullNewValue, null, true));
checkRoundTripForReservedVersion(new Change<>(null, nonNullOldValue, true));
checkRoundTripForReservedVersion(new Change<>(nonNullNewValue, nonNullOldValue, true));
// `isLatest = false`
checkRoundTripForReservedVersion(new Change<>(nonNullNewValue, null, false));
checkRoundTripForReservedVersion(new Change<>(null, nonNullOldValue, false));
checkRoundTripForReservedVersion(new Change<>(nonNullNewValue, nonNullOldValue, false));
}
// versions 3 through 5 are reserved in the deserializer in case we want to use them in the
// future (in which case we save users from needing to perform another rolling upgrade by
// introducing these reserved versions in the same AK release as version 2).
// so, this serialization code is not actually in the serializer itself, but only here for
// now for purposes of testing the deserializer.
private static byte[] serializeVersions3Through5(final String topic, final Change<String> data) {
final boolean oldValueIsNotNull = data.oldValue != null;
final boolean newValueIsNotNull = data.newValue != null;
final byte[] newData = STRING_SERIALIZER.serialize(topic, null, data.newValue);
final byte[] oldData = STRING_SERIALIZER.serialize(topic, null, data.oldValue);
final int newDataLength = newValueIsNotNull ? newData.length : 0;
final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;
// The serialization format is:
// {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
// {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
final ByteBuffer buf;
final byte isLatest = data.isLatest ? (byte) 1 : (byte) 0;
if (newValueIsNotNull && oldValueIsNotNull) {
final int capacity = UINT32_SIZE + newDataLength + oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
buf = ByteBuffer.allocate(capacity);
ByteUtils.writeUnsignedInt(buf, newDataLength);
buf.put(newData).put(oldData).put(isLatest).put((byte) 5);
} else if (newValueIsNotNull) {
final int capacity = newDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
buf = ByteBuffer.allocate(capacity);
buf.put(newData).put(isLatest).put((byte) 4);
} else if (oldValueIsNotNull) {
final int capacity = oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
buf = ByteBuffer.allocate(capacity);
buf.put(oldData).put(isLatest).put((byte) 3);
} else {
throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
}
return buf.array();
}
private static void checkRoundTripForReservedVersion(final Change<String> data) {
final byte[] serialized = serializeVersions3Through5(TOPIC, data);
assertThat(serialized, is(notNullValue()));
final Change<String> deserialized = CHANGED_STRING_DESERIALIZER.deserialize(TOPIC, serialized);
assertThat(deserialized, is(data));
}
}