From 1e14b0c964091121a5116cae18d9494a5d07bb5d Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 21 Aug 2024 14:31:22 -0700 Subject: [PATCH] MINOR: Cleanup Joined class (#14551) Code cleanup and JavaDocs fixed, plus add missing getters to JoinedInternal. Reviewers: Lucas Brutschy --- .../apache/kafka/streams/kstream/Joined.java | 224 ++++++++++-------- .../kstream/internals/JoinedInternal.java | 18 +- .../kstream/internals/KStreamImpl.java | 29 ++- 3 files changed, 154 insertions(+), 117 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java index 55dff2428f8..2978f943f31 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java @@ -24,47 +24,56 @@ import java.time.Duration; * The {@code Joined} class represents optional params that can be passed to * {@link KStream#join(KTable, ValueJoiner, Joined) KStream#join(KTable,...)} and * {@link KStream#leftJoin(KTable, ValueJoiner) KStream#leftJoin(KTable,...)} operations. 
+ * + * @param type of record key + * @param type of left record value + * @param type of right record value */ -public class Joined implements NamedOperation> { +public class Joined implements NamedOperation> { protected final Serde keySerde; - protected final Serde valueSerde; - protected final Serde otherValueSerde; + protected final Serde leftValueSerde; + protected final Serde rightValueSerde; protected final String name; protected final Duration gracePeriod; private Joined(final Serde keySerde, - final Serde valueSerde, - final Serde otherValueSerde, + final Serde leftValueSerde, + final Serde rightValueSerde, final String name, final Duration gracePeriod) { this.keySerde = keySerde; - this.valueSerde = valueSerde; - this.otherValueSerde = otherValueSerde; + this.leftValueSerde = leftValueSerde; + this.rightValueSerde = rightValueSerde; this.name = name; this.gracePeriod = gracePeriod; } - protected Joined(final Joined joined) { - this(joined.keySerde, joined.valueSerde, joined.otherValueSerde, joined.name, joined.gracePeriod); + protected Joined(final Joined joined) { + this(joined.keySerde, joined.leftValueSerde, joined.rightValueSerde, joined.name, joined.gracePeriod); } /** * Create an instance of {@code Joined} with key, value, and otherValue {@link Serde} instances. * {@code null} values are accepted and will be replaced by the default serdes as defined in config. * - * @param keySerde the key serde to use. If {@code null} the default key serde from config will be used - * @param valueSerde the value serde to use. If {@code null} the default value serde from config will be used - * @param otherValueSerde the otherValue serde to use. If {@code null} the default value serde from config will be used - * @param key type - * @param value type - * @param other value type + * @param keySerde + * the key serde to use. If {@code null} the default key serde from config will be used + * @param leftValueSerde + * the left value serde to use. 
If {@code null} the default value serde from config will be used + * @param rightValueSerde + * the right value serde to use. If {@code null} the default value serde from config will be used + * + * @param key type + * @param left value type + * @param right value type + * + * @return new {@code Joined} instance with the provided serdes */ - public static Joined with(final Serde keySerde, - final Serde valueSerde, - final Serde otherValueSerde) { - return new Joined<>(keySerde, valueSerde, otherValueSerde, null, null); + public static Joined with(final Serde keySerde, + final Serde leftValueSerde, + final Serde rightValueSerde) { + return new Joined<>(keySerde, leftValueSerde, rightValueSerde, null, null); } /** @@ -72,24 +81,26 @@ public class Joined implements NamedOperation> { * {@code null} values are accepted and will be replaced by the default serdes as defined in * config. * - * @param keySerde the key serde to use. If {@code null} the default key serde from config will be - * used - * @param valueSerde the value serde to use. If {@code null} the default value serde from config - * will be used - * @param otherValueSerde the otherValue serde to use. If {@code null} the default value serde - * from config will be used - * @param name the name used as the base for naming components of the join including any - * repartition topics + * @param keySerde + * the key serde to use. If {@code null} the default key serde from config will be used + * @param leftValueSerde + * the left value serde to use. If {@code null} the default value serde from config will be used + * @param rightValueSerde + * the right value serde to use. 
If {@code null} the default value serde from config will be used + * @param name + * the name used as the base for naming components of the join including any repartition topics + * * @param key type - * @param value type - * @param other value type + * @param left value type + * @param right value type + * * @return new {@code Joined} instance with the provided serdes */ - public static Joined with(final Serde keySerde, - final Serde valueSerde, - final Serde otherValueSerde, - final String name) { - return new Joined<>(keySerde, valueSerde, otherValueSerde, name, null); + public static Joined with(final Serde keySerde, + final Serde leftValueSerde, + final Serde rightValueSerde, + final String name) { + return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, null); } /** @@ -97,39 +108,45 @@ public class Joined implements NamedOperation> { * {@code null} values are accepted and will be replaced by the default serdes as defined in * config. * - * @param keySerde the key serde to use. If {@code null} the default key serde from config will be - * used - * @param valueSerde the value serde to use. If {@code null} the default value serde from config - * will be used - * @param otherValueSerde the otherValue serde to use. If {@code null} the default value serde - * from config will be used - * @param name the name used as the base for naming components of the join including any - * repartition topics - * @param gracePeriod stream buffer time + * @param keySerde + * the key serde to use. If {@code null} the default key serde from config will be used + * @param leftValueSerde + * the left value serde to use. If {@code null} the default value serde from config will be used + * @param rightValueSerde + * the right value serde to use. 
If {@code null} the default value serde from config will be used + * @param name + * the name used as the base for naming components of the join including any repartition topics + * @param gracePeriod + * stream buffer time + * * @param key type - * @param value type - * @param other value type + * @param left value type + * @param right value type + * * @return new {@code Joined} instance with the provided serdes */ - public static Joined with(final Serde keySerde, - final Serde valueSerde, - final Serde otherValueSerde, - final String name, - final Duration gracePeriod) { - return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod); + public static Joined with(final Serde keySerde, + final Serde leftValueSerde, + final Serde rightValueSerde, + final String name, + final Duration gracePeriod) { + return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, gracePeriod); } /** * Create an instance of {@code Joined} with a key {@link Serde}. * {@code null} values are accepted and will be replaced by the default key serde as defined in config. * - * @param keySerde the key serde to use. If {@code null} the default key serde from config will be used - * @param key type - * @param value type - * @param other value type + * @param keySerde + * the key serde to use. If {@code null} the default key serde from config will be used + * + * @param key type + * @param left value type + * @param right value type + * + * @return new {@code Joined} instance configured with the keySerde */ - public static Joined keySerde(final Serde keySerde) { + public static Joined keySerde(final Serde keySerde) { return new Joined<>(keySerde, null, null, null, null); } @@ -137,44 +154,51 @@ public class Joined implements NamedOperation> { * Create an instance of {@code Joined} with a value {@link Serde}. * {@code null} values are accepted and will be replaced by the default value serde as defined in config. * - * @param valueSerde the value serde to use. 
If {@code null} the default value serde from config will be used - * @param key type - * @param value type - * @param other value type + * @param leftValueSerde + * the left value serde to use. If {@code null} the default value serde from config will be used + * + * @param key type + * @param left value type + * @param right value type + * + * @return new {@code Joined} instance configured with the valueSerde */ - public static Joined valueSerde(final Serde valueSerde) { - return new Joined<>(null, valueSerde, null, null, null); + public static Joined valueSerde(final Serde leftValueSerde) { + return new Joined<>(null, leftValueSerde, null, null, null); } /** - * Create an instance of {@code Joined} with an other value {@link Serde}. + * Create an instance of {@code Joined} with an other value {@link Serde}. * {@code null} values are accepted and will be replaced by the default value serde as defined in config. * - * @param otherValueSerde the otherValue serde to use. If {@code null} the default value serde from config will be used - * @param key type - * @param value type - * @param other value type + * @param rightValueSerde + * the right value serde to use. If {@code null} the default value serde from config will be used + * + * @param key type + * @param left value type + * @param right value type + * + * @return new {@code Joined} instance configured with the otherValueSerde */ - public static Joined otherValueSerde(final Serde otherValueSerde) { - return new Joined<>(null, null, otherValueSerde, null, null); + public static Joined otherValueSerde(final Serde rightValueSerde) { + return new Joined<>(null, null, rightValueSerde, null, null); } /** * Create an instance of {@code Joined} with base name for all components of the join, this may * include any repartition topics created to complete the join. 
* - * @param name the name used as the base for naming components of the join including any - * repartition topics - * @param key type - * @param value type - * @param other value type - * @return new {@code Joined} instance configured with the name + * @param name + * the name used as the base for naming components of the join including any repartition topics * + * @param key type + * @param left value type + * @param right value type + * + * @return new {@code Joined} instance configured with the name */ - public static Joined as(final String name) { + public static Joined as(final String name) { return new Joined<>(null, null, null, name, null); } @@ -182,46 +206,53 @@ public class Joined implements NamedOperation> { * Set the key {@link Serde} to be used. Null values are accepted and will be replaced by the default * key serde as defined in config * - * @param keySerde the key serde to use. If null the default key serde from config will be used + * @param keySerde + * the key serde to use. If null the default key serde from config will be used + * * @return new {@code Joined} instance configured with the {@code name} */ - public Joined withKeySerde(final Serde keySerde) { - return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod); + public Joined withKeySerde(final Serde keySerde) { + return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, gracePeriod); } /** * Set the value {@link Serde} to be used. Null values are accepted and will be replaced by the default * value serde as defined in config * - * @param valueSerde the value serde to use. If null the default value serde from config will be used + * @param leftValueSerde + * the left value serde to use. 
If null the default value serde from config will be used + * * @return new {@code Joined} instance configured with the {@code valueSerde} */ - public Joined withValueSerde(final Serde valueSerde) { - return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod); + public Joined withValueSerde(final Serde leftValueSerde) { + return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, gracePeriod); } /** * Set the otherValue {@link Serde} to be used. Null values are accepted and will be replaced by the default * value serde as defined in config * - * @param otherValueSerde the otherValue serde to use. If null the default value serde from config will be used + * @param rightValueSerde + * the right value serde to use. If null the default value serde from config will be used + * * @return new {@code Joined} instance configured with the {@code valueSerde} */ - public Joined withOtherValueSerde(final Serde otherValueSerde) { - return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod); + public Joined withOtherValueSerde(final Serde rightValueSerde) { + return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, gracePeriod); } /** * Set the base name used for all components of the join, this may include any repartition topics * created to complete the join. 
* - * @param name the name used as the base for naming components of the join including any - * repartition topics + * @param name + * the name used as the base for naming components of the join including any repartition topics + * * @return new {@code Joined} instance configured with the {@code name} */ @Override - public Joined withName(final String name) { - return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod); + public Joined withName(final String name) { + return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, gracePeriod); } /** @@ -231,12 +262,13 @@ public class Joined implements NamedOperation> { * result in a null join. Long gaps in stream side arriving records will cause * records to be delayed in processing. * + * @param gracePeriod + * the duration of the grace period. Must be less than the joining table's history retention. * - * @param gracePeriod the duration of the grace period. Must be less than the joining table's history retention. 
* @return new {@code Joined} instance configured with the gracePeriod */ - public Joined withGracePeriod(final Duration gracePeriod) { - return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod); + public Joined withGracePeriod(final Duration gracePeriod) { + return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, gracePeriod); } public Duration gracePeriod() { @@ -247,11 +279,11 @@ public class Joined implements NamedOperation> { return keySerde; } - public Serde valueSerde() { - return valueSerde; + public Serde valueSerde() { + return leftValueSerde; } - public Serde otherValueSerde() { - return otherValueSerde; + public Serde otherValueSerde() { + return rightValueSerde; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java index eb5884042cd..e01adb5ae4f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java @@ -19,22 +19,28 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.Joined; -public class JoinedInternal extends Joined { +import java.time.Duration; - JoinedInternal(final Joined joined) { +public class JoinedInternal extends Joined { + + JoinedInternal(final Joined joined) { super(joined); } + public Duration gracePeriod() { + return gracePeriod; + } + public Serde keySerde() { return keySerde; } - public Serde valueSerde() { - return valueSerde; + public Serde leftValueSerde() { + return leftValueSerde; } - public Serde otherValueSerde() { - return otherValueSerde; + public Serde rightValueSerde() { + return rightValueSerde; } public String name() { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index e03ba1e19d1..556ff94c95a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -1079,12 +1079,12 @@ public class KStreamImpl extends AbstractStream implements KStream thisStreamRepartitioned = repartitionForJoin( name != null ? name : this.name, - joined.keySerde(), - joined.valueSerde() + joinedInternal.keySerde(), + joinedInternal.leftValueSerde() ); - return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joined, false); + return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joinedInternal, false); } else { - return doStreamTableJoin(table, joiner, joined, false); + return doStreamTableJoin(table, joiner, joinedInternal, false); } } @@ -1122,12 +1122,12 @@ public class KStreamImpl extends AbstractStream implements KStream thisStreamRepartitioned = repartitionForJoin( name != null ? 
name : this.name, - joined.keySerde(), - joined.valueSerde() + joinedInternal.keySerde(), + joinedInternal.leftValueSerde() ); - return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joined, true); + return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joinedInternal, true); } else { - return doStreamTableJoin(table, joiner, joined, true); + return doStreamTableJoin(table, joiner, joinedInternal, true); } } @@ -1232,27 +1232,26 @@ public class KStreamImpl extends AbstractStream implements KStream KStream doStreamTableJoin(final KTable table, final ValueJoinerWithKey joiner, - final Joined joined, + final JoinedInternal joinedInternal, final boolean leftJoin) { Objects.requireNonNull(table, "table can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); final Set allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream) table)); - final JoinedInternal joinedInternal = new JoinedInternal<>(joined); final NamedInternal renamed = new NamedInternal(joinedInternal.name()); final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? 
LEFTJOIN_NAME : JOIN_NAME); Optional bufferStoreName = Optional.empty(); - if (joined.gracePeriod() != null) { + if (joinedInternal.gracePeriod() != null) { if (!((KTableImpl) table).graphNode.isOutputVersioned().orElse(true)) { throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join."); } bufferStoreName = Optional.of(name + "-Buffer"); final RocksDBTimeOrderedKeyValueBuffer.Builder storeBuilder = - new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), joined.gracePeriod(), name); + new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), joinedInternal.gracePeriod(), name); builder.addStateStore(new StoreBuilderWrapper(storeBuilder)); } @@ -1260,7 +1259,7 @@ public class KStreamImpl extends AbstractStream implements KStream) table).valueGetterSupplier(), joiner, leftJoin, - Optional.ofNullable(joined.gracePeriod()), + Optional.ofNullable(joinedInternal.gracePeriod()), bufferStoreName); final ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name); @@ -1269,7 +1268,7 @@ public class KStreamImpl extends AbstractStream implements KStream) table).valueGetterSupplier().storeNames(), this.name, - joined.gracePeriod(), + joinedInternal.gracePeriod(), bufferStoreName ); @@ -1281,7 +1280,7 @@ public class KStreamImpl extends AbstractStream implements KStream( name, - joined.keySerde() != null ? joined.keySerde() : keySerde, + joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde, null, allSourceNodes, false,