diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index ad866145a3a..d1c8623f298 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -2117,12 +2118,38 @@ public interface KTable { * @param the key type of the other {@code KTable} * @param the value type of the other {@code KTable} * @return a {@code KTable} that contains the result of joining this table with {@code other} + * + * @deprecated since 3.1, removal planned for 4.0. Use {@link #join(KTable, Function, ValueJoiner, TableJoined)} instead. */ + @Deprecated KTable join(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner, final Named named); + /** + * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join, + * using the {@link TableJoined} instance for optional configurations including + * {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning, + * and also the base name for components of the join. + *

+ * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}. + * + * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO. + * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the + * result is null, the update is ignored as invalid. + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores + * @param the value type of the result {@code KTable} + * @param the key type of the other {@code KTable} + * @param the value type of the other {@code KTable} + * @return a {@code KTable} that contains the result of joining this table with {@code other} + */ + KTable join(final KTable other, + final Function foreignKeyExtractor, + final ValueJoiner joiner, + final TableJoined tableJoined); + /** * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join. *

@@ -2160,13 +2187,42 @@ public interface KTable { * @param the key type of the other {@code KTable} * @param the value type of the other {@code KTable} * @return a {@code KTable} that contains the result of joining this table with {@code other} + * + * @deprecated since 3.1, removal planned for 4.0. Use {@link #join(KTable, Function, ValueJoiner, TableJoined, Materialized)} instead. */ + @Deprecated KTable join(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner, final Named named, final Materialized> materialized); + /** + * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join, + * using the {@link TableJoined} instance for optional configurations including + * {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning, + * and also the base name for components of the join. + *

+ * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}. + * + * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO. + * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the + * result is null, the update is ignored as invalid. + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores + * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} + * should be materialized. Cannot be {@code null} + * @param the value type of the result {@code KTable} + * @param the key type of the other {@code KTable} + * @param the value type of the other {@code KTable} + * @return a {@code KTable} that contains the result of joining this table with {@code other} + */ + KTable join(final KTable other, + final Function foreignKeyExtractor, + final ValueJoiner joiner, + final TableJoined tableJoined, + final Materialized> materialized); + /** * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join. *

@@ -2199,12 +2255,38 @@ public interface KTable { * @param the key type of the other {@code KTable} * @param the value type of the other {@code KTable} * @return a {@code KTable} that contains the result of joining this table with {@code other} + * + * @deprecated since 3.1, removal planned for 4.0. Use {@link #leftJoin(KTable, Function, ValueJoiner, TableJoined)} instead. */ + @Deprecated KTable leftJoin(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner, final Named named); + /** + * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join, + * using the {@link TableJoined} instance for optional configurations including + * {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning, + * and also the base name for components of the join. + *

+ * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}. + * + * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO. + * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the + * result is null, the update is ignored as invalid. + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores + * @param the value type of the result {@code KTable} + * @param the key type of the other {@code KTable} + * @param the value type of the other {@code KTable} + * @return a {@code KTable} that contains the result of joining this table with {@code other} + */ + KTable leftJoin(final KTable other, + final Function foreignKeyExtractor, + final ValueJoiner joiner, + final TableJoined tableJoined); + /** * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join. *

@@ -2242,13 +2324,42 @@ public interface KTable { * @param the key type of the other {@code KTable} * @param the value type of the other {@code KTable} * @return a {@code KTable} that contains the result of joining this table with {@code other} + * + * @deprecated since 3.1, removal planned for 4.0. Use {@link #leftJoin(KTable, Function, ValueJoiner, TableJoined, Materialized)} instead. */ + @Deprecated KTable leftJoin(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner, final Named named, final Materialized> materialized); + /** + * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join, + * using the {@link TableJoined} instance for optional configurations including + * {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning, + * and also the base name for components of the join. + *

+ * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}. + * + * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO. + * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the + * result is null, the update is ignored as invalid. + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores + * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} + * should be materialized. Cannot be {@code null} + * @param the value type of the result {@code KTable} + * @param the key type of the other {@code KTable} + * @param the value type of the other {@code KTable} + * @return a {@code KTable} that contains the result of joining this table with {@code other} + */ + KTable leftJoin(final KTable other, + final Function foreignKeyExtractor, + final ValueJoiner joiner, + final TableJoined tableJoined, + final Materialized> materialized); + /** * Get the name of the local state store used that can be used to query this {@code KTable}. * diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TableJoined.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TableJoined.java new file mode 100644 index 00000000000..70a3630fd07 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TableJoined.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.processor.StreamPartitioner; + +import java.util.function.Function; + +/** + * The {@code TableJoined} class represents optional parameters that can be passed to + * {@link KTable#join(KTable, Function, ValueJoiner, TableJoined) KTable#join(KTable,Function,...)} and + * {@link KTable#leftJoin(KTable, Function, ValueJoiner, TableJoined) KTable#leftJoin(KTable,Function,...)} + * operations, for foreign key joins. + * @param this key type ; key type for the left (primary) table + * @param other key type ; key type for the right (foreign key) table + */ +public class TableJoined implements NamedOperation> { + + protected final StreamPartitioner partitioner; + protected final StreamPartitioner otherPartitioner; + protected final String name; + + private TableJoined(final StreamPartitioner partitioner, + final StreamPartitioner otherPartitioner, + final String name) { + this.partitioner = partitioner; + this.otherPartitioner = otherPartitioner; + this.name = name; + } + + protected TableJoined(final TableJoined tableJoined) { + this(tableJoined.partitioner, tableJoined.otherPartitioner, tableJoined.name); + } + + /** + * Create an instance of {@code TableJoined} with partitioner and otherPartitioner {@link StreamPartitioner} instances. + * {@code null} values are accepted and will result in the default partitioner being used. + * + * @param partitioner a {@link StreamPartitioner} that captures the partitioning strategy for the left (primary) + * table of the foreign key join. 
Specifying this option does not repartition or otherwise + * affect the source table; rather, this option informs the foreign key join on how internal + * topics should be partitioned in order to be co-partitioned with the left join table. + * The partitioning strategy must depend only on the message key and not the message value, + * else the source table is not supported with foreign key joins. This option may be left + * {@code null} if the source table uses the default partitioner. + * @param otherPartitioner a {@link StreamPartitioner} that captures the partitioning strategy for the right (foreign + * key) table of the foreign key join. Specifying this option does not repartition or otherwise + * affect the source table; rather, this option informs the foreign key join on how internal + * topics should be partitioned in order to be co-partitioned with the right join table. + * The partitioning strategy must depend only on the message key and not the message value, + * else the source table is not supported with foreign key joins. This option may be left + * {@code null} if the source table uses the default partitioner. + * @param this key type ; key type for the left (primary) table + * @param other key type ; key type for the right (foreign key) table + * @return new {@code TableJoined} instance with the provided partitioners + */ + public static TableJoined with(final StreamPartitioner partitioner, + final StreamPartitioner otherPartitioner) { + return new TableJoined<>(partitioner, otherPartitioner, null); + } + + /** + * Create an instance of {@code TableJoined} with base name for all components of the join, including internal topics + * created to complete the join. 
+ * + * @param name the name used as the base for naming components of the join including internal topics + * @param this key type ; key type for the left (primary) table + * @param other key type ; key type for the right (foreign key) table + * @return new {@code TableJoined} instance configured with the {@code name} + * + */ + public static TableJoined as(final String name) { + return new TableJoined<>(null, null, name); + } + + /** + * Set the custom {@link StreamPartitioner} to be used as part of computing the join. + * {@code null} values are accepted and will result in the default partitioner being used. + * + * @param partitioner a {@link StreamPartitioner} that captures the partitioning strategy for the left (primary) + * table of the foreign key join. Specifying this option does not repartition or otherwise + * affect the source table; rather, this option informs the foreign key join on how internal + * topics should be partitioned in order to be co-partitioned with the left join table. + * The partitioning strategy must depend only on the message key and not the message value, + * else the source table is not supported with foreign key joins. This option may be left + * {@code null} if the source table uses the default partitioner. + * @return new {@code TableJoined} instance configured with the {@code partitioner} + */ + public TableJoined withPartitioner(final StreamPartitioner partitioner) { + return new TableJoined<>(partitioner, otherPartitioner, name); + } + + /** + * Set the custom other {@link StreamPartitioner} to be used as part of computing the join. + * {@code null} values are accepted and will result in the default partitioner being used. + * + * @param otherPartitioner a {@link StreamPartitioner} that captures the partitioning strategy for the right (foreign + * key) table of the foreign key join. 
Specifying this option does not repartition or otherwise + * affect the source table; rather, this option informs the foreign key join on how internal + * topics should be partitioned in order to be co-partitioned with the right join table. + * The partitioning strategy must depend only on the message key and not the message value, + * else the source table is not supported with foreign key joins. This option may be left + * {@code null} if the source table uses the default partitioner. + * @return new {@code TableJoined} instance configured with the {@code otherPartitioner} + */ + public TableJoined withOtherPartitioner(final StreamPartitioner otherPartitioner) { + return new TableJoined<>(partitioner, otherPartitioner, name); + } + + /** + * Set the base name used for all components of the join, including internal topics + * created to complete the join. + * + * @param name the name used as the base for naming components of the join including internal topics + * @return new {@code TableJoined} instance configured with the {@code name} + */ + @Override + public TableJoined withName(final String name) { + return new TableJoined<>(partitioner, otherPartitioner, name); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 6f73044e19e..8c2fb69a423 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Suppressed; +import org.apache.kafka.streams.kstream.TableJoined; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import 
org.apache.kafka.streams.kstream.ValueMapperWithKey; @@ -58,6 +59,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressi import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier; import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed; import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal; +import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.InternalTopicProperties; import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor; @@ -889,12 +891,13 @@ public class KTableImpl extends AbstractStream implements KTable< other, foreignKeyExtractor, joiner, - NamedInternal.empty(), + TableJoined.with(null, null), Materialized.with(null, null), false ); } + @SuppressWarnings("deprecation") @Override public KTable join(final KTable other, final Function foreignKeyExtractor, @@ -904,7 +907,22 @@ public class KTableImpl extends AbstractStream implements KTable< other, foreignKeyExtractor, joiner, - named, + TableJoined.as(new NamedInternal(named).name()), + Materialized.with(null, null), + false + ); + } + + @Override + public KTable join(final KTable other, + final Function foreignKeyExtractor, + final ValueJoiner joiner, + final TableJoined tableJoined) { + return doJoinOnForeignKey( + other, + foreignKeyExtractor, + joiner, + tableJoined, Materialized.with(null, null), false ); @@ -915,16 +933,40 @@ public class KTableImpl extends AbstractStream implements KTable< final Function foreignKeyExtractor, final ValueJoiner joiner, final Materialized> materialized) { - return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, NamedInternal.empty(), materialized, false); + return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), materialized, false); } + @SuppressWarnings("deprecation") @Override 
public KTable join(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner, final Named named, final Materialized> materialized) { - return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named, materialized, false); + return doJoinOnForeignKey( + other, + foreignKeyExtractor, + joiner, + TableJoined.as(new NamedInternal(named).name()), + materialized, + false + ); + } + + @Override + public KTable join(final KTable other, + final Function foreignKeyExtractor, + final ValueJoiner joiner, + final TableJoined tableJoined, + final Materialized> materialized) { + return doJoinOnForeignKey( + other, + foreignKeyExtractor, + joiner, + tableJoined, + materialized, + false + ); } @Override @@ -935,12 +977,13 @@ public class KTableImpl extends AbstractStream implements KTable< other, foreignKeyExtractor, joiner, - NamedInternal.empty(), + TableJoined.with(null, null), Materialized.with(null, null), true ); } + @SuppressWarnings("deprecation") @Override public KTable leftJoin(final KTable other, final Function foreignKeyExtractor, @@ -950,19 +993,56 @@ public class KTableImpl extends AbstractStream implements KTable< other, foreignKeyExtractor, joiner, - named, + TableJoined.as(new NamedInternal(named).name()), Materialized.with(null, null), true ); } + @Override + public KTable leftJoin(final KTable other, + final Function foreignKeyExtractor, + final ValueJoiner joiner, + final TableJoined tableJoined) { + return doJoinOnForeignKey( + other, + foreignKeyExtractor, + joiner, + tableJoined, + Materialized.with(null, null), + true + ); + } + + @SuppressWarnings("deprecation") @Override public KTable leftJoin(final KTable other, final Function foreignKeyExtractor, final ValueJoiner joiner, final Named named, final Materialized> materialized) { - return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named, materialized, true); + return doJoinOnForeignKey( + other, + foreignKeyExtractor, + joiner, + TableJoined.as(new 
NamedInternal(named).name()), + materialized, + true); + } + + @Override + public KTable leftJoin(final KTable other, + final Function foreignKeyExtractor, + final ValueJoiner joiner, + final TableJoined tableJoined, + final Materialized> materialized) { + return doJoinOnForeignKey( + other, + foreignKeyExtractor, + joiner, + tableJoined, + materialized, + true); } @Override @@ -970,20 +1050,20 @@ public class KTableImpl extends AbstractStream implements KTable< final Function foreignKeyExtractor, final ValueJoiner joiner, final Materialized> materialized) { - return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, NamedInternal.empty(), materialized, true); + return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), materialized, true); } @SuppressWarnings("unchecked") private KTable doJoinOnForeignKey(final KTable foreignKeyTable, final Function foreignKeyExtractor, final ValueJoiner joiner, - final Named joinName, + final TableJoined tableJoined, final Materialized> materialized, final boolean leftJoin) { Objects.requireNonNull(foreignKeyTable, "foreignKeyTable can't be null"); Objects.requireNonNull(foreignKeyExtractor, "foreignKeyExtractor can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); - Objects.requireNonNull(joinName, "joinName can't be null"); + Objects.requireNonNull(tableJoined, "tableJoined can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); //Old values are a useful optimization. The old values from the foreignKeyTable table are compared to the new values, @@ -995,7 +1075,8 @@ public class KTableImpl extends AbstractStream implements KTable< //This occurs whenever the extracted foreignKey changes values. 
enableSendingOldValues(true); - final NamedInternal renamed = new NamedInternal(joinName); + final TableJoinedInternal tableJoinedInternal = new TableJoinedInternal<>(tableJoined); + final NamedInternal renamed = new NamedInternal(tableJoinedInternal.name()); final String subscriptionTopicName = renamed.suffixWithOrElseGet( "-subscription-registration", @@ -1045,10 +1126,16 @@ public class KTableImpl extends AbstractStream implements KTable< builder.addGraphNode(graphNode, subscriptionNode); + final StreamPartitioner> subscriptionSinkPartitioner = + tableJoinedInternal.otherPartitioner() == null + ? null + : (topic, key, val, numPartitions) -> + tableJoinedInternal.otherPartitioner().partition(topic, key, null, numPartitions); + final StreamSinkNode> subscriptionSink = new StreamSinkNode<>( renamed.suffixWithOrElseGet("-subscription-registration-sink", builder, SINK_NAME), new StaticTopicNameExtractor<>(subscriptionTopicName), - new ProducedInternal<>(Produced.with(foreignKeySerde, subscriptionWrapperSerde)) + new ProducedInternal<>(Produced.with(foreignKeySerde, subscriptionWrapperSerde, subscriptionSinkPartitioner)) ); builder.addGraphNode(subscriptionNode, subscriptionSink); @@ -1115,11 +1202,17 @@ public class KTableImpl extends AbstractStream implements KTable< final String finalRepartitionTopicName = renamed.suffixWithOrElseGet("-subscription-response", builder, SUBSCRIPTION_RESPONSE) + TOPIC_SUFFIX; builder.internalTopologyBuilder.addInternalTopic(finalRepartitionTopicName, InternalTopicProperties.empty()); + final StreamPartitioner> foreignResponseSinkPartitioner = + tableJoinedInternal.partitioner() == null + ? 
null + : (topic, key, val, numPartitions) -> + tableJoinedInternal.partitioner().partition(topic, key, null, numPartitions); + final StreamSinkNode> foreignResponseSink = new StreamSinkNode<>( renamed.suffixWithOrElseGet("-subscription-response-sink", builder, SINK_NAME), new StaticTopicNameExtractor<>(finalRepartitionTopicName), - new ProducedInternal<>(Produced.with(keySerde, responseWrapperSerde)) + new ProducedInternal<>(Produced.with(keySerde, responseWrapperSerde, foreignResponseSinkPartitioner)) ); builder.addGraphNode(subscriptionJoinForeignNode, foreignResponseSink); builder.addGraphNode(foreignJoinSubscriptionNode, foreignResponseSink); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TableJoinedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TableJoinedInternal.java new file mode 100644 index 00000000000..fe165528814 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TableJoinedInternal.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.TableJoined; +import org.apache.kafka.streams.processor.StreamPartitioner; + +public class TableJoinedInternal extends TableJoined { + + TableJoinedInternal(final TableJoined tableJoined) { + super(tableJoined); + } + + public StreamPartitioner partitioner() { + return partitioner; + } + + public StreamPartitioner otherPartitioner() { + return otherPartitioner; + } + + public String name() { + return name; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java index 7b1ec66bf6d..f0114b166d6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java @@ -20,7 +20,6 @@ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; @@ -69,9 +68,6 @@ public class ForeignJoinSubscriptionProcessorSupplier implements org. 
store = internalProcessorContext.getStateStore(storeBuilder); } - /** - * @throws StreamsException if key is null - */ @Override public void process(final KO key, final Change value) { // if the key is null, we do not need proceed aggregating diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java new file mode 100644 index 00000000000..c83bbaee62d --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.integration; + +import static java.time.Duration.ofSeconds; +import static java.util.Arrays.asList; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Repartitioned; +import org.apache.kafka.streams.kstream.TableJoined; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.utils.UniqueTopicSerdeScope; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import 
org.junit.experimental.categories.Category; + +import kafka.utils.MockTime; + +@Category({IntegrationTest.class}) +public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest { + private final static int NUM_BROKERS = 1; + + public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + private final static MockTime MOCK_TIME = CLUSTER.time; + private final static String TABLE_1 = "table1"; + private final static String TABLE_2 = "table2"; + private final static String OUTPUT = "output-"; + private final Properties streamsConfig = getStreamsConfig(); + private final Properties streamsConfigTwo = getStreamsConfig(); + private final Properties streamsConfigThree = getStreamsConfig(); + private KafkaStreams streams; + private KafkaStreams streamsTwo; + private KafkaStreams streamsThree; + private final static Properties CONSUMER_CONFIG = new Properties(); + + private final static Properties PRODUCER_CONFIG_1 = new Properties(); + private final static Properties PRODUCER_CONFIG_2 = new Properties(); + + @BeforeClass + public static void startCluster() throws IOException, InterruptedException { + CLUSTER.start(); + //Use multiple partitions to ensure distribution of keys. 
+ + CLUSTER.createTopic(TABLE_1, 4, 1); + CLUSTER.createTopic(TABLE_2, 4, 1); + CLUSTER.createTopic(OUTPUT, 4, 1); + + PRODUCER_CONFIG_1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + PRODUCER_CONFIG_1.put(ProducerConfig.ACKS_CONFIG, "all"); + PRODUCER_CONFIG_1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + PRODUCER_CONFIG_1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + PRODUCER_CONFIG_2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + PRODUCER_CONFIG_2.put(ProducerConfig.ACKS_CONFIG, "all"); + PRODUCER_CONFIG_2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + PRODUCER_CONFIG_2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + final List> table1 = asList( + new KeyValue<>("ID123-1", "ID123-A1"), + new KeyValue<>("ID123-2", "ID123-A2"), + new KeyValue<>("ID123-3", "ID123-A3"), + new KeyValue<>("ID123-4", "ID123-A4") + ); + + final List> table2 = asList( + new KeyValue<>("ID123", "BBB") + ); + + IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, PRODUCER_CONFIG_1, MOCK_TIME); + IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, PRODUCER_CONFIG_2, MOCK_TIME); + + CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer"); + CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + } + + @AfterClass + public static void closeCluster() { + CLUSTER.stop(); + } + + @Before + public void before() throws IOException { + final String stateDirBasePath = TestUtils.tempDirectory().getPath(); + streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-1"); + streamsConfigTwo.put(StreamsConfig.STATE_DIR_CONFIG, 
stateDirBasePath + "-2"); + streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-3"); + } + + @After + public void after() throws IOException { + if (streams != null) { + streams.close(); + streams = null; + } + if (streamsTwo != null) { + streamsTwo.close(); + streamsTwo = null; + } + if (streamsThree != null) { + streamsThree.close(); + streamsThree = null; + } + IntegrationTestUtils.purgeLocalStreamsState(asList(streamsConfig, streamsConfigTwo, streamsConfigThree)); + } + + @Test + public void shouldInnerJoinMultiPartitionQueryable() throws Exception { + final Set> expectedOne = new HashSet<>(); + expectedOne.add(new KeyValue<>("ID123-1", "value1=ID123-A1,value2=BBB")); + expectedOne.add(new KeyValue<>("ID123-2", "value1=ID123-A2,value2=BBB")); + expectedOne.add(new KeyValue<>("ID123-3", "value1=ID123-A3,value2=BBB")); + expectedOne.add(new KeyValue<>("ID123-4", "value1=ID123-A4,value2=BBB")); + + verifyKTableKTableJoin(expectedOne); + } + + private void verifyKTableKTableJoin(final Set> expectedResult) throws Exception { + final String innerJoinType = "INNER"; + final String queryableName = innerJoinType + "-store1"; + + streams = prepareTopology(queryableName, streamsConfig); + streamsTwo = prepareTopology(queryableName, streamsConfigTwo); + streamsThree = prepareTopology(queryableName, streamsConfigThree); + + final List kafkaStreamsList = asList(streams, streamsTwo, streamsThree); + startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120)); + + final Set> result = new HashSet<>(waitUntilMinKeyValueRecordsReceived( + CONSUMER_CONFIG, + OUTPUT, + expectedResult.size())); + + assertEquals(expectedResult, result); + } + + private static Properties getStreamsConfig() { + final Properties streamsConfig = new Properties(); + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Partitioner"); + streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + 
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); + + return streamsConfig; + } + + private static KafkaStreams prepareTopology(final String queryableName, final Properties streamsConfig) { + + final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope(); + final StreamsBuilder builder = new StreamsBuilder(); + + final KTable table1 = builder.stream(TABLE_1, + Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))) + .repartition(repartitionA()) + .toTable(Named.as("table.a")); + + final KTable table2 = builder + .stream(TABLE_2, + Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))) + .repartition(repartitionB()) + .toTable(Named.as("table.b")); + + final Materialized> materialized; + if (queryableName != null) { + materialized = Materialized.>as(queryableName) + .withKeySerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true)) + .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) + .withCachingDisabled(); + } else { + throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store"); + } + + final ValueJoiner joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2; + + final TableJoined tableJoined = TableJoined.with( + (topic, key, value, numPartitions) -> Math.abs(getKeyB(key).hashCode()) % numPartitions, + (topic, key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions + ); + + table1.join(table2, KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest::getKeyB, joiner, tableJoined, materialized) + .toStream() + .to(OUTPUT, + Produced.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), + 
serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))); + + return new KafkaStreams(builder.build(streamsConfig), streamsConfig); + } + + private static Repartitioned repartitionA() { + final Repartitioned repartitioned = Repartitioned.as("a"); + return repartitioned.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) + .withStreamPartitioner((topic, key, value, numPartitions) -> Math.abs(getKeyB(key).hashCode()) % numPartitions) + .withNumberOfPartitions(4); + } + + private static Repartitioned repartitionB() { + final Repartitioned repartitioned = Repartitioned.as("b"); + return repartitioned.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) + .withStreamPartitioner((topic, key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions) + .withNumberOfPartitions(4); + } + + private static String getKeyB(final String value) { + return value.substring(0, value.indexOf("-")); + } + +} \ No newline at end of file diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala index 892f39eac76..3a405b68a77 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala @@ -18,7 +18,7 @@ package org.apache.kafka.streams.scala package kstream import org.apache.kafka.common.utils.Bytes -import org.apache.kafka.streams.kstream.{ValueJoiner, ValueTransformerWithKeySupplier, KTable => KTableJ} +import org.apache.kafka.streams.kstream.{TableJoined, ValueJoiner, ValueTransformerWithKeySupplier, KTable => KTableJ} import org.apache.kafka.streams.scala.FunctionsCompatConversions.{ FunctionFromFunction, KeyValueMapperFromFunction, @@ -656,6 +656,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @return a [[KTable]] that contains join-records for each key and values computed by 
the given joiner,
    *         one for each matched record-pair with the same key
    */
+  @deprecated("Use join(KTable, Function, ValueJoiner, TableJoined, Materialized) instead", since = "3.1")
   def join[VR, KO, VO](
     other: KTable[KO, VO],
     keyExtractor: Function[V, KO],
@@ -665,6 +666,28 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
   ): KTable[K, VR] =
     new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, named, materialized))
 
+  /**
+   * Join records of this [[KTable]] with another [[KTable]]'s records using a non-windowed foreign-key inner join.
+   * A record of this table is joined with the record of `other` whose key equals the result of keyExtractor.
+   *
+   * @param other        the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor
+   * @param keyExtractor a function that extracts the foreign key from this table's value
+   * @param joiner       a function that computes the join result for a pair of matching records
+   * @param tableJoined  a [[TableJoined]] used to configure partitioners and names of internal topics and stores
+   * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
+   *                     should be materialized.
+   * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
+   *         one for each matched record-pair with the same key
+   */
+  def join[VR, KO, VO](
+    other: KTable[KO, VO],
+    keyExtractor: Function[V, KO],
+    joiner: ValueJoiner[V, VO, VR],
+    tableJoined: TableJoined[K, KO],
+    materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
+  ): KTable[K, VR] =
+    new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, tableJoined, materialized))
+
   /**
    * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left join. Records from this
    * table are joined according to the result of keyExtractor on the other KTable.
@@ -698,6 +721,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    *         one for each matched record-pair with the same key
    */
+  @deprecated("Use leftJoin(KTable, Function, ValueJoiner, TableJoined, Materialized) instead", since = "3.1")
   def leftJoin[VR, KO, VO](
     other: KTable[KO, VO],
     keyExtractor: Function[V, KO],
@@ -707,6 +731,28 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
   ): KTable[K, VR] =
     new KTable(inner.leftJoin(other.inner, keyExtractor.asJavaFunction, joiner, named, materialized))
 
+  /**
+   * Join records of this [[KTable]] with another [[KTable]]'s records using a non-windowed foreign-key left join.
+   * A record of this table is joined with the record of `other` whose key equals the result of keyExtractor.
+   *
+   * @param other        the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor
+   * @param keyExtractor a function that extracts the foreign key from this table's value
+   * @param joiner       a function that computes the join result for a pair of matching records
+   * @param tableJoined  a [[TableJoined]] used to configure partitioners and names of internal topics and stores
+   * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
+   *                     should be materialized.
+   * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
+   *         one for each matched record-pair with the same key
+   */
+  def leftJoin[VR, KO, VO](
+    other: KTable[KO, VO],
+    keyExtractor: Function[V, KO],
+    joiner: ValueJoiner[V, VO, VR],
+    tableJoined: TableJoined[K, KO],
+    materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
+  ): KTable[K, VR] =
+    new KTable(inner.leftJoin(other.inner, keyExtractor.asJavaFunction, joiner, tableJoined, materialized))
+
   /**
    * Get the name of the local state store used that can be used to query this [[KTable]].
* diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index 937951fc00f..c541650649e 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -35,7 +35,7 @@ import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.kstream.TableJoined; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; @@ -413,7 +413,7 @@ public abstract class TopologyTestDriverTest { .count(Materialized.as(firstTableName)); builder.table(SOURCE_TOPIC_2, Materialized.as(secondTableName)) - .join(t1, v -> v, (v1, v2) -> v2, Named.as(joinName)); + .join(t1, v -> v, (v1, v2) -> v2, TableJoined.as(joinName)); return builder.build(config); }