MINOR: Cleanup Joined class (#14551)

Code cleanup and JavaDocs fixed, plus add missing getters to JoinedInternal.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Matthias J. Sax 2024-08-21 14:31:22 -07:00 committed by GitHub
parent e4cc5d18f4
commit 1e14b0c964
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 154 additions and 117 deletions

View File

@ -24,47 +24,56 @@ import java.time.Duration;
* The {@code Joined} class represents optional params that can be passed to
* {@link KStream#join(KTable, ValueJoiner, Joined) KStream#join(KTable,...)} and
* {@link KStream#leftJoin(KTable, ValueJoiner) KStream#leftJoin(KTable,...)} operations.
*
* @param <K> type of record key
* @param <VLeft> type of left record value
* @param <VRight> type of right record value
*/
public class Joined<K, V, VO> implements NamedOperation<Joined<K, V, VO>> {
public class Joined<K, VLeft, VRight> implements NamedOperation<Joined<K, VLeft, VRight>> {
protected final Serde<K> keySerde;
protected final Serde<V> valueSerde;
protected final Serde<VO> otherValueSerde;
protected final Serde<VLeft> leftValueSerde;
protected final Serde<VRight> rightValueSerde;
protected final String name;
protected final Duration gracePeriod;
private Joined(final Serde<K> keySerde,
final Serde<V> valueSerde,
final Serde<VO> otherValueSerde,
final Serde<VLeft> leftValueSerde,
final Serde<VRight> rightValueSerde,
final String name,
final Duration gracePeriod) {
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.otherValueSerde = otherValueSerde;
this.leftValueSerde = leftValueSerde;
this.rightValueSerde = rightValueSerde;
this.name = name;
this.gracePeriod = gracePeriod;
}
protected Joined(final Joined<K, V, VO> joined) {
this(joined.keySerde, joined.valueSerde, joined.otherValueSerde, joined.name, joined.gracePeriod);
protected Joined(final Joined<K, VLeft, VRight> joined) {
this(joined.keySerde, joined.leftValueSerde, joined.rightValueSerde, joined.name, joined.gracePeriod);
}
/**
* Create an instance of {@code Joined} with key, value, and otherValue {@link Serde} instances.
* {@code null} values are accepted and will be replaced by the default serdes as defined in config.
*
* @param keySerde the key serde to use. If {@code null} the default key serde from config will be used
* @param valueSerde the value serde to use. If {@code null} the default value serde from config will be used
* @param otherValueSerde the otherValue serde to use. If {@code null} the default value serde from config will be used
* @param <K> key type
* @param <V> value type
* @param <VO> other value type
* @param keySerde
* the key serde to use. If {@code null} the default key serde from config will be used
* @param leftValueSerde
* the value serde to use. If {@code null} the default value serde from config will be used
* @param rightValueSerde
* the otherValue serde to use. If {@code null} the default value serde from config will be used
*
* @param <K> key type
* @param <VLeft> left value type
* @param <VRight> right value type
*
* @return new {@code Joined} instance with the provided serdes
*/
public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
final Serde<V> valueSerde,
final Serde<VO> otherValueSerde) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, null, null);
public static <K, VLeft, VRight> Joined<K, VLeft, VRight> with(final Serde<K> keySerde,
final Serde<VLeft> leftValueSerde,
final Serde<VRight> rightValueSerde) {
return new Joined<>(keySerde, leftValueSerde, rightValueSerde, null, null);
}
/**
@ -72,24 +81,26 @@ public class Joined<K, V, VO> implements NamedOperation<Joined<K, V, VO>> {
* {@code null} values are accepted and will be replaced by the default serdes as defined in
* config.
*
* @param keySerde the key serde to use. If {@code null} the default key serde from config will be
* used
* @param valueSerde the value serde to use. If {@code null} the default value serde from config
* will be used
* @param otherValueSerde the otherValue serde to use. If {@code null} the default value serde
* from config will be used
* @param name the name used as the base for naming components of the join including any
* repartition topics
* @param keySerde
* the key serde to use. If {@code null} the default key serde from config will be used
* @param leftValueSerde
* the left value serde to use. If {@code null} the default value serde from config will be used
* @param rightValueSerde
* the right value serde to use. If {@code null} the default value serde from config will be used
* @param name
* the name used as the base for naming components of the join including any repartition topics
*
* @param <K> key type
* @param <V> value type
* @param <VO> other value type
* @param <VLeft> left value type
* @param <VRight> right value type
*
* @return new {@code Joined} instance with the provided serdes
*/
public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
final Serde<V> valueSerde,
final Serde<VO> otherValueSerde,
final String name) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, name, null);
public static <K, VLeft, VRight> Joined<K, VLeft, VRight> with(final Serde<K> keySerde,
final Serde<VLeft> leftValueSerde,
final Serde<VRight> rightValueSerde,
final String name) {
return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, null);
}
/**
@ -97,39 +108,45 @@ public class Joined<K, V, VO> implements NamedOperation<Joined<K, V, VO>> {
* {@code null} values are accepted and will be replaced by the default serdes as defined in
* config.
*
* @param keySerde the key serde to use. If {@code null} the default key serde from config will be
* used
* @param valueSerde the value serde to use. If {@code null} the default value serde from config
* will be used
* @param otherValueSerde the otherValue serde to use. If {@code null} the default value serde
* from config will be used
* @param name the name used as the base for naming components of the join including any
* repartition topics
* @param gracePeriod stream buffer time
* @param keySerde
* the key serde to use. If {@code null} the default key serde from config will be used
* @param leftValueSerde
* the left value serde to use. If {@code null} the default value serde from config will be used
* @param rightValueSerde
* the right value serde to use. If {@code null} the default value serde from config will be used
* @param name
* the name used as the base for naming components of the join including any repartition topics
* @param gracePeriod
* stream buffer time
*
* @param <K> key type
* @param <V> value type
* @param <VO> other value type
* @param <VLeft> value value type
* @param <VRight> right value type
*
* @return new {@code Joined} instance with the provided serdes
*/
public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
final Serde<V> valueSerde,
final Serde<VO> otherValueSerde,
final String name,
final Duration gracePeriod) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
public static <K, VLeft, VRight> Joined<K, VLeft, VRight> with(final Serde<K> keySerde,
final Serde<VLeft> leftValueSerde,
final Serde<VRight> rightValueSerde,
final String name,
final Duration gracePeriod) {
return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, gracePeriod);
}
/**
* Create an instance of {@code Joined} with a key {@link Serde}.
* {@code null} values are accepted and will be replaced by the default key serde as defined in config.
*
* @param keySerde the key serde to use. If {@code null} the default key serde from config will be used
* @param <K> key type
* @param <V> value type
* @param <VO> other value type
* @param keySerde
* the key serde to use. If {@code null} the default key serde from config will be used
*
* @param <K> key type
* @param <VLeft> value value type
* @param <VRight> right value type
*
* @return new {@code Joined} instance configured with the keySerde
*/
public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K> keySerde) {
public static <K, VLeft, VRight> Joined<K, VLeft, VRight> keySerde(final Serde<K> keySerde) {
return new Joined<>(keySerde, null, null, null, null);
}
@ -137,44 +154,51 @@ public class Joined<K, V, VO> implements NamedOperation<Joined<K, V, VO>> {
* Create an instance of {@code Joined} with a value {@link Serde}.
* {@code null} values are accepted and will be replaced by the default value serde as defined in config.
*
* @param valueSerde the value serde to use. If {@code null} the default value serde from config will be used
* @param <K> key type
* @param <V> value type
* @param <VO> other value type
* @param leftValueSerde
* the left value serde to use. If {@code null} the default value serde from config will be used
*
* @param <K> key type
* @param <VLeft> left value type
* @param <VRight> right value type
*
* @return new {@code Joined} instance configured with the valueSerde
*/
public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> valueSerde) {
return new Joined<>(null, valueSerde, null, null, null);
public static <K, VLeft, VRight> Joined<K, VLeft, VRight> valueSerde(final Serde<VLeft> leftValueSerde) {
return new Joined<>(null, leftValueSerde, null, null, null);
}
/**
* Create an instance of {@code Joined} with an other value {@link Serde}.
* Create an instance of {@code Joined} with aother value {@link Serde}.
* {@code null} values are accepted and will be replaced by the default value serde as defined in config.
*
* @param otherValueSerde the otherValue serde to use. If {@code null} the default value serde from config will be used
* @param <K> key type
* @param <V> value type
* @param <VO> other value type
* @param rightValueSerde
* the right value serde to use. If {@code null} the default value serde from config will be used
*
* @param <K> key type
* @param <VLeft> value type
* @param <VRight> right value type
*
* @return new {@code Joined} instance configured with the otherValueSerde
*/
public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<VO> otherValueSerde) {
return new Joined<>(null, null, otherValueSerde, null, null);
public static <K, VLeft, VRight> Joined<K, VLeft, VRight> otherValueSerde(final Serde<VRight> rightValueSerde) {
return new Joined<>(null, null, rightValueSerde, null, null);
}
/**
* Create an instance of {@code Joined} with base name for all components of the join, this may
* include any repartition topics created to complete the join.
*
* @param name the name used as the base for naming components of the join including any
* repartition topics
* @param <K> key type
* @param <V> value type
* @param <VO> other value type
* @return new {@code Joined} instance configured with the name
* @param name
* the name used as the base for naming components of the join including any repartition topics
*
* @param <K> key type
* @param <VLeft> left value type
* @param <VRight> right value type
*
* @return new {@code Joined} instance configured with the name
*/
public static <K, V, VO> Joined<K, V, VO> as(final String name) {
public static <K, VLeft, VRight> Joined<K, VLeft, VRight> as(final String name) {
return new Joined<>(null, null, null, name, null);
}
@ -182,46 +206,53 @@ public class Joined<K, V, VO> implements NamedOperation<Joined<K, V, VO>> {
* Set the key {@link Serde} to be used. Null values are accepted and will be replaced by the default
* key serde as defined in config
*
* @param keySerde the key serde to use. If null the default key serde from config will be used
* @param keySerde
* the key serde to use. If null the default key serde from config will be used
*
* @return new {@code Joined} instance configured with the {@code name}
*/
public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
public Joined<K, VLeft, VRight> withKeySerde(final Serde<K> keySerde) {
return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, gracePeriod);
}
/**
* Set the value {@link Serde} to be used. Null values are accepted and will be replaced by the default
* value serde as defined in config
*
* @param valueSerde the value serde to use. If null the default value serde from config will be used
* @param leftValueSerde
* the left value serde to use. If null the default value serde from config will be used
*
* @return new {@code Joined} instance configured with the {@code valueSerde}
*/
public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
public Joined<K, VLeft, VRight> withValueSerde(final Serde<VLeft> leftValueSerde) {
return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, gracePeriod);
}
/**
* Set the otherValue {@link Serde} to be used. Null values are accepted and will be replaced by the default
* value serde as defined in config
*
* @param otherValueSerde the otherValue serde to use. If null the default value serde from config will be used
* @param rightValueSerde
* the right value serde to use. If null the default value serde from config will be used
*
* @return new {@code Joined} instance configured with the {@code valueSerde}
*/
public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
public Joined<K, VLeft, VRight> withOtherValueSerde(final Serde<VRight> rightValueSerde) {
return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, gracePeriod);
}
/**
* Set the base name used for all components of the join, this may include any repartition topics
* created to complete the join.
*
* @param name the name used as the base for naming components of the join including any
* repartition topics
* @param name
* the name used as the base for naming components of the join including any repartition topics
*
* @return new {@code Joined} instance configured with the {@code name}
*/
@Override
public Joined<K, V, VO> withName(final String name) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
public Joined<K, VLeft, VRight> withName(final String name) {
return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, gracePeriod);
}
/**
@ -231,12 +262,13 @@ public class Joined<K, V, VO> implements NamedOperation<Joined<K, V, VO>> {
* result in a null join. Long gaps in stream side arriving records will cause
* records to be delayed in processing.
*
* @param gracePeriod
* the duration of the grace period. Must be less than the joining table's history retention.
*
* @param gracePeriod the duration of the grace period. Must be less than the joining table's history retention.
* @return new {@code Joined} instance configured with the gracePeriod
*/
public Joined<K, V, VO> withGracePeriod(final Duration gracePeriod) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
public Joined<K, VLeft, VRight> withGracePeriod(final Duration gracePeriod) {
return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, gracePeriod);
}
public Duration gracePeriod() {
@ -247,11 +279,11 @@ public class Joined<K, V, VO> implements NamedOperation<Joined<K, V, VO>> {
return keySerde;
}
public Serde<V> valueSerde() {
return valueSerde;
public Serde<VLeft> valueSerde() {
return leftValueSerde;
}
public Serde<VO> otherValueSerde() {
return otherValueSerde;
public Serde<VRight> otherValueSerde() {
return rightValueSerde;
}
}

View File

@ -19,22 +19,28 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Joined;
public class JoinedInternal<K, V, VO> extends Joined<K, V, VO> {
import java.time.Duration;
JoinedInternal(final Joined<K, V, VO> joined) {
public class JoinedInternal<K, VLeft, VRight> extends Joined<K, VLeft, VRight> {
JoinedInternal(final Joined<K, VLeft, VRight> joined) {
super(joined);
}
public Duration gracePeriod() {
return gracePeriod;
}
public Serde<K> keySerde() {
return keySerde;
}
public Serde<V> valueSerde() {
return valueSerde;
public Serde<VLeft> leftValueSerde() {
return leftValueSerde;
}
public Serde<VO> otherValueSerde() {
return otherValueSerde;
public Serde<VRight> rightValueSerde() {
return rightValueSerde;
}
public String name() {

View File

@ -1079,12 +1079,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
if (repartitionRequired) {
final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(
name != null ? name : this.name,
joined.keySerde(),
joined.valueSerde()
joinedInternal.keySerde(),
joinedInternal.leftValueSerde()
);
return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joined, false);
return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joinedInternal, false);
} else {
return doStreamTableJoin(table, joiner, joined, false);
return doStreamTableJoin(table, joiner, joinedInternal, false);
}
}
@ -1122,12 +1122,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
if (repartitionRequired) {
final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(
name != null ? name : this.name,
joined.keySerde(),
joined.valueSerde()
joinedInternal.keySerde(),
joinedInternal.leftValueSerde()
);
return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joined, true);
return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joinedInternal, true);
} else {
return doStreamTableJoin(table, joiner, joined, true);
return doStreamTableJoin(table, joiner, joinedInternal, true);
}
}
@ -1232,27 +1232,26 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@SuppressWarnings("unchecked")
private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
final Joined<K, V, VO> joined,
final JoinedInternal<K, V, VO> joinedInternal,
final boolean leftJoin) {
Objects.requireNonNull(table, "table can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VO>) table));
final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
final NamedInternal renamed = new NamedInternal(joinedInternal.name());
final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
Optional<String> bufferStoreName = Optional.empty();
if (joined.gracePeriod() != null) {
if (joinedInternal.gracePeriod() != null) {
if (!((KTableImpl<K, ?, VO>) table).graphNode.isOutputVersioned().orElse(true)) {
throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
}
bufferStoreName = Optional.of(name + "-Buffer");
final RocksDBTimeOrderedKeyValueBuffer.Builder<Object, Object> storeBuilder =
new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), joined.gracePeriod(), name);
new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), joinedInternal.gracePeriod(), name);
builder.addStateStore(new StoreBuilderWrapper(storeBuilder));
}
@ -1260,7 +1259,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
((KTableImpl<K, ?, VO>) table).valueGetterSupplier(),
joiner,
leftJoin,
Optional.ofNullable(joined.gracePeriod()),
Optional.ofNullable(joinedInternal.gracePeriod()),
bufferStoreName);
final ProcessorParameters<K, V, ?, ?> processorParameters = new ProcessorParameters<>(processorSupplier, name);
@ -1269,7 +1268,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
processorParameters,
((KTableImpl<K, ?, VO>) table).valueGetterSupplier().storeNames(),
this.name,
joined.gracePeriod(),
joinedInternal.gracePeriod(),
bufferStoreName
);
@ -1281,7 +1280,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
// do not have serde for joined result
return new KStreamImpl<>(
name,
joined.keySerde() != null ? joined.keySerde() : keySerde,
joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde,
null,
allSourceNodes,
false,