mirror of https://github.com/apache/kafka.git
KAFKA-13769: Explicitly route FK join results to correct partitions (#11945)
Prior to this commit FK response sink routed FK results to SubscriptionResolverJoinProcessorSupplier using the primary key. There are cases, where this behavior is incorrect. For example, if KTable key serde differs from the data source serde which might happen without a key changing operation. Instead of determining the resolver partition by serializing the PK this patch includes target partition in SubscriptionWrapper payloads. Default FK response-sink partitioner extracts the correct partition from the value and routes the message accordingly. Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
parent
2d2926cf81
commit
adf5cc5371
|
@ -182,7 +182,7 @@
|
|||
files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>
|
||||
|
||||
<suppress checks="CyclomaticComplexity"
|
||||
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup).java"/>
|
||||
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup|SubscriptionWrapperSerde|AssignorConfiguration).java"/>
|
||||
|
||||
<suppress checks="StaticVariableName"
|
||||
files="StreamsMetricsImpl.java"/>
|
||||
|
|
|
@ -34,9 +34,9 @@
|
|||
</div>
|
||||
|
||||
<p>
|
||||
Upgrading from any older version to {{fullDotVersion}} is possible: if upgrading from 2.3 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config <code>upgrade.from="older version"</code>
|
||||
(possible values are <code>"0.10.0" - "2.3"</code>) and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager
|
||||
rebalancing protocol if you skip or delay the second rolling bounce, but you can safely switch over to cooperative at any time once the entire group is on 2.4+ by removing the config value and bouncing. For more details please refer to
|
||||
Upgrading from any older version to {{fullDotVersion}} is possible: if upgrading from 3.2 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config <code>upgrade.from="older version"</code>
|
||||
(possible values are <code>"0.10.0" - "3.2"</code>) and during the second you remove it. This is required to safely handle 2 changes. The first is introduction of the new cooperative rebalancing protocol of the embedded consumer. The second is a change in foreign-key join serialization format.
|
||||
Note that you will remain using the old eager rebalancing protocol if you skip or delay the second rolling bounce, but you can safely switch over to cooperative at any time once the entire group is on 2.4+ by removing the config value and bouncing. For more details please refer to
|
||||
<a href="https://cwiki.apache.org/confluence/x/vAclBg">KIP-429</a>:
|
||||
</p>
|
||||
<ul>
|
||||
|
|
|
@ -301,6 +301,54 @@ public class StreamsConfig extends AbstractConfig {
|
|||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String UPGRADE_FROM_23 = "2.3";
|
||||
|
||||
/**
|
||||
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.4.x}.
|
||||
*/
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String UPGRADE_FROM_24 = "2.4";
|
||||
|
||||
/**
|
||||
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.5.x}.
|
||||
*/
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String UPGRADE_FROM_25 = "2.5";
|
||||
|
||||
/**
|
||||
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.6.x}.
|
||||
*/
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String UPGRADE_FROM_26 = "2.6";
|
||||
|
||||
/**
|
||||
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.7.x}.
|
||||
*/
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String UPGRADE_FROM_27 = "2.7";
|
||||
|
||||
/**
|
||||
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.8.x}.
|
||||
*/
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String UPGRADE_FROM_28 = "2.8";
|
||||
|
||||
/**
|
||||
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.0.x}.
|
||||
*/
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String UPGRADE_FROM_30 = "3.0";
|
||||
|
||||
/**
|
||||
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.1.x}.
|
||||
*/
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String UPGRADE_FROM_31 = "3.1";
|
||||
|
||||
/**
|
||||
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.2.x}.
|
||||
*/
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String UPGRADE_FROM_32 = "3.2";
|
||||
|
||||
/**
|
||||
* Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
|
||||
*/
|
||||
|
@ -632,11 +680,14 @@ public class StreamsConfig extends AbstractConfig {
|
|||
public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
|
||||
private static final String UPGRADE_FROM_DOC = "Allows upgrading in a backward compatible way. " +
|
||||
"This is needed when upgrading from [0.10.0, 1.1] to 2.0+, or when upgrading from [2.0, 2.3] to 2.4+. " +
|
||||
"When upgrading from 2.4 to a newer version it is not required to specify this config. Default is `null`. " +
|
||||
"When upgrading from 3.3 to a newer version it is not required to specify this config. Default is `null`. " +
|
||||
"Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" +
|
||||
UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" +
|
||||
UPGRADE_FROM_11 + "\", \"" + UPGRADE_FROM_20 + "\", \"" + UPGRADE_FROM_21 + "\", \"" +
|
||||
UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\" (for upgrading from the corresponding old version).";
|
||||
UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\", \"" + UPGRADE_FROM_24 + "\", \"" +
|
||||
UPGRADE_FROM_25 + "\", \"" + UPGRADE_FROM_26 + "\", \"" + UPGRADE_FROM_27 + "\", \"" +
|
||||
UPGRADE_FROM_28 + "\", \"" + UPGRADE_FROM_30 + "\", \"" + UPGRADE_FROM_31 + "\", \"" +
|
||||
UPGRADE_FROM_32 + "\" (for upgrading from the corresponding old version).";
|
||||
|
||||
/** {@code windowstore.changelog.additional.retention.ms} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
|
@ -960,7 +1011,15 @@ public class StreamsConfig extends AbstractConfig {
|
|||
UPGRADE_FROM_20,
|
||||
UPGRADE_FROM_21,
|
||||
UPGRADE_FROM_22,
|
||||
UPGRADE_FROM_23),
|
||||
UPGRADE_FROM_23,
|
||||
UPGRADE_FROM_24,
|
||||
UPGRADE_FROM_25,
|
||||
UPGRADE_FROM_26,
|
||||
UPGRADE_FROM_27,
|
||||
UPGRADE_FROM_28,
|
||||
UPGRADE_FROM_30,
|
||||
UPGRADE_FROM_31,
|
||||
UPGRADE_FROM_32),
|
||||
Importance.LOW,
|
||||
UPGRADE_FROM_DOC)
|
||||
.define(WINDOWED_INNER_CLASS_SERDE,
|
||||
|
|
|
@ -1197,7 +1197,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
|
|||
|
||||
final StreamPartitioner<K, SubscriptionResponseWrapper<VO>> foreignResponseSinkPartitioner =
|
||||
tableJoinedInternal.partitioner() == null
|
||||
? null
|
||||
? (topic, key, subscriptionResponseWrapper, numPartitions) -> subscriptionResponseWrapper.getPrimaryPartition()
|
||||
: (topic, key, val, numPartitions) ->
|
||||
tableJoinedInternal.partitioner().partition(topic, key, null, numPartitions);
|
||||
|
||||
|
|
|
@ -108,7 +108,10 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements
|
|||
final CombinedKey<KO, K> combinedKey = keySchema.fromBytes(next.key);
|
||||
context().forward(
|
||||
record.withKey(combinedKey.getPrimaryKey())
|
||||
.withValue(new SubscriptionResponseWrapper<>(next.value.value().getHash(), record.value().newValue))
|
||||
.withValue(new SubscriptionResponseWrapper<>(
|
||||
next.value.value().getHash(),
|
||||
record.value().newValue,
|
||||
next.value.value().getPrimaryPartition()))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -103,6 +103,7 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
|
|||
null :
|
||||
Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
|
||||
|
||||
final int partition = context().recordMetadata().get().partition();
|
||||
if (record.value().oldValue != null) {
|
||||
final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
|
||||
if (oldForeignKey == null) {
|
||||
|
@ -149,19 +150,34 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
|
|||
//Delete it from the oldKey's state store
|
||||
context().forward(
|
||||
record.withKey(oldForeignKey)
|
||||
.withValue(new SubscriptionWrapper<>(currentHash, DELETE_KEY_NO_PROPAGATE, record.key())));
|
||||
.withValue(new SubscriptionWrapper<>(
|
||||
currentHash,
|
||||
DELETE_KEY_NO_PROPAGATE,
|
||||
record.key(),
|
||||
partition
|
||||
)));
|
||||
//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
|
||||
//since we must "unset" any output set by the previous FK-join. This is true for both INNER
|
||||
//and LEFT join.
|
||||
}
|
||||
context().forward(
|
||||
record.withKey(newForeignKey)
|
||||
.withValue(new SubscriptionWrapper<>(currentHash, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, record.key())));
|
||||
.withValue(new SubscriptionWrapper<>(
|
||||
currentHash,
|
||||
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
|
||||
record.key(),
|
||||
partition
|
||||
)));
|
||||
} else {
|
||||
//A simple propagatable delete. Delete from the state store and propagate the delete onwards.
|
||||
context().forward(
|
||||
record.withKey(oldForeignKey)
|
||||
.withValue(new SubscriptionWrapper<>(currentHash, DELETE_KEY_AND_PROPAGATE, record.key())));
|
||||
.withValue(new SubscriptionWrapper<>(
|
||||
currentHash,
|
||||
DELETE_KEY_AND_PROPAGATE,
|
||||
record.key(),
|
||||
partition
|
||||
)));
|
||||
}
|
||||
} else if (record.value().newValue != null) {
|
||||
//change.oldValue is null, which means it was deleted at least once before, or it is brand new.
|
||||
|
@ -193,7 +209,11 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
|
|||
} else {
|
||||
context().forward(
|
||||
record.withKey(newForeignKey)
|
||||
.withValue(new SubscriptionWrapper<>(currentHash, instruction, record.key())));
|
||||
.withValue(new SubscriptionWrapper<>(
|
||||
currentHash,
|
||||
instruction,
|
||||
record.key(),
|
||||
partition)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -88,7 +88,11 @@ public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
|
|||
case DELETE_KEY_AND_PROPAGATE:
|
||||
context().forward(
|
||||
record.withKey(record.key().getPrimaryKey())
|
||||
.withValue(new SubscriptionResponseWrapper<VO>(value.getHash(), null))
|
||||
.withValue(new SubscriptionResponseWrapper<VO>(
|
||||
value.getHash(),
|
||||
null,
|
||||
value.getPrimaryPartition()
|
||||
))
|
||||
.withTimestamp(resultTimestamp)
|
||||
);
|
||||
break;
|
||||
|
@ -100,7 +104,7 @@ public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
|
|||
|
||||
context().forward(
|
||||
record.withKey(record.key().getPrimaryKey())
|
||||
.withValue(new SubscriptionResponseWrapper<>(value.getHash(), valueToSend))
|
||||
.withValue(new SubscriptionResponseWrapper<>(value.getHash(), valueToSend, value.getPrimaryPartition()))
|
||||
.withTimestamp(resultTimestamp)
|
||||
);
|
||||
break;
|
||||
|
@ -108,7 +112,11 @@ public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
|
|||
if (foreignValueAndTime != null) {
|
||||
context().forward(
|
||||
record.withKey(record.key().getPrimaryKey())
|
||||
.withValue(new SubscriptionResponseWrapper<>(value.getHash(), foreignValueAndTime.value()))
|
||||
.withValue(new SubscriptionResponseWrapper<>(
|
||||
value.getHash(),
|
||||
foreignValueAndTime.value(),
|
||||
value.getPrimaryPartition()
|
||||
))
|
||||
.withTimestamp(resultTimestamp)
|
||||
);
|
||||
}
|
||||
|
|
|
@ -21,22 +21,30 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
|
|||
import java.util.Arrays;
|
||||
|
||||
public class SubscriptionResponseWrapper<FV> {
|
||||
final static byte CURRENT_VERSION = 0x00;
|
||||
final static byte CURRENT_VERSION = 0;
|
||||
// v0 fields:
|
||||
private final long[] originalValueHash;
|
||||
private final FV foreignValue;
|
||||
private final byte version;
|
||||
// non-serializing fields
|
||||
private final Integer primaryPartition;
|
||||
|
||||
public SubscriptionResponseWrapper(final long[] originalValueHash, final FV foreignValue) {
|
||||
this(originalValueHash, foreignValue, CURRENT_VERSION);
|
||||
public SubscriptionResponseWrapper(final long[] originalValueHash, final FV foreignValue, final Integer primaryPartition) {
|
||||
this(originalValueHash, foreignValue, CURRENT_VERSION, primaryPartition);
|
||||
}
|
||||
|
||||
public SubscriptionResponseWrapper(final long[] originalValueHash, final FV foreignValue, final byte version) {
|
||||
if (version != CURRENT_VERSION) {
|
||||
public SubscriptionResponseWrapper(
|
||||
final long[] originalValueHash,
|
||||
final FV foreignValue,
|
||||
final byte version,
|
||||
final Integer primaryPartition) {
|
||||
if (version < 0 || version > CURRENT_VERSION) {
|
||||
throw new UnsupportedVersionException("SubscriptionWrapper does not support version " + version);
|
||||
}
|
||||
this.originalValueHash = originalValueHash;
|
||||
this.foreignValue = foreignValue;
|
||||
this.version = version;
|
||||
this.primaryPartition = primaryPartition;
|
||||
}
|
||||
|
||||
public long[] getOriginalValueHash() {
|
||||
|
@ -51,12 +59,17 @@ public class SubscriptionResponseWrapper<FV> {
|
|||
return version;
|
||||
}
|
||||
|
||||
public Integer getPrimaryPartition() {
|
||||
return primaryPartition;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SubscriptionResponseWrapper{" +
|
||||
"version=" + version +
|
||||
", foreignValue=" + foreignValue +
|
||||
", originalValueHash=" + Arrays.toString(originalValueHash) +
|
||||
", primaryPartition=" + primaryPartition +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -91,7 +91,6 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
|
|||
buf.put(serializedData);
|
||||
return buf.array();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static final class SubscriptionResponseWrapperDeserializer<V>
|
||||
|
@ -141,9 +140,7 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
|
|||
value = null;
|
||||
}
|
||||
|
||||
return new SubscriptionResponseWrapper<>(hash, value, version);
|
||||
return new SubscriptionResponseWrapper<>(hash, value, version, null);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -23,12 +23,15 @@ import java.util.Objects;
|
|||
|
||||
|
||||
public class SubscriptionWrapper<K> {
|
||||
static final byte CURRENT_VERSION = 0;
|
||||
static final byte CURRENT_VERSION = 1;
|
||||
|
||||
// v0 fields:
|
||||
private final long[] hash;
|
||||
private final Instruction instruction;
|
||||
private final byte version;
|
||||
private final K primaryKey;
|
||||
// v1 fields:
|
||||
private final Integer primaryPartition;
|
||||
|
||||
public enum Instruction {
|
||||
//Send nothing. Do not propagate.
|
||||
|
@ -65,14 +68,14 @@ public class SubscriptionWrapper<K> {
|
|||
}
|
||||
}
|
||||
|
||||
public SubscriptionWrapper(final long[] hash, final Instruction instruction, final K primaryKey) {
|
||||
this(hash, instruction, primaryKey, CURRENT_VERSION);
|
||||
public SubscriptionWrapper(final long[] hash, final Instruction instruction, final K primaryKey, final Integer primaryPartition) {
|
||||
this(hash, instruction, primaryKey, CURRENT_VERSION, primaryPartition);
|
||||
}
|
||||
|
||||
public SubscriptionWrapper(final long[] hash, final Instruction instruction, final K primaryKey, final byte version) {
|
||||
public SubscriptionWrapper(final long[] hash, final Instruction instruction, final K primaryKey, final byte version, final Integer primaryPartition) {
|
||||
Objects.requireNonNull(instruction, "instruction cannot be null. Required by downstream processor.");
|
||||
Objects.requireNonNull(primaryKey, "primaryKey cannot be null. Required by downstream processor.");
|
||||
if (version != CURRENT_VERSION) {
|
||||
if (version < 0 || version > CURRENT_VERSION) {
|
||||
throw new UnsupportedVersionException("SubscriptionWrapper does not support version " + version);
|
||||
}
|
||||
|
||||
|
@ -80,6 +83,7 @@ public class SubscriptionWrapper<K> {
|
|||
this.hash = hash;
|
||||
this.primaryKey = primaryKey;
|
||||
this.version = version;
|
||||
this.primaryPartition = primaryPartition;
|
||||
}
|
||||
|
||||
public Instruction getInstruction() {
|
||||
|
@ -98,6 +102,10 @@ public class SubscriptionWrapper<K> {
|
|||
return version;
|
||||
}
|
||||
|
||||
public Integer getPrimaryPartition() {
|
||||
return primaryPartition;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SubscriptionWrapper{" +
|
||||
|
@ -105,6 +113,7 @@ public class SubscriptionWrapper<K> {
|
|||
", primaryKey=" + primaryKey +
|
||||
", instruction=" + instruction +
|
||||
", hash=" + Arrays.toString(hash) +
|
||||
", primaryPartition=" + primaryPartition +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,10 +16,12 @@
|
|||
*/
|
||||
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
|
||||
|
||||
import java.util.Map;
|
||||
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.apache.kafka.common.serialization.Serde;
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
|
||||
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
|
||||
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
|
||||
|
@ -45,6 +47,7 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
|
|||
private final Supplier<String> primaryKeySerializationPseudoTopicSupplier;
|
||||
private String primaryKeySerializationPseudoTopic = null;
|
||||
private Serializer<K> primaryKeySerializer;
|
||||
private boolean upgradeFromV0 = false;
|
||||
|
||||
SubscriptionWrapperSerializer(final Supplier<String> primaryKeySerializationPseudoTopicSupplier,
|
||||
final Serializer<K> primaryKeySerializer) {
|
||||
|
@ -60,34 +63,85 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(final Map<String, ?> configs, final boolean isKey) {
|
||||
this.upgradeFromV0 = upgradeFromV0(configs);
|
||||
}
|
||||
|
||||
private static boolean upgradeFromV0(final Map<String, ?> configs) {
|
||||
final Object upgradeFrom = configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
|
||||
if (upgradeFrom == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
switch ((String) upgradeFrom) {
|
||||
case StreamsConfig.UPGRADE_FROM_0100:
|
||||
case StreamsConfig.UPGRADE_FROM_0101:
|
||||
case StreamsConfig.UPGRADE_FROM_0102:
|
||||
case StreamsConfig.UPGRADE_FROM_0110:
|
||||
case StreamsConfig.UPGRADE_FROM_10:
|
||||
case StreamsConfig.UPGRADE_FROM_11:
|
||||
case StreamsConfig.UPGRADE_FROM_20:
|
||||
case StreamsConfig.UPGRADE_FROM_21:
|
||||
case StreamsConfig.UPGRADE_FROM_22:
|
||||
case StreamsConfig.UPGRADE_FROM_23:
|
||||
case StreamsConfig.UPGRADE_FROM_24:
|
||||
case StreamsConfig.UPGRADE_FROM_25:
|
||||
case StreamsConfig.UPGRADE_FROM_26:
|
||||
case StreamsConfig.UPGRADE_FROM_27:
|
||||
case StreamsConfig.UPGRADE_FROM_28:
|
||||
case StreamsConfig.UPGRADE_FROM_30:
|
||||
case StreamsConfig.UPGRADE_FROM_31:
|
||||
case StreamsConfig.UPGRADE_FROM_32:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] serialize(final String ignored, final SubscriptionWrapper<K> data) {
|
||||
//{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}
|
||||
//{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}{4-bytes-primaryPartition}
|
||||
|
||||
//7-bit (0x7F) maximum for data version.
|
||||
if (Byte.compare((byte) 0x7F, data.getVersion()) < 0) {
|
||||
throw new UnsupportedVersionException("SubscriptionWrapper version is larger than maximum supported 0x7F");
|
||||
}
|
||||
|
||||
final int version = data.getVersion();
|
||||
if (upgradeFromV0 || version == 0) {
|
||||
return serializeV0(data);
|
||||
} else if (version == 1) {
|
||||
return serializeV1(data);
|
||||
} else {
|
||||
throw new UnsupportedVersionException("Unsupported SubscriptionWrapper version " + data.getVersion());
|
||||
}
|
||||
}
|
||||
|
||||
private byte[] serializePrimaryKey(final SubscriptionWrapper<K> data) {
|
||||
if (primaryKeySerializationPseudoTopic == null) {
|
||||
primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get();
|
||||
}
|
||||
|
||||
final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(
|
||||
return primaryKeySerializer.serialize(
|
||||
primaryKeySerializationPseudoTopic,
|
||||
data.getPrimaryKey()
|
||||
);
|
||||
}
|
||||
|
||||
private ByteBuffer serializeCommon(final SubscriptionWrapper<K> data, final byte version, final int extraLength) {
|
||||
final byte[] primaryKeySerializedData = serializePrimaryKey(data);
|
||||
final ByteBuffer buf;
|
||||
int dataLength = 2 + primaryKeySerializedData.length + extraLength;
|
||||
if (data.getHash() != null) {
|
||||
buf = ByteBuffer.allocate(2 + 2 * Long.BYTES + primaryKeySerializedData.length);
|
||||
buf.put(data.getVersion());
|
||||
dataLength += 2 * Long.BYTES;
|
||||
buf = ByteBuffer.allocate(dataLength);
|
||||
buf.put(version);
|
||||
} else {
|
||||
//Don't store hash as it's null.
|
||||
buf = ByteBuffer.allocate(2 + primaryKeySerializedData.length);
|
||||
buf.put((byte) (data.getVersion() | (byte) 0x80));
|
||||
buf = ByteBuffer.allocate(dataLength);
|
||||
buf.put((byte) (version | (byte) 0x80));
|
||||
}
|
||||
|
||||
buf.put(data.getInstruction().getValue());
|
||||
final long[] elem = data.getHash();
|
||||
if (data.getHash() != null) {
|
||||
|
@ -95,9 +149,18 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
|
|||
buf.putLong(elem[1]);
|
||||
}
|
||||
buf.put(primaryKeySerializedData);
|
||||
return buf.array();
|
||||
return buf;
|
||||
}
|
||||
|
||||
private byte[] serializeV0(final SubscriptionWrapper<K> data) {
|
||||
return serializeCommon(data, (byte) 0, 0).array();
|
||||
}
|
||||
|
||||
private byte[] serializeV1(final SubscriptionWrapper<K> data) {
|
||||
final ByteBuffer buf = serializeCommon(data, data.getVersion(), Integer.BYTES);
|
||||
buf.putInt(data.getPrimaryPartition());
|
||||
return buf.array();
|
||||
}
|
||||
}
|
||||
|
||||
private static class SubscriptionWrapperDeserializer<K>
|
||||
|
@ -123,15 +186,15 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
|
|||
|
||||
@Override
|
||||
public SubscriptionWrapper<K> deserialize(final String ignored, final byte[] data) {
|
||||
//{7-bits-version}{1-bit-isHashNull}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}
|
||||
//{7-bits-version}{1-bit-isHashNull}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}{4-bytes-primaryPartition}
|
||||
final ByteBuffer buf = ByteBuffer.wrap(data);
|
||||
final byte versionAndIsHashNull = buf.get();
|
||||
final byte version = (byte) (0x7F & versionAndIsHashNull);
|
||||
final boolean isHashNull = (0x80 & versionAndIsHashNull) == 0x80;
|
||||
final SubscriptionWrapper.Instruction inst = SubscriptionWrapper.Instruction.fromValue(buf.get());
|
||||
|
||||
final long[] hash;
|
||||
int lengthSum = 2; //The first 2 bytes
|
||||
final long[] hash;
|
||||
if (isHashNull) {
|
||||
hash = null;
|
||||
} else {
|
||||
|
@ -141,17 +204,31 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
|
|||
lengthSum += 2 * Long.BYTES;
|
||||
}
|
||||
|
||||
final byte[] primaryKeyRaw = new byte[data.length - lengthSum]; //The remaining data is the serialized pk
|
||||
buf.get(primaryKeyRaw, 0, primaryKeyRaw.length);
|
||||
final int primaryKeyLength;
|
||||
if (version > 0) {
|
||||
primaryKeyLength = data.length - lengthSum - Integer.BYTES;
|
||||
} else {
|
||||
primaryKeyLength = data.length - lengthSum;
|
||||
}
|
||||
final byte[] primaryKeyRaw = new byte[primaryKeyLength];
|
||||
buf.get(primaryKeyRaw, 0, primaryKeyLength);
|
||||
|
||||
if (primaryKeySerializationPseudoTopic == null) {
|
||||
primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get();
|
||||
}
|
||||
|
||||
final K primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerializationPseudoTopic,
|
||||
primaryKeyRaw);
|
||||
final K primaryKey = primaryKeyDeserializer.deserialize(
|
||||
primaryKeySerializationPseudoTopic,
|
||||
primaryKeyRaw
|
||||
);
|
||||
final Integer primaryPartition;
|
||||
if (version > 0) {
|
||||
primaryPartition = buf.getInt();
|
||||
} else {
|
||||
primaryPartition = null;
|
||||
}
|
||||
|
||||
return new SubscriptionWrapper<>(hash, inst, primaryKey, version);
|
||||
return new SubscriptionWrapper<>(hash, inst, primaryKey, version, primaryPartition);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -114,6 +114,17 @@ public final class AssignorConfiguration {
|
|||
log.warn("The eager rebalancing protocol is deprecated and will stop being supported in a future release." +
|
||||
" Please be prepared to remove the 'upgrade.from' config soon.");
|
||||
return RebalanceProtocol.EAGER;
|
||||
case StreamsConfig.UPGRADE_FROM_24:
|
||||
case StreamsConfig.UPGRADE_FROM_25:
|
||||
case StreamsConfig.UPGRADE_FROM_26:
|
||||
case StreamsConfig.UPGRADE_FROM_27:
|
||||
case StreamsConfig.UPGRADE_FROM_28:
|
||||
case StreamsConfig.UPGRADE_FROM_30:
|
||||
case StreamsConfig.UPGRADE_FROM_31:
|
||||
case StreamsConfig.UPGRADE_FROM_32:
|
||||
// This config is for explicitly sending FK response to a requested partition
|
||||
// and should not affect the rebalance protocol
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom);
|
||||
}
|
||||
|
@ -157,6 +168,17 @@ public final class AssignorConfiguration {
|
|||
case StreamsConfig.UPGRADE_FROM_23:
|
||||
// These configs are for cooperative rebalancing and should not affect the metadata version
|
||||
break;
|
||||
case StreamsConfig.UPGRADE_FROM_24:
|
||||
case StreamsConfig.UPGRADE_FROM_25:
|
||||
case StreamsConfig.UPGRADE_FROM_26:
|
||||
case StreamsConfig.UPGRADE_FROM_27:
|
||||
case StreamsConfig.UPGRADE_FROM_28:
|
||||
case StreamsConfig.UPGRADE_FROM_30:
|
||||
case StreamsConfig.UPGRADE_FROM_31:
|
||||
case StreamsConfig.UPGRADE_FROM_32:
|
||||
// This config is for explicitly sending FK response to a requested partition
|
||||
// and should not affect the metadata version
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException(
|
||||
"Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom
|
||||
|
|
|
@ -89,7 +89,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
|
|||
|
||||
valueGetterSupplier.put("lhs1", "lhsValue");
|
||||
final long[] oldHash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "oldLhsValue"));
|
||||
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(oldHash, "rhsValue"), 0));
|
||||
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(oldHash, "rhsValue", 0), 0));
|
||||
final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
|
||||
assertThat(forwarded, empty());
|
||||
}
|
||||
|
@ -114,7 +114,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
|
|||
|
||||
valueGetterSupplier.put("lhs1", null);
|
||||
final long[] hash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
|
||||
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, "rhsValue"), 0));
|
||||
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, "rhsValue", 0), 0));
|
||||
final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
|
||||
assertThat(forwarded, empty());
|
||||
}
|
||||
|
@ -139,7 +139,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
|
|||
|
||||
valueGetterSupplier.put("lhs1", "lhsValue");
|
||||
final long[] hash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
|
||||
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, "rhsValue"), 0));
|
||||
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, "rhsValue", 0), 0));
|
||||
final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
|
||||
assertThat(forwarded.size(), is(1));
|
||||
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", "(lhsValue,rhsValue)", 0)));
|
||||
|
@ -165,7 +165,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
|
|||
|
||||
valueGetterSupplier.put("lhs1", "lhsValue");
|
||||
final long[] hash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
|
||||
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, null), 0));
|
||||
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, null, 0), 0));
|
||||
final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
|
||||
assertThat(forwarded.size(), is(1));
|
||||
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", null, 0)));
|
||||
|
@ -191,7 +191,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
|
|||
|
||||
valueGetterSupplier.put("lhs1", "lhsValue");
|
||||
final long[] hash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
|
||||
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, null), 0));
|
||||
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, null, 0), 0));
|
||||
final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
|
||||
assertThat(forwarded.size(), is(1));
|
||||
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", "(lhsValue,null)", 0)));
|
||||
|
@ -217,7 +217,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
|
|||
|
||||
valueGetterSupplier.put("lhs1", null);
|
||||
final long[] hash = null;
|
||||
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, null), 0));
|
||||
processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, null, 0), 0));
|
||||
final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
|
||||
assertThat(forwarded.size(), is(1));
|
||||
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", null, 0)));
|
||||
|
|
|
@ -76,26 +76,28 @@ public class SubscriptionResponseWrapperSerdeTest {
|
|||
public void ShouldSerdeWithNonNullsTest() {
|
||||
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A, (byte) 0xFF, (byte) 0x00});
|
||||
final String foreignValue = "foreignValue";
|
||||
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue);
|
||||
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
|
||||
final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
|
||||
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
|
||||
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
|
||||
|
||||
assertArrayEquals(hashedValue, result.getOriginalValueHash());
|
||||
assertEquals(foreignValue, result.getForeignValue());
|
||||
assertNull(result.getPrimaryPartition());
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void shouldSerdeWithNullForeignValueTest() {
|
||||
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A, (byte) 0xFF, (byte) 0x00});
|
||||
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, null);
|
||||
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, null, 1);
|
||||
final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
|
||||
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
|
||||
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
|
||||
|
||||
assertArrayEquals(hashedValue, result.getOriginalValueHash());
|
||||
assertNull(result.getForeignValue());
|
||||
assertNull(result.getPrimaryPartition());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -103,13 +105,14 @@ public class SubscriptionResponseWrapperSerdeTest {
|
|||
public void shouldSerdeWithNullHashTest() {
|
||||
final long[] hashedValue = null;
|
||||
final String foreignValue = "foreignValue";
|
||||
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue);
|
||||
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
|
||||
final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
|
||||
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
|
||||
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
|
||||
|
||||
assertArrayEquals(hashedValue, result.getOriginalValueHash());
|
||||
assertEquals(foreignValue, result.getForeignValue());
|
||||
assertNull(result.getPrimaryPartition());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -117,19 +120,20 @@ public class SubscriptionResponseWrapperSerdeTest {
|
|||
public void shouldSerdeWithNullsTest() {
|
||||
final long[] hashedValue = null;
|
||||
final String foreignValue = null;
|
||||
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue);
|
||||
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
|
||||
final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
|
||||
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
|
||||
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
|
||||
|
||||
assertArrayEquals(hashedValue, result.getOriginalValueHash());
|
||||
assertEquals(foreignValue, result.getForeignValue());
|
||||
assertNull(result.getPrimaryPartition());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowExceptionWithBadVersionTest() {
|
||||
final long[] hashedValue = null;
|
||||
assertThrows(UnsupportedVersionException.class,
|
||||
() -> new SubscriptionResponseWrapper<>(hashedValue, "foreignValue", (byte) 0xFF));
|
||||
() -> new SubscriptionResponseWrapper<>(hashedValue, "foreignValue", (byte) 0xFF, 1));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -16,13 +16,16 @@
|
|||
*/
|
||||
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
|
||||
|
||||
import java.util.Collections;
|
||||
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.state.internals.Murmur3;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
|
@ -30,53 +33,229 @@ public class SubscriptionWrapperSerdeTest {
|
|||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void shouldSerdeTest() {
|
||||
public void shouldSerdeV0Test() {
|
||||
final byte version = 0;
|
||||
final String originalKey = "originalKey";
|
||||
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
|
||||
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
|
||||
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, originalKey);
|
||||
final Integer primaryPartition = null;
|
||||
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
|
||||
hashedValue,
|
||||
SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE,
|
||||
originalKey,
|
||||
version,
|
||||
primaryPartition);
|
||||
final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
|
||||
final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer().deserialize(null, serialized);
|
||||
final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer()
|
||||
.deserialize(null, serialized);
|
||||
|
||||
assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, deserialized.getInstruction());
|
||||
assertArrayEquals(hashedValue, deserialized.getHash());
|
||||
assertEquals(originalKey, deserialized.getPrimaryKey());
|
||||
assertEquals(primaryPartition, deserialized.getPrimaryPartition());
|
||||
assertEquals(version, deserialized.getVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void shouldSerdeNullHashTest() {
|
||||
public void shouldSerdeV1Test() {
|
||||
final byte version = 1;
|
||||
final String originalKey = "originalKey";
|
||||
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
|
||||
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
|
||||
final Integer primaryPartition = 10;
|
||||
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
|
||||
hashedValue,
|
||||
SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE,
|
||||
originalKey,
|
||||
version,
|
||||
primaryPartition);
|
||||
final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
|
||||
final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer()
|
||||
.deserialize(null, serialized);
|
||||
|
||||
assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, deserialized.getInstruction());
|
||||
assertArrayEquals(hashedValue, deserialized.getHash());
|
||||
assertEquals(originalKey, deserialized.getPrimaryKey());
|
||||
assertEquals(primaryPartition, deserialized.getPrimaryPartition());
|
||||
assertEquals(version, deserialized.getVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void shouldSerdeWithV0IfUpgradeTest() {
|
||||
final byte version = 1;
|
||||
final String originalKey = "originalKey";
|
||||
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
|
||||
swSerde.configure(
|
||||
Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_32),
|
||||
true);
|
||||
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
|
||||
final Integer primaryPartition = 10;
|
||||
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
|
||||
hashedValue,
|
||||
SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE,
|
||||
originalKey,
|
||||
version,
|
||||
primaryPartition);
|
||||
final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
|
||||
final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer()
|
||||
.deserialize(null, serialized);
|
||||
|
||||
assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, deserialized.getInstruction());
|
||||
assertArrayEquals(hashedValue, deserialized.getHash());
|
||||
assertEquals(originalKey, deserialized.getPrimaryKey());
|
||||
assertEquals(0, deserialized.getVersion());
|
||||
assertNull(deserialized.getPrimaryPartition());
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void shouldSerdeNullHashV0Test() {
|
||||
final byte version = 0;
|
||||
final String originalKey = "originalKey";
|
||||
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
|
||||
final long[] hashedValue = null;
|
||||
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey);
|
||||
final Integer primaryPartition = null;
|
||||
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
|
||||
hashedValue,
|
||||
SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
|
||||
originalKey,
|
||||
version,
|
||||
primaryPartition);
|
||||
final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
|
||||
final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer().deserialize(null, serialized);
|
||||
|
||||
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, deserialized.getInstruction());
|
||||
assertArrayEquals(hashedValue, deserialized.getHash());
|
||||
assertEquals(originalKey, deserialized.getPrimaryKey());
|
||||
assertEquals(primaryPartition, deserialized.getPrimaryPartition());
|
||||
assertEquals(version, deserialized.getVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowExceptionOnNullKeyTest() {
|
||||
@SuppressWarnings("unchecked")
|
||||
public void shouldSerdeNullHashV1Test() {
|
||||
final byte version = 1;
|
||||
final String originalKey = "originalKey";
|
||||
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
|
||||
final long[] hashedValue = null;
|
||||
final Integer primaryPartition = 10;
|
||||
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
|
||||
hashedValue,
|
||||
SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
|
||||
originalKey,
|
||||
version,
|
||||
primaryPartition);
|
||||
final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
|
||||
final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer()
|
||||
.deserialize(null, serialized);
|
||||
|
||||
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, deserialized.getInstruction());
|
||||
assertArrayEquals(hashedValue, deserialized.getHash());
|
||||
assertEquals(originalKey, deserialized.getPrimaryKey());
|
||||
assertEquals(primaryPartition, deserialized.getPrimaryPartition());
|
||||
assertEquals(version, deserialized.getVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldSerdeNullPrimaryPartitionOnV0Test() {
|
||||
final String originalKey = "originalKey";
|
||||
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
|
||||
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
|
||||
final Integer primaryPartition = null;
|
||||
final byte version = 0;
|
||||
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
|
||||
hashedValue,
|
||||
SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
|
||||
originalKey,
|
||||
version,
|
||||
primaryPartition);
|
||||
final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
|
||||
final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer().deserialize(null, serialized);
|
||||
|
||||
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, deserialized.getInstruction());
|
||||
assertArrayEquals(hashedValue, deserialized.getHash());
|
||||
assertEquals(originalKey, deserialized.getPrimaryKey());
|
||||
assertEquals(primaryPartition, deserialized.getPrimaryPartition());
|
||||
assertEquals(version, deserialized.getVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowExceptionOnNullKeyV0Test() {
|
||||
final String originalKey = null;
|
||||
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
|
||||
final Integer primaryPartition = 10;
|
||||
assertThrows(NullPointerException.class, () -> new SubscriptionWrapper<>(hashedValue,
|
||||
SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey));
|
||||
SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
|
||||
originalKey,
|
||||
(byte) 0,
|
||||
primaryPartition));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowExceptionOnNullInstructionTest() {
|
||||
public void shouldThrowExceptionOnNullKeyV1Test() {
|
||||
final String originalKey = null;
|
||||
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
|
||||
final Integer primaryPartition = 10;
|
||||
assertThrows(NullPointerException.class, () -> new SubscriptionWrapper<>(hashedValue,
|
||||
SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
|
||||
originalKey,
|
||||
(byte) 1,
|
||||
primaryPartition));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowExceptionOnNullInstructionV0Test() {
|
||||
final String originalKey = "originalKey";
|
||||
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
|
||||
assertThrows(NullPointerException.class, () -> new SubscriptionWrapper<>(hashedValue, null, originalKey));
|
||||
final Integer primaryPartition = 10;
|
||||
assertThrows(NullPointerException.class, () -> new SubscriptionWrapper<>(
|
||||
hashedValue,
|
||||
null,
|
||||
originalKey,
|
||||
(byte) 0,
|
||||
primaryPartition));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowExceptionOnNullInstructionV1Test() {
|
||||
final String originalKey = "originalKey";
|
||||
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
|
||||
final Integer primaryPartition = 10;
|
||||
assertThrows(NullPointerException.class, () -> new SubscriptionWrapper<>(
|
||||
hashedValue,
|
||||
null,
|
||||
originalKey,
|
||||
(byte) 0,
|
||||
primaryPartition));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowExceptionOnNullPrimaryPartitionV1Test() {
|
||||
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
|
||||
final String originalKey = "originalKey";
|
||||
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
|
||||
final Integer primaryPartition = null;
|
||||
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
|
||||
hashedValue,
|
||||
SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
|
||||
originalKey,
|
||||
(byte) 1,
|
||||
primaryPartition);
|
||||
assertThrows(NullPointerException.class, () -> swSerde.serializer().serialize(null, wrapper));
|
||||
}
|
||||
|
||||
@Test (expected = UnsupportedVersionException.class)
|
||||
public void shouldThrowExceptionOnUnsupportedVersionTest() {
|
||||
final String originalKey = "originalKey";
|
||||
final long[] hashedValue = null;
|
||||
new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey, (byte) 0x80);
|
||||
final Integer primaryPartition = 10;
|
||||
new SubscriptionWrapper<>(
|
||||
hashedValue,
|
||||
SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
|
||||
originalKey,
|
||||
(byte) 0x80,
|
||||
primaryPartition);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue