mirror of https://github.com/apache/kafka.git
MINOR: remove get prefix for internal DSL methods (#17050)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
59d3d7021a
commit
1f5aea2a86
|
@ -364,10 +364,10 @@ public class InternalStreamsBuilder implements InternalNameProvider {
|
|||
private void rewriteRepartitionNodes() {
|
||||
final Set<BaseRepartitionNode<?, ?>> nodes = new NodesWithRelaxedNullKeyJoinDownstream(root).find();
|
||||
for (final BaseRepartitionNode<?, ?> partitionNode : nodes) {
|
||||
if (partitionNode.getProcessorParameters() != null) {
|
||||
if (partitionNode.processorParameters() != null) {
|
||||
partitionNode.setProcessorParameters(new ProcessorParameters<>(
|
||||
new KStreamFilter<>((k, v) -> k != null, false),
|
||||
partitionNode.getProcessorParameters().processorName()
|
||||
partitionNode.processorParameters().processorName()
|
||||
));
|
||||
}
|
||||
}
|
||||
|
@ -445,9 +445,9 @@ public class InternalStreamsBuilder implements InternalNameProvider {
|
|||
GraphNode left = null, right = null;
|
||||
for (final GraphNode child: parent.children()) {
|
||||
if (child instanceof WindowedStreamProcessorNode && child.buildPriority() < joinNode.buildPriority()) {
|
||||
if (child.nodeName().equals(joinNode.getThisWindowedStreamProcessorParameters().processorName())) {
|
||||
if (child.nodeName().equals(joinNode.thisWindowedStreamProcessorParameters().processorName())) {
|
||||
left = child;
|
||||
} else if (child.nodeName().equals(joinNode.getOtherWindowedStreamProcessorParameters().processorName())) {
|
||||
} else if (child.nodeName().equals(joinNode.otherWindowedStreamProcessorParameters().processorName())) {
|
||||
right = child;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -170,9 +170,9 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, VThis, VOther> impleme
|
|||
|
||||
protected abstract TimestampedKeyAndJoinSide<K> makeOtherKey(final K key, final long timestamp);
|
||||
|
||||
protected abstract VThis getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue);
|
||||
protected abstract VThis thisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue);
|
||||
|
||||
protected abstract VOther getOtherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue);
|
||||
protected abstract VOther otherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue);
|
||||
|
||||
private void emitNonJoinedOuterRecords(final KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> store,
|
||||
final Record<K, VThis> record) {
|
||||
|
@ -252,8 +252,8 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, VThis, VOther> impleme
|
|||
final LeftOrRightValue<VLeft, VRight> leftOrRightValue) {
|
||||
final K key = timestampedKeyAndJoinSide.key();
|
||||
final long timestamp = timestampedKeyAndJoinSide.timestamp();
|
||||
final VThis thisValue = getThisValue(leftOrRightValue);
|
||||
final VOther otherValue = getOtherValue(leftOrRightValue);
|
||||
final VThis thisValue = thisValue(leftOrRightValue);
|
||||
final VOther otherValue = otherValue(leftOrRightValue);
|
||||
final VOut nullJoinedValue = joiner.apply(key, thisValue, otherValue);
|
||||
context().forward(
|
||||
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
|
||||
|
|
|
@ -58,13 +58,12 @@ class KStreamKStreamJoinLeftSide<K, VLeft, VRight, VOut> extends KStreamKStreamJ
|
|||
return TimestampedKeyAndJoinSide.makeRight(key, timestamp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public VLeft getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
|
||||
public VLeft thisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
|
||||
return leftOrRightValue.leftValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public VRight getOtherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
|
||||
public VRight otherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
|
||||
return leftOrRightValue.rightValue();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,13 +57,12 @@ class KStreamKStreamJoinRightSide<K, VLeft, VRight, VOut> extends KStreamKStream
|
|||
return TimestampedKeyAndJoinSide.makeLeft(key, timestamp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public VRight getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
|
||||
public VRight thisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
|
||||
return leftOrRightValue.rightValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public VLeft getOtherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
|
||||
public VLeft otherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
|
||||
return leftOrRightValue.leftValue();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -573,7 +573,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
|
|||
final StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V, Change<V>>> storeBuilder;
|
||||
|
||||
if (suppressedInternal.bufferConfig().isLoggingEnabled()) {
|
||||
final Map<String, String> topicConfig = suppressedInternal.bufferConfig().getLogConfig();
|
||||
final Map<String, String> topicConfig = suppressedInternal.bufferConfig().logConfig();
|
||||
storeBuilder = new InMemoryTimeOrderedKeyValueChangeBuffer.Builder<>(
|
||||
storeName,
|
||||
keySerde,
|
||||
|
@ -1165,7 +1165,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
|
|||
|
||||
final StreamPartitioner<K, SubscriptionResponseWrapper<VO>> defaultForeignResponseSinkPartitioner =
|
||||
(topic, key, subscriptionResponseWrapper, numPartitions) -> {
|
||||
final Integer partition = subscriptionResponseWrapper.getPrimaryPartition();
|
||||
final Integer partition = subscriptionResponseWrapper.primaryPartition();
|
||||
return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition));
|
||||
};
|
||||
|
||||
|
|
|
@ -44,7 +44,7 @@ public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
|
|||
this.queryableName = queryableName;
|
||||
}
|
||||
|
||||
public String getQueryableName() {
|
||||
public String queryableName() {
|
||||
return queryableName;
|
||||
}
|
||||
|
||||
|
|
|
@ -29,11 +29,11 @@ public class CombinedKey<KF, KP> {
|
|||
this.primaryKey = primaryKey;
|
||||
}
|
||||
|
||||
public KF getForeignKey() {
|
||||
public KF foreignKey() {
|
||||
return foreignKey;
|
||||
}
|
||||
|
||||
public KP getPrimaryKey() {
|
||||
public KP primaryKey() {
|
||||
return primaryKey;
|
||||
}
|
||||
|
||||
|
|
|
@ -123,11 +123,11 @@ public class ForeignTableJoinProcessorSupplier<K, KO, VO> implements
|
|||
if (prefixEquals(next.key.get(), prefixBytes.get())) {
|
||||
final CombinedKey<KO, K> combinedKey = keySchema.fromBytes(next.key);
|
||||
context().forward(
|
||||
record.withKey(combinedKey.getPrimaryKey())
|
||||
record.withKey(combinedKey.primaryKey())
|
||||
.withValue(new SubscriptionResponseWrapper<>(
|
||||
next.value.value().getHash(),
|
||||
next.value.value().hash(),
|
||||
record.value().newValue,
|
||||
next.value.value().getPrimaryPartition()))
|
||||
next.value.value().primaryPartition()))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -99,7 +99,7 @@ public class ResponseJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSup
|
|||
|
||||
@Override
|
||||
public void process(final Record<K, SubscriptionResponseWrapper<VO>> record) {
|
||||
if (record.value().getVersion() != SubscriptionResponseWrapper.CURRENT_VERSION) {
|
||||
if (record.value().version() != SubscriptionResponseWrapper.CURRENT_VERSION) {
|
||||
//Guard against modifications to SubscriptionResponseWrapper. Need to ensure that there is
|
||||
//compatibility with previous versions to enable rolling upgrades. Must develop a strategy for
|
||||
//upgrading from older SubscriptionWrapper versions to newer versions.
|
||||
|
@ -111,16 +111,16 @@ public class ResponseJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSup
|
|||
null :
|
||||
Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic, currentValueWithTimestamp.value()));
|
||||
|
||||
final long[] messageHash = record.value().getOriginalValueHash();
|
||||
final long[] messageHash = record.value().originalValueHash();
|
||||
|
||||
//If this value doesn't match the current value from the original table, it is stale and should be discarded.
|
||||
if (java.util.Arrays.equals(messageHash, currentHash)) {
|
||||
final VR result;
|
||||
|
||||
if (record.value().getForeignValue() == null && (!leftJoin || currentValueWithTimestamp == null)) {
|
||||
if (record.value().foreignValue() == null && (!leftJoin || currentValueWithTimestamp == null)) {
|
||||
result = null; //Emit tombstone
|
||||
} else {
|
||||
result = joiner.apply(currentValueWithTimestamp == null ? null : currentValueWithTimestamp.value(), record.value().getForeignValue());
|
||||
result = joiner.apply(currentValueWithTimestamp == null ? null : currentValueWithTimestamp.value(), record.value().foreignValue());
|
||||
}
|
||||
context().forward(record.withValue(result));
|
||||
} else {
|
||||
|
|
|
@ -70,7 +70,7 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
|
|||
Objects.requireNonNull(valueAndTimestamp, "This processor should never see a null newValue.");
|
||||
final SubscriptionWrapper<K> value = valueAndTimestamp.value();
|
||||
|
||||
if (value.getVersion() > SubscriptionWrapper.CURRENT_VERSION) {
|
||||
if (value.version() > SubscriptionWrapper.CURRENT_VERSION) {
|
||||
//Guard against modifications to SubscriptionWrapper. Need to ensure that there is compatibility
|
||||
//with previous versions to enable rolling upgrades. Must develop a strategy for upgrading
|
||||
//from older SubscriptionWrapper versions to newer versions.
|
||||
|
@ -78,23 +78,23 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
|
|||
}
|
||||
|
||||
final ValueAndTimestamp<VO> foreignValueAndTime =
|
||||
record.key().getForeignKey() == null ?
|
||||
record.key().foreignKey() == null ?
|
||||
null :
|
||||
foreignValues.get(record.key().getForeignKey());
|
||||
foreignValues.get(record.key().foreignKey());
|
||||
|
||||
final long resultTimestamp =
|
||||
foreignValueAndTime == null ?
|
||||
valueAndTimestamp.timestamp() :
|
||||
Math.max(valueAndTimestamp.timestamp(), foreignValueAndTime.timestamp());
|
||||
|
||||
switch (value.getInstruction()) {
|
||||
switch (value.instruction()) {
|
||||
case DELETE_KEY_AND_PROPAGATE:
|
||||
context().forward(
|
||||
record.withKey(record.key().getPrimaryKey())
|
||||
record.withKey(record.key().primaryKey())
|
||||
.withValue(new SubscriptionResponseWrapper<VO>(
|
||||
value.getHash(),
|
||||
value.hash(),
|
||||
null,
|
||||
value.getPrimaryPartition()
|
||||
value.primaryPartition()
|
||||
))
|
||||
.withTimestamp(resultTimestamp)
|
||||
);
|
||||
|
@ -106,11 +106,11 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
|
|||
final VO valueToSend = foreignValueAndTime == null ? null : foreignValueAndTime.value();
|
||||
|
||||
context().forward(
|
||||
record.withKey(record.key().getPrimaryKey())
|
||||
record.withKey(record.key().primaryKey())
|
||||
.withValue(new SubscriptionResponseWrapper<>(
|
||||
value.getHash(),
|
||||
value.hash(),
|
||||
valueToSend,
|
||||
value.getPrimaryPartition()
|
||||
value.primaryPartition()
|
||||
))
|
||||
.withTimestamp(resultTimestamp)
|
||||
);
|
||||
|
@ -118,11 +118,11 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
|
|||
case PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE:
|
||||
if (foreignValueAndTime != null) {
|
||||
context().forward(
|
||||
record.withKey(record.key().getPrimaryKey())
|
||||
record.withKey(record.key().primaryKey())
|
||||
.withValue(new SubscriptionResponseWrapper<>(
|
||||
value.getHash(),
|
||||
value.hash(),
|
||||
foreignValueAndTime.value(),
|
||||
value.getPrimaryPartition()
|
||||
value.primaryPartition()
|
||||
))
|
||||
.withTimestamp(resultTimestamp)
|
||||
);
|
||||
|
@ -131,7 +131,7 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
|
|||
case DELETE_KEY_NO_PROPAGATE:
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("Unhandled instruction: " + value.getInstruction());
|
||||
throw new IllegalStateException("Unhandled instruction: " + value.instruction());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
|
@ -75,18 +75,18 @@ public class SubscriptionReceiveProcessorSupplier<K, KO>
|
|||
|
||||
@Override
|
||||
public void process(final Record<KO, SubscriptionWrapper<K>> record) {
|
||||
if (record.key() == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().getInstruction())) {
|
||||
if (record.key() == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction())) {
|
||||
dropRecord();
|
||||
return;
|
||||
}
|
||||
if (record.value().getVersion() > SubscriptionWrapper.CURRENT_VERSION) {
|
||||
if (record.value().version() > SubscriptionWrapper.CURRENT_VERSION) {
|
||||
//Guard against modifications to SubscriptionWrapper. Need to ensure that there is compatibility
|
||||
//with previous versions to enable rolling upgrades. Must develop a strategy for upgrading
|
||||
//from older SubscriptionWrapper versions to newer versions.
|
||||
throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
|
||||
}
|
||||
context().forward(
|
||||
record.withKey(new CombinedKey<>(record.key(), record.value().getPrimaryKey()))
|
||||
record.withKey(new CombinedKey<>(record.key(), record.value().primaryKey()))
|
||||
.withValue(inferChange(record))
|
||||
.withTimestamp(record.timestamp())
|
||||
);
|
||||
|
@ -101,14 +101,14 @@ public class SubscriptionReceiveProcessorSupplier<K, KO>
|
|||
}
|
||||
|
||||
private Change<ValueAndTimestamp<SubscriptionWrapper<K>>> inferBasedOnState(final Record<KO, SubscriptionWrapper<K>> record) {
|
||||
final Bytes subscriptionKey = keySchema.toBytes(record.key(), record.value().getPrimaryKey());
|
||||
final Bytes subscriptionKey = keySchema.toBytes(record.key(), record.value().primaryKey());
|
||||
|
||||
final ValueAndTimestamp<SubscriptionWrapper<K>> newValue = ValueAndTimestamp.make(record.value(), record.timestamp());
|
||||
final ValueAndTimestamp<SubscriptionWrapper<K>> oldValue = store.get(subscriptionKey);
|
||||
|
||||
//This store is used by the prefix scanner in ForeignTableJoinProcessorSupplier
|
||||
if (record.value().getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE) ||
|
||||
record.value().getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE)) {
|
||||
if (record.value().instruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE) ||
|
||||
record.value().instruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE)) {
|
||||
store.delete(subscriptionKey);
|
||||
} else {
|
||||
store.put(subscriptionKey, newValue);
|
||||
|
|
|
@ -48,19 +48,19 @@ public class SubscriptionResponseWrapper<FV> {
|
|||
this.primaryPartition = primaryPartition;
|
||||
}
|
||||
|
||||
public long[] getOriginalValueHash() {
|
||||
public long[] originalValueHash() {
|
||||
return originalValueHash;
|
||||
}
|
||||
|
||||
public FV getForeignValue() {
|
||||
public FV foreignValue() {
|
||||
return foreignValue;
|
||||
}
|
||||
|
||||
public byte getVersion() {
|
||||
public byte version() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public Integer getPrimaryPartition() {
|
||||
public Integer primaryPartition() {
|
||||
return primaryPartition;
|
||||
}
|
||||
|
||||
|
|
|
@ -67,24 +67,24 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
|
|||
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data}
|
||||
|
||||
//7-bit (0x7F) maximum for data version.
|
||||
if (Byte.compare((byte) 0x7F, data.getVersion()) < 0) {
|
||||
if (Byte.compare((byte) 0x7F, data.version()) < 0) {
|
||||
throw new UnsupportedVersionException("SubscriptionResponseWrapper version is larger than maximum supported 0x7F");
|
||||
}
|
||||
|
||||
final byte[] serializedData = data.getForeignValue() == null ? null : serializer.serialize(topic, data.getForeignValue());
|
||||
final byte[] serializedData = data.foreignValue() == null ? null : serializer.serialize(topic, data.foreignValue());
|
||||
final int serializedDataLength = serializedData == null ? 0 : serializedData.length;
|
||||
final long[] originalHash = data.getOriginalValueHash();
|
||||
final long[] originalHash = data.originalValueHash();
|
||||
final int hashLength = originalHash == null ? 0 : 2 * Long.BYTES;
|
||||
|
||||
final ByteBuffer buf = ByteBuffer.allocate(1 + hashLength + serializedDataLength);
|
||||
|
||||
if (originalHash != null) {
|
||||
buf.put(data.getVersion());
|
||||
buf.put(data.version());
|
||||
buf.putLong(originalHash[0]);
|
||||
buf.putLong(originalHash[1]);
|
||||
} else {
|
||||
//Don't store hash as it's null.
|
||||
buf.put((byte) (data.getVersion() | (byte) 0x80));
|
||||
buf.put((byte) (data.version() | (byte) 0x80));
|
||||
}
|
||||
|
||||
if (serializedData != null)
|
||||
|
|
|
@ -57,7 +57,7 @@ public class SubscriptionWrapper<K> {
|
|||
this.value = value;
|
||||
}
|
||||
|
||||
public byte getValue() {
|
||||
public byte value() {
|
||||
return value;
|
||||
}
|
||||
|
||||
|
@ -89,23 +89,23 @@ public class SubscriptionWrapper<K> {
|
|||
this.primaryPartition = primaryPartition;
|
||||
}
|
||||
|
||||
public Instruction getInstruction() {
|
||||
public Instruction instruction() {
|
||||
return instruction;
|
||||
}
|
||||
|
||||
public long[] getHash() {
|
||||
public long[] hash() {
|
||||
return hash;
|
||||
}
|
||||
|
||||
public K getPrimaryKey() {
|
||||
public K primaryKey() {
|
||||
return primaryKey;
|
||||
}
|
||||
|
||||
public byte getVersion() {
|
||||
public byte version() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public Integer getPrimaryPartition() {
|
||||
public Integer primaryPartition() {
|
||||
return primaryPartition;
|
||||
}
|
||||
|
||||
|
|
|
@ -107,17 +107,17 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
|
|||
//{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}{4-bytes-primaryPartition}
|
||||
|
||||
//7-bit (0x7F) maximum for data version.
|
||||
if (Byte.compare((byte) 0x7F, data.getVersion()) < 0) {
|
||||
if (Byte.compare((byte) 0x7F, data.version()) < 0) {
|
||||
throw new UnsupportedVersionException("SubscriptionWrapper version is larger than maximum supported 0x7F");
|
||||
}
|
||||
|
||||
final int version = data.getVersion();
|
||||
final int version = data.version();
|
||||
if (upgradeFromV0 || version == 0) {
|
||||
return serializeV0(data);
|
||||
} else if (version == 1) {
|
||||
return serializeV1(data);
|
||||
} else {
|
||||
throw new UnsupportedVersionException("Unsupported SubscriptionWrapper version " + data.getVersion());
|
||||
throw new UnsupportedVersionException("Unsupported SubscriptionWrapper version " + data.version());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -128,7 +128,7 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
|
|||
|
||||
return primaryKeySerializer.serialize(
|
||||
primaryKeySerializationPseudoTopic,
|
||||
data.getPrimaryKey()
|
||||
data.primaryKey()
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -136,7 +136,7 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
|
|||
final byte[] primaryKeySerializedData = serializePrimaryKey(data);
|
||||
final ByteBuffer buf;
|
||||
int dataLength = 2 + primaryKeySerializedData.length + extraLength;
|
||||
if (data.getHash() != null) {
|
||||
if (data.hash() != null) {
|
||||
dataLength += 2 * Long.BYTES;
|
||||
buf = ByteBuffer.allocate(dataLength);
|
||||
buf.put(version);
|
||||
|
@ -145,9 +145,9 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
|
|||
buf = ByteBuffer.allocate(dataLength);
|
||||
buf.put((byte) (version | (byte) 0x80));
|
||||
}
|
||||
buf.put(data.getInstruction().getValue());
|
||||
final long[] elem = data.getHash();
|
||||
if (data.getHash() != null) {
|
||||
buf.put(data.instruction().value());
|
||||
final long[] elem = data.hash();
|
||||
if (data.hash() != null) {
|
||||
buf.putLong(elem[0]);
|
||||
buf.putLong(elem[1]);
|
||||
}
|
||||
|
@ -160,8 +160,8 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
|
|||
}
|
||||
|
||||
private byte[] serializeV1(final SubscriptionWrapper<K> data) {
|
||||
final ByteBuffer buf = serializeCommon(data, data.getVersion(), Integer.BYTES);
|
||||
buf.putInt(data.getPrimaryPartition());
|
||||
final ByteBuffer buf = serializeCommon(data, data.version(), Integer.BYTES);
|
||||
buf.putInt(data.primaryPartition());
|
||||
return buf.array();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -76,7 +76,7 @@ public abstract class BaseRepartitionNode<K, V> extends GraphNode {
|
|||
this.processorParameters = processorParameters;
|
||||
}
|
||||
|
||||
public ProcessorParameters<K, V, ?, ?> getProcessorParameters() {
|
||||
public ProcessorParameters<K, V, ?, ?> processorParameters() {
|
||||
return processorParameters;
|
||||
}
|
||||
|
||||
|
|
|
@ -83,7 +83,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
|
|||
}
|
||||
|
||||
public String queryableStoreName() {
|
||||
return joinMerger().getQueryableName();
|
||||
return joinMerger().queryableName();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -116,11 +116,11 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
|
|||
return isSelfJoin;
|
||||
}
|
||||
|
||||
public ProcessorParameters<K, V1, ?, ?> getThisWindowedStreamProcessorParameters() {
|
||||
public ProcessorParameters<K, V1, ?, ?> thisWindowedStreamProcessorParameters() {
|
||||
return thisWindowedStreamProcessorParameters;
|
||||
}
|
||||
|
||||
public ProcessorParameters<K, V2, ?, ?> getOtherWindowedStreamProcessorParameters() {
|
||||
public ProcessorParameters<K, V2, ?, ?> otherWindowedStreamProcessorParameters() {
|
||||
return otherWindowedStreamProcessorParameters;
|
||||
}
|
||||
|
||||
|
|
|
@ -36,21 +36,21 @@ public abstract class BufferConfigInternal<BC extends Suppressed.BufferConfig<BC
|
|||
Long.MAX_VALUE,
|
||||
Long.MAX_VALUE,
|
||||
SHUT_DOWN, // doesn't matter, given the bounds
|
||||
getLogConfig()
|
||||
logConfig()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Suppressed.StrictBufferConfig shutDownWhenFull() {
|
||||
return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN, getLogConfig());
|
||||
return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN, logConfig());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Suppressed.EagerBufferConfig emitEarlyWhenFull() {
|
||||
return new EagerBufferConfigImpl(maxRecords(), maxBytes(), getLogConfig());
|
||||
return new EagerBufferConfigImpl(maxRecords(), maxBytes(), logConfig());
|
||||
}
|
||||
|
||||
public abstract boolean isLoggingEnabled();
|
||||
|
||||
public abstract Map<String, String> getLogConfig();
|
||||
public abstract Map<String, String> logConfig();
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ public class EagerBufferConfigImpl extends BufferConfigInternal<Suppressed.Eager
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getLogConfig() {
|
||||
public Map<String, String> logConfig() {
|
||||
return isLoggingEnabled() ? logConfig : Collections.emptyMap();
|
||||
}
|
||||
|
||||
|
@ -92,19 +92,19 @@ public class EagerBufferConfigImpl extends BufferConfigInternal<Suppressed.Eager
|
|||
final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o;
|
||||
return maxRecords == that.maxRecords &&
|
||||
maxBytes == that.maxBytes &&
|
||||
Objects.equals(getLogConfig(), that.getLogConfig());
|
||||
Objects.equals(logConfig(), that.logConfig());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(maxRecords, maxBytes, getLogConfig());
|
||||
return Objects.hash(maxRecords, maxBytes, logConfig());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "EagerBufferConfigImpl{maxRecords=" + maxRecords +
|
||||
", maxBytes=" + maxBytes +
|
||||
", logConfig=" + getLogConfig() +
|
||||
", logConfig=" + logConfig() +
|
||||
"}";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,12 +51,12 @@ public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.Stri
|
|||
|
||||
@Override
|
||||
public Suppressed.StrictBufferConfig withMaxRecords(final long recordLimit) {
|
||||
return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy, getLogConfig());
|
||||
return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy, logConfig());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) {
|
||||
return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy, getLogConfig());
|
||||
return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy, logConfig());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,7 +90,7 @@ public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.Stri
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getLogConfig() {
|
||||
public Map<String, String> logConfig() {
|
||||
return isLoggingEnabled() ? logConfig : Collections.emptyMap();
|
||||
}
|
||||
|
||||
|
@ -106,12 +106,12 @@ public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.Stri
|
|||
return maxRecords == that.maxRecords &&
|
||||
maxBytes == that.maxBytes &&
|
||||
bufferFullStrategy == that.bufferFullStrategy &&
|
||||
Objects.equals(getLogConfig(), ((StrictBufferConfigImpl) o).getLogConfig());
|
||||
Objects.equals(logConfig(), ((StrictBufferConfigImpl) o).logConfig());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(maxRecords, maxBytes, bufferFullStrategy, getLogConfig());
|
||||
return Objects.hash(maxRecords, maxBytes, bufferFullStrategy, logConfig());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -119,7 +119,7 @@ public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.Stri
|
|||
return "StrictBufferConfigImpl{maxKeys=" + maxRecords +
|
||||
", maxBytes=" + maxBytes +
|
||||
", bufferFullStrategy=" + bufferFullStrategy +
|
||||
", logConfig=" + getLogConfig().toString() +
|
||||
", logConfig=" + logConfig().toString() +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,8 +38,8 @@ public class CombinedKeySchemaTest {
|
|||
final Bytes result = cks.toBytes("foreignKey", primary);
|
||||
|
||||
final CombinedKey<String, Integer> deserializedKey = cks.fromBytes(result);
|
||||
assertEquals("foreignKey", deserializedKey.getForeignKey());
|
||||
assertEquals(primary, deserializedKey.getPrimaryKey());
|
||||
assertEquals("foreignKey", deserializedKey.foreignKey());
|
||||
assertEquals(primary, deserializedKey.primaryKey());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -77,7 +77,7 @@ public class SubscriptionJoinProcessorSupplierTest {
|
|||
new Record<>(
|
||||
"pk1",
|
||||
new SubscriptionResponseWrapper<>(
|
||||
newValue.getHash(),
|
||||
newValue.hash(),
|
||||
null,
|
||||
null),
|
||||
1L
|
||||
|
@ -111,7 +111,7 @@ public class SubscriptionJoinProcessorSupplierTest {
|
|||
new Record<>(
|
||||
"pk1",
|
||||
new SubscriptionResponseWrapper<>(
|
||||
newValue.getHash(),
|
||||
newValue.hash(),
|
||||
null,
|
||||
12
|
||||
),
|
||||
|
@ -146,7 +146,7 @@ public class SubscriptionJoinProcessorSupplierTest {
|
|||
new Record<>(
|
||||
"pk1",
|
||||
new SubscriptionResponseWrapper<>(
|
||||
newValue.getHash(),
|
||||
newValue.hash(),
|
||||
"foo",
|
||||
null
|
||||
),
|
||||
|
@ -181,7 +181,7 @@ public class SubscriptionJoinProcessorSupplierTest {
|
|||
new Record<>(
|
||||
"pk1",
|
||||
new SubscriptionResponseWrapper<>(
|
||||
newValue.getHash(),
|
||||
newValue.hash(),
|
||||
"foo",
|
||||
12
|
||||
),
|
||||
|
@ -216,7 +216,7 @@ public class SubscriptionJoinProcessorSupplierTest {
|
|||
new Record<>(
|
||||
"pk1",
|
||||
new SubscriptionResponseWrapper<>(
|
||||
newValue.getHash(),
|
||||
newValue.hash(),
|
||||
"foo",
|
||||
null
|
||||
),
|
||||
|
@ -237,7 +237,7 @@ public class SubscriptionJoinProcessorSupplierTest {
|
|||
new Record<>(
|
||||
"pk1",
|
||||
new SubscriptionResponseWrapper<>(
|
||||
newValue.getHash(),
|
||||
newValue.hash(),
|
||||
null,
|
||||
null
|
||||
),
|
||||
|
@ -270,7 +270,7 @@ public class SubscriptionJoinProcessorSupplierTest {
|
|||
new Record<>(
|
||||
"pk1",
|
||||
new SubscriptionResponseWrapper<>(
|
||||
newValue.getHash(),
|
||||
newValue.hash(),
|
||||
"foo",
|
||||
12
|
||||
),
|
||||
|
@ -291,7 +291,7 @@ public class SubscriptionJoinProcessorSupplierTest {
|
|||
new Record<>(
|
||||
"pk1",
|
||||
new SubscriptionResponseWrapper<>(
|
||||
newValue.getHash(),
|
||||
newValue.hash(),
|
||||
null,
|
||||
12
|
||||
),
|
||||
|
|
|
@ -82,9 +82,9 @@ public class SubscriptionResponseWrapperSerdeTest {
|
|||
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
|
||||
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
|
||||
|
||||
assertArrayEquals(hashedValue, result.getOriginalValueHash());
|
||||
assertEquals(foreignValue, result.getForeignValue());
|
||||
assertNull(result.getPrimaryPartition());
|
||||
assertArrayEquals(hashedValue, result.originalValueHash());
|
||||
assertEquals(foreignValue, result.foreignValue());
|
||||
assertNull(result.primaryPartition());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -96,9 +96,9 @@ public class SubscriptionResponseWrapperSerdeTest {
|
|||
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
|
||||
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
|
||||
|
||||
assertArrayEquals(hashedValue, result.getOriginalValueHash());
|
||||
assertNull(result.getForeignValue());
|
||||
assertNull(result.getPrimaryPartition());
|
||||
assertArrayEquals(hashedValue, result.originalValueHash());
|
||||
assertNull(result.foreignValue());
|
||||
assertNull(result.primaryPartition());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -111,9 +111,9 @@ public class SubscriptionResponseWrapperSerdeTest {
|
|||
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
|
||||
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
|
||||
|
||||
assertArrayEquals(hashedValue, result.getOriginalValueHash());
|
||||
assertEquals(foreignValue, result.getForeignValue());
|
||||
assertNull(result.getPrimaryPartition());
|
||||
assertArrayEquals(hashedValue, result.originalValueHash());
|
||||
assertEquals(foreignValue, result.foreignValue());
|
||||
assertNull(result.primaryPartition());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -126,9 +126,9 @@ public class SubscriptionResponseWrapperSerdeTest {
|
|||
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
|
||||
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
|
||||
|
||||
assertArrayEquals(hashedValue, result.getOriginalValueHash());
|
||||
assertEquals(foreignValue, result.getForeignValue());
|
||||
assertNull(result.getPrimaryPartition());
|
||||
assertArrayEquals(hashedValue, result.originalValueHash());
|
||||
assertEquals(foreignValue, result.foreignValue());
|
||||
assertNull(result.primaryPartition());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -51,11 +51,11 @@ public class SubscriptionWrapperSerdeTest {
|
|||
final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer()
|
||||
.deserialize(null, serialized);
|
||||
|
||||
assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, deserialized.getInstruction());
|
||||
assertArrayEquals(hashedValue, deserialized.getHash());
|
||||
assertEquals(originalKey, deserialized.getPrimaryKey());
|
||||
assertEquals(primaryPartition, deserialized.getPrimaryPartition());
|
||||
assertEquals(version, deserialized.getVersion());
|
||||
assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, deserialized.instruction());
|
||||
assertArrayEquals(hashedValue, deserialized.hash());
|
||||
assertEquals(originalKey, deserialized.primaryKey());
|
||||
assertEquals(primaryPartition, deserialized.primaryPartition());
|
||||
assertEquals(version, deserialized.version());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -76,11 +76,11 @@ public class SubscriptionWrapperSerdeTest {
|
|||
final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer()
|
||||
.deserialize(null, serialized);
|
||||
|
||||
assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, deserialized.getInstruction());
|
||||
assertArrayEquals(hashedValue, deserialized.getHash());
|
||||
assertEquals(originalKey, deserialized.getPrimaryKey());
|
||||
assertEquals(primaryPartition, deserialized.getPrimaryPartition());
|
||||
assertEquals(version, deserialized.getVersion());
|
||||
assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, deserialized.instruction());
|
||||
assertArrayEquals(hashedValue, deserialized.hash());
|
||||
assertEquals(originalKey, deserialized.primaryKey());
|
||||
assertEquals(primaryPartition, deserialized.primaryPartition());
|
||||
assertEquals(version, deserialized.version());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -104,11 +104,11 @@ public class SubscriptionWrapperSerdeTest {
|
|||
final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer()
|
||||
.deserialize(null, serialized);
|
||||
|
||||
assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, deserialized.getInstruction());
|
||||
assertArrayEquals(hashedValue, deserialized.getHash());
|
||||
assertEquals(originalKey, deserialized.getPrimaryKey());
|
||||
assertEquals(0, deserialized.getVersion());
|
||||
assertNull(deserialized.getPrimaryPartition());
|
||||
assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, deserialized.instruction());
|
||||
assertArrayEquals(hashedValue, deserialized.hash());
|
||||
assertEquals(originalKey, deserialized.primaryKey());
|
||||
assertEquals(0, deserialized.version());
|
||||
assertNull(deserialized.primaryPartition());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -128,11 +128,11 @@ public class SubscriptionWrapperSerdeTest {
|
|||
final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
|
||||
final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer().deserialize(null, serialized);
|
||||
|
||||
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, deserialized.getInstruction());
|
||||
assertArrayEquals(hashedValue, deserialized.getHash());
|
||||
assertEquals(originalKey, deserialized.getPrimaryKey());
|
||||
assertEquals(primaryPartition, deserialized.getPrimaryPartition());
|
||||
assertEquals(version, deserialized.getVersion());
|
||||
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, deserialized.instruction());
|
||||
assertArrayEquals(hashedValue, deserialized.hash());
|
||||
assertEquals(originalKey, deserialized.primaryKey());
|
||||
assertEquals(primaryPartition, deserialized.primaryPartition());
|
||||
assertEquals(version, deserialized.version());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -153,11 +153,11 @@ public class SubscriptionWrapperSerdeTest {
|
|||
final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer()
|
||||
.deserialize(null, serialized);
|
||||
|
||||
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, deserialized.getInstruction());
|
||||
assertArrayEquals(hashedValue, deserialized.getHash());
|
||||
assertEquals(originalKey, deserialized.getPrimaryKey());
|
||||
assertEquals(primaryPartition, deserialized.getPrimaryPartition());
|
||||
assertEquals(version, deserialized.getVersion());
|
||||
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, deserialized.instruction());
|
||||
assertArrayEquals(hashedValue, deserialized.hash());
|
||||
assertEquals(originalKey, deserialized.primaryKey());
|
||||
assertEquals(primaryPartition, deserialized.primaryPartition());
|
||||
assertEquals(version, deserialized.version());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -176,11 +176,11 @@ public class SubscriptionWrapperSerdeTest {
|
|||
final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
|
||||
final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer().deserialize(null, serialized);
|
||||
|
||||
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, deserialized.getInstruction());
|
||||
assertArrayEquals(hashedValue, deserialized.getHash());
|
||||
assertEquals(originalKey, deserialized.getPrimaryKey());
|
||||
assertEquals(primaryPartition, deserialized.getPrimaryPartition());
|
||||
assertEquals(version, deserialized.getVersion());
|
||||
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, deserialized.instruction());
|
||||
assertArrayEquals(hashedValue, deserialized.hash());
|
||||
assertEquals(originalKey, deserialized.primaryKey());
|
||||
assertEquals(primaryPartition, deserialized.primaryPartition());
|
||||
assertEquals(version, deserialized.version());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue