KAFKA-13261: Add support for custom partitioners in foreign key joins (#11368)

Implements KIP-775.

Co-authored-by: Tomas Forsman <tomas-forsman@users.noreply.github.com>
This commit is contained in:
Victoria Xia 2021-11-03 10:55:24 -07:00 committed by GitHub
parent 22d056c9b7
commit 01e6a6ebf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 695 additions and 20 deletions

View File

@ -26,6 +26,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@ -2117,12 +2118,38 @@ public interface KTable<K, V> {
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*
* @deprecated since 3.1, removal planned for 4.0. Use {@link #join(KTable, Function, ValueJoiner, TableJoined)} instead.
*/
@Deprecated
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join,
* using the {@link TableJoined} instance for optional configurations including
* {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
* and also the base name for components of the join.
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* result is null, the update is ignored as invalid.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final TableJoined<K, KO> tableJoined);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
* <p>
@ -2160,13 +2187,42 @@ public interface KTable<K, V> {
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*
* @deprecated since 3.1, removal planned for 4.0. Use {@link #join(KTable, Function, ValueJoiner, TableJoined, Materialized)} instead.
*/
@Deprecated
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join,
* using the {@link TableJoined} instance for optional configurations including
* {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
* and also the base name for components of the join.
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* result is null, the update is ignored as invalid.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final TableJoined<K, KO> tableJoined,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
* <p>
@ -2199,12 +2255,38 @@ public interface KTable<K, V> {
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*
* @deprecated since 3.1, removal planned for 4.0. Use {@link #leftJoin(KTable, Function, ValueJoiner, TableJoined)} instead.
*/
@Deprecated
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join,
* using the {@link TableJoined} instance for optional configurations including
* {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
* and also the base name for components of the join.
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
* result is null, the update is ignored as invalid.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final TableJoined<K, KO> tableJoined);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
* <p>
@ -2242,13 +2324,42 @@ public interface KTable<K, V> {
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*
* @deprecated since 3.1, removal planned for 4.0. Use {@link #leftJoin(KTable, Function, ValueJoiner, TableJoined, Materialized)} instead.
*/
@Deprecated
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join,
* using the {@link TableJoined} instance for optional configurations including
* {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
* and also the base name for components of the join.
* <p>
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
* result is null, the update is ignored as invalid.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final TableJoined<K, KO> tableJoined,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Get the name of the local state store used that can be used to query this {@code KTable}.
*

View File

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.processor.StreamPartitioner;
import java.util.function.Function;
/**
* The {@code TableJoined} class represents optional parameters that can be passed to
* {@link KTable#join(KTable, Function, ValueJoiner, TableJoined) KTable#join(KTable,Function,...)} and
* {@link KTable#leftJoin(KTable, Function, ValueJoiner, TableJoined) KTable#leftJoin(KTable,Function,...)}
* operations, for foreign key joins.
* @param <K> this key type ; key type for the left (primary) table
* @param <KO> other key type ; key type for the right (foreign key) table
*/
public class TableJoined<K, KO> implements NamedOperation<TableJoined<K, KO>> {
protected final StreamPartitioner<K, Void> partitioner;
protected final StreamPartitioner<KO, Void> otherPartitioner;
protected final String name;
private TableJoined(final StreamPartitioner<K, Void> partitioner,
final StreamPartitioner<KO, Void> otherPartitioner,
final String name) {
this.partitioner = partitioner;
this.otherPartitioner = otherPartitioner;
this.name = name;
}
protected TableJoined(final TableJoined<K, KO> tableJoined) {
this(tableJoined.partitioner, tableJoined.otherPartitioner, tableJoined.name);
}
/**
* Create an instance of {@code TableJoined} with partitioner and otherPartitioner {@link StreamPartitioner} instances.
* {@code null} values are accepted and will result in the default partitioner being used.
*
* @param partitioner a {@link StreamPartitioner} that captures the partitioning strategy for the left (primary)
* table of the foreign key join. Specifying this option does not repartition or otherwise
* affect the source table; rather, this option informs the foreign key join on how internal
* topics should be partitioned in order to be co-partitioned with the left join table.
* The partitioning strategy must depend only on the message key and not the message value,
* else the source table is not supported with foreign key joins. This option may be left
* {@code null} if the source table uses the default partitioner.
* @param otherPartitioner a {@link StreamPartitioner} that captures the partitioning strategy for the right (foreign
* key) table of the foreign key join. Specifying this option does not repartition or otherwise
* affect the source table; rather, this option informs the foreign key join on how internal
* topics should be partitioned in order to be co-partitioned with the right join table.
* The partitioning strategy must depend only on the message key and not the message value,
* else the source table is not supported with foreign key joins. This option may be left
* {@code null} if the source table uses the default partitioner.
* @param <K> this key type ; key type for the left (primary) table
* @param <KO> other key type ; key type for the right (foreign key) table
* @return new {@code TableJoined} instance with the provided partitioners
*/
public static <K, KO> TableJoined<K, KO> with(final StreamPartitioner<K, Void> partitioner,
final StreamPartitioner<KO, Void> otherPartitioner) {
return new TableJoined<>(partitioner, otherPartitioner, null);
}
/**
* Create an instance of {@code TableJoined} with base name for all components of the join, including internal topics
* created to complete the join.
*
* @param name the name used as the base for naming components of the join including internal topics
* @param <K> this key type ; key type for the left (primary) table
* @param <KO> other key type ; key type for the right (foreign key) table
* @return new {@code TableJoined} instance configured with the {@code name}
*
*/
public static <K, KO> TableJoined<K, KO> as(final String name) {
return new TableJoined<>(null, null, name);
}
/**
* Set the custom {@link StreamPartitioner} to be used as part of computing the join.
* {@code null} values are accepted and will result in the default partitioner being used.
*
* @param partitioner a {@link StreamPartitioner} that captures the partitioning strategy for the left (primary)
* table of the foreign key join. Specifying this option does not repartition or otherwise
* affect the source table; rather, this option informs the foreign key join on how internal
* topics should be partitioned in order to be co-partitioned with the left join table.
* The partitioning strategy must depend only on the message key and not the message value,
* else the source table is not supported with foreign key joins. This option may be left
* {@code null} if the source table uses the default partitioner.
* @return new {@code TableJoined} instance configured with the {@code partitioner}
*/
public TableJoined<K, KO> withPartitioner(final StreamPartitioner<K, Void> partitioner) {
return new TableJoined<>(partitioner, otherPartitioner, name);
}
/**
* Set the custom other {@link StreamPartitioner} to be used as part of computing the join.
* {@code null} values are accepted and will result in the default partitioner being used.
*
* @param otherPartitioner a {@link StreamPartitioner} that captures the partitioning strategy for the right (foreign
* key) table of the foreign key join. Specifying this option does not repartition or otherwise
* affect the source table; rather, this option informs the foreign key join on how internal
* topics should be partitioned in order to be co-partitioned with the right join table.
* The partitioning strategy must depend only on the message key and not the message value,
* else the source table is not supported with foreign key joins. This option may be left
* {@code null} if the source table uses the default partitioner.
* @return new {@code TableJoined} instance configured with the {@code otherPartitioner}
*/
public TableJoined<K, KO> withOtherPartitioner(final StreamPartitioner<KO, Void> otherPartitioner) {
return new TableJoined<>(partitioner, otherPartitioner, name);
}
/**
* Set the base name used for all components of the join, including internal topics
* created to complete the join.
*
* @param name the name used as the base for naming components of the join including internal topics
* @return new {@code TableJoined} instance configured with the {@code name}
*/
@Override
public TableJoined<K, KO> withName(final String name) {
return new TableJoined<>(partitioner, otherPartitioner, name);
}
}

View File

@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
@ -58,6 +59,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressi
import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
@ -889,12 +891,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
other,
foreignKeyExtractor,
joiner,
NamedInternal.empty(),
TableJoined.with(null, null),
Materialized.with(null, null),
false
);
}
@SuppressWarnings("deprecation")
@Override
public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
@ -904,7 +907,22 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
other,
foreignKeyExtractor,
joiner,
named,
TableJoined.as(new NamedInternal(named).name()),
Materialized.with(null, null),
false
);
}
@Override
public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final TableJoined<K, KO> tableJoined) {
return doJoinOnForeignKey(
other,
foreignKeyExtractor,
joiner,
tableJoined,
Materialized.with(null, null),
false
);
@ -915,16 +933,40 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, NamedInternal.empty(), materialized, false);
return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), materialized, false);
}
@SuppressWarnings("deprecation")
@Override
public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named, materialized, false);
return doJoinOnForeignKey(
other,
foreignKeyExtractor,
joiner,
TableJoined.as(new NamedInternal(named).name()),
materialized,
false
);
}
@Override
public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final TableJoined<K, KO> tableJoined,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return doJoinOnForeignKey(
other,
foreignKeyExtractor,
joiner,
tableJoined,
materialized,
false
);
}
@Override
@ -935,12 +977,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
other,
foreignKeyExtractor,
joiner,
NamedInternal.empty(),
TableJoined.with(null, null),
Materialized.with(null, null),
true
);
}
@SuppressWarnings("deprecation")
@Override
public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
@ -950,19 +993,56 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
other,
foreignKeyExtractor,
joiner,
named,
TableJoined.as(new NamedInternal(named).name()),
Materialized.with(null, null),
true
);
}
@Override
public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final TableJoined<K, KO> tableJoined) {
return doJoinOnForeignKey(
other,
foreignKeyExtractor,
joiner,
tableJoined,
Materialized.with(null, null),
true
);
}
@SuppressWarnings("deprecation")
@Override
public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named, materialized, true);
return doJoinOnForeignKey(
other,
foreignKeyExtractor,
joiner,
TableJoined.as(new NamedInternal(named).name()),
materialized,
true);
}
@Override
public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final TableJoined<K, KO> tableJoined,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return doJoinOnForeignKey(
other,
foreignKeyExtractor,
joiner,
tableJoined,
materialized,
true);
}
@Override
@ -970,20 +1050,20 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, NamedInternal.empty(), materialized, true);
return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), materialized, true);
}
@SuppressWarnings("unchecked")
private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> foreignKeyTable,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named joinName,
final TableJoined<K, KO> tableJoined,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final boolean leftJoin) {
Objects.requireNonNull(foreignKeyTable, "foreignKeyTable can't be null");
Objects.requireNonNull(foreignKeyExtractor, "foreignKeyExtractor can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joinName, "joinName can't be null");
Objects.requireNonNull(tableJoined, "tableJoined can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
//Old values are a useful optimization. The old values from the foreignKeyTable table are compared to the new values,
@ -995,7 +1075,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
//This occurs whenever the extracted foreignKey changes values.
enableSendingOldValues(true);
final NamedInternal renamed = new NamedInternal(joinName);
final TableJoinedInternal<K, KO> tableJoinedInternal = new TableJoinedInternal<>(tableJoined);
final NamedInternal renamed = new NamedInternal(tableJoinedInternal.name());
final String subscriptionTopicName = renamed.suffixWithOrElseGet(
"-subscription-registration",
@ -1045,10 +1126,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
builder.addGraphNode(graphNode, subscriptionNode);
final StreamPartitioner<KO, SubscriptionWrapper<K>> subscriptionSinkPartitioner =
tableJoinedInternal.otherPartitioner() == null
? null
: (topic, key, val, numPartitions) ->
tableJoinedInternal.otherPartitioner().partition(topic, key, null, numPartitions);
final StreamSinkNode<KO, SubscriptionWrapper<K>> subscriptionSink = new StreamSinkNode<>(
renamed.suffixWithOrElseGet("-subscription-registration-sink", builder, SINK_NAME),
new StaticTopicNameExtractor<>(subscriptionTopicName),
new ProducedInternal<>(Produced.with(foreignKeySerde, subscriptionWrapperSerde))
new ProducedInternal<>(Produced.with(foreignKeySerde, subscriptionWrapperSerde, subscriptionSinkPartitioner))
);
builder.addGraphNode(subscriptionNode, subscriptionSink);
@ -1115,11 +1202,17 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final String finalRepartitionTopicName = renamed.suffixWithOrElseGet("-subscription-response", builder, SUBSCRIPTION_RESPONSE) + TOPIC_SUFFIX;
builder.internalTopologyBuilder.addInternalTopic(finalRepartitionTopicName, InternalTopicProperties.empty());
final StreamPartitioner<K, SubscriptionResponseWrapper<VO>> foreignResponseSinkPartitioner =
tableJoinedInternal.partitioner() == null
? null
: (topic, key, val, numPartitions) ->
tableJoinedInternal.partitioner().partition(topic, key, null, numPartitions);
final StreamSinkNode<K, SubscriptionResponseWrapper<VO>> foreignResponseSink =
new StreamSinkNode<>(
renamed.suffixWithOrElseGet("-subscription-response-sink", builder, SINK_NAME),
new StaticTopicNameExtractor<>(finalRepartitionTopicName),
new ProducedInternal<>(Produced.with(keySerde, responseWrapperSerde))
new ProducedInternal<>(Produced.with(keySerde, responseWrapperSerde, foreignResponseSinkPartitioner))
);
builder.addGraphNode(subscriptionJoinForeignNode, foreignResponseSink);
builder.addGraphNode(foreignJoinSubscriptionNode, foreignResponseSink);

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.processor.StreamPartitioner;
public class TableJoinedInternal<K, KO> extends TableJoined<K, KO> {
TableJoinedInternal(final TableJoined<K, KO> tableJoined) {
super(tableJoined);
}
public StreamPartitioner<K, Void> partitioner() {
return partitioner;
}
public StreamPartitioner<KO, Void> otherPartitioner() {
return otherPartitioner;
}
public String name() {
return name;
}
}

View File

@ -20,7 +20,6 @@ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
@ -69,9 +68,6 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements org.
store = internalProcessorContext.getStateStore(storeBuilder);
}
/**
* @throws StreamsException if key is null
*/
@Override
public void process(final KO key, final Change<VO> value) {
// if the key is null, we do not need proceed aggregating

View File

@ -0,0 +1,255 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
import static java.time.Duration.ofSeconds;
import static java.util.Arrays.asList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import kafka.utils.MockTime;
@Category({IntegrationTest.class})
public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
private final static int NUM_BROKERS = 1;
public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private final static MockTime MOCK_TIME = CLUSTER.time;
private final static String TABLE_1 = "table1";
private final static String TABLE_2 = "table2";
private final static String OUTPUT = "output-";
private final Properties streamsConfig = getStreamsConfig();
private final Properties streamsConfigTwo = getStreamsConfig();
private final Properties streamsConfigThree = getStreamsConfig();
private KafkaStreams streams;
private KafkaStreams streamsTwo;
private KafkaStreams streamsThree;
private final static Properties CONSUMER_CONFIG = new Properties();
private final static Properties PRODUCER_CONFIG_1 = new Properties();
private final static Properties PRODUCER_CONFIG_2 = new Properties();
@BeforeClass
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
//Use multiple partitions to ensure distribution of keys.
CLUSTER.createTopic(TABLE_1, 4, 1);
CLUSTER.createTopic(TABLE_2, 4, 1);
CLUSTER.createTopic(OUTPUT, 4, 1);
PRODUCER_CONFIG_1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
PRODUCER_CONFIG_1.put(ProducerConfig.ACKS_CONFIG, "all");
PRODUCER_CONFIG_1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
PRODUCER_CONFIG_1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
PRODUCER_CONFIG_2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
PRODUCER_CONFIG_2.put(ProducerConfig.ACKS_CONFIG, "all");
PRODUCER_CONFIG_2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
PRODUCER_CONFIG_2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
final List<KeyValue<String, String>> table1 = asList(
new KeyValue<>("ID123-1", "ID123-A1"),
new KeyValue<>("ID123-2", "ID123-A2"),
new KeyValue<>("ID123-3", "ID123-A3"),
new KeyValue<>("ID123-4", "ID123-A4")
);
final List<KeyValue<String, String>> table2 = asList(
new KeyValue<>("ID123", "BBB")
);
IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, PRODUCER_CONFIG_1, MOCK_TIME);
IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, PRODUCER_CONFIG_2, MOCK_TIME);
CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
}
@AfterClass
public static void closeCluster() {
CLUSTER.stop();
}
@Before
public void before() throws IOException {
final String stateDirBasePath = TestUtils.tempDirectory().getPath();
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-1");
streamsConfigTwo.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-2");
streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-3");
}
@After
public void after() throws IOException {
if (streams != null) {
streams.close();
streams = null;
}
if (streamsTwo != null) {
streamsTwo.close();
streamsTwo = null;
}
if (streamsThree != null) {
streamsThree.close();
streamsThree = null;
}
IntegrationTestUtils.purgeLocalStreamsState(asList(streamsConfig, streamsConfigTwo, streamsConfigThree));
}
@Test
public void shouldInnerJoinMultiPartitionQueryable() throws Exception {
final Set<KeyValue<String, String>> expectedOne = new HashSet<>();
expectedOne.add(new KeyValue<>("ID123-1", "value1=ID123-A1,value2=BBB"));
expectedOne.add(new KeyValue<>("ID123-2", "value1=ID123-A2,value2=BBB"));
expectedOne.add(new KeyValue<>("ID123-3", "value1=ID123-A3,value2=BBB"));
expectedOne.add(new KeyValue<>("ID123-4", "value1=ID123-A4,value2=BBB"));
verifyKTableKTableJoin(expectedOne);
}
private void verifyKTableKTableJoin(final Set<KeyValue<String, String>> expectedResult) throws Exception {
final String innerJoinType = "INNER";
final String queryableName = innerJoinType + "-store1";
streams = prepareTopology(queryableName, streamsConfig);
streamsTwo = prepareTopology(queryableName, streamsConfigTwo);
streamsThree = prepareTopology(queryableName, streamsConfigThree);
final List<KafkaStreams> kafkaStreamsList = asList(streams, streamsTwo, streamsThree);
startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
final Set<KeyValue<String, String>> result = new HashSet<>(waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expectedResult.size()));
assertEquals(expectedResult, result);
}
private static Properties getStreamsConfig() {
final Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Partitioner");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
return streamsConfig;
}
private static KafkaStreams prepareTopology(final String queryableName, final Properties streamsConfig) {
final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, String> table1 = builder.stream(TABLE_1,
Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)))
.repartition(repartitionA())
.toTable(Named.as("table.a"));
final KTable<String, String> table2 = builder
.stream(TABLE_2,
Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)))
.repartition(repartitionB())
.toTable(Named.as("table.b"));
final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized;
if (queryableName != null) {
materialized = Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(queryableName)
.withKeySerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true))
.withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
.withCachingDisabled();
} else {
throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store");
}
final ValueJoiner<String, String, String> joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2;
final TableJoined<String, String> tableJoined = TableJoined.with(
(topic, key, value, numPartitions) -> Math.abs(getKeyB(key).hashCode()) % numPartitions,
(topic, key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions
);
table1.join(table2, KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest::getKeyB, joiner, tableJoined, materialized)
.toStream()
.to(OUTPUT,
Produced.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)));
return new KafkaStreams(builder.build(streamsConfig), streamsConfig);
}
private static Repartitioned<String, String> repartitionA() {
final Repartitioned<String, String> repartitioned = Repartitioned.as("a");
return repartitioned.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
.withStreamPartitioner((topic, key, value, numPartitions) -> Math.abs(getKeyB(key).hashCode()) % numPartitions)
.withNumberOfPartitions(4);
}
private static Repartitioned<String, String> repartitionB() {
final Repartitioned<String, String> repartitioned = Repartitioned.as("b");
return repartitioned.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
.withStreamPartitioner((topic, key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions)
.withNumberOfPartitions(4);
}
private static String getKeyB(final String value) {
return value.substring(0, value.indexOf("-"));
}
}

View File

@ -18,7 +18,7 @@ package org.apache.kafka.streams.scala
package kstream
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{ValueJoiner, ValueTransformerWithKeySupplier, KTable => KTableJ}
import org.apache.kafka.streams.kstream.{TableJoined, ValueJoiner, ValueTransformerWithKeySupplier, KTable => KTableJ}
import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
FunctionFromFunction,
KeyValueMapperFromFunction,
@ -656,6 +656,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
* one for each matched record-pair with the same key
*/
@deprecated("Use join(KTable, Function, ValueJoiner, TableJoined, Materialized) instead", since = "3.1")
def join[VR, KO, VO](
other: KTable[KO, VO],
keyExtractor: Function[V, KO],
@ -665,6 +666,28 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
): KTable[K, VR] =
new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, named, materialized))
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner join. Records from this
* table are joined according to the result of keyExtractor on the other KTable.
*
* @param other the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor
* @param keyExtractor a function that extracts the foreign key from this table's value
* @param joiner a function that computes the join result for a pair of matching records
* @param tableJoined a [[TableJoined]] used to configure partitioners and names of internal topics and stores
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
* should be materialized.
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
* one for each matched record-pair with the same key
*/
def join[VR, KO, VO](
other: KTable[KO, VO],
keyExtractor: Function[V, KO],
joiner: ValueJoiner[V, VO, VR],
tableJoined: TableJoined[K, KO],
materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
): KTable[K, VR] =
new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, tableJoined, materialized))
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left join. Records from this
* table are joined according to the result of keyExtractor on the other KTable.
@ -698,6 +721,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
* one for each matched record-pair with the same key
*/
@deprecated("Use leftJoin(KTable, Function, ValueJoiner, TableJoined, Materialized) instead", since = "3.1")
def leftJoin[VR, KO, VO](
other: KTable[KO, VO],
keyExtractor: Function[V, KO],
@ -707,6 +731,28 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
): KTable[K, VR] =
new KTable(inner.leftJoin(other.inner, keyExtractor.asJavaFunction, joiner, named, materialized))
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left join. Records from this
* table are joined according to the result of keyExtractor on the other KTable.
*
* @param other the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor
* @param keyExtractor a function that extracts the foreign key from this table's value
* @param joiner a function that computes the join result for a pair of matching records
* @param tableJoined a [[TableJoined]] used to configure partitioners and names of internal topics and stores
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
* should be materialized.
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
* one for each matched record-pair with the same key
*/
def leftJoin[VR, KO, VO](
other: KTable[KO, VO],
keyExtractor: Function[V, KO],
joiner: ValueJoiner[V, VO, VR],
tableJoined: TableJoined[K, KO],
materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
): KTable[K, VR] =
new KTable(inner.leftJoin(other.inner, keyExtractor.asJavaFunction, joiner, tableJoined, materialized))
/**
* Get the name of the local state store used that can be used to query this [[KTable]].
*

View File

@ -35,7 +35,7 @@ import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
@ -413,7 +413,7 @@ public abstract class TopologyTestDriverTest {
.count(Materialized.as(firstTableName));
builder.table(SOURCE_TOPIC_2, Materialized.as(secondTableName))
.join(t1, v -> v, (v1, v2) -> v2, Named.as(joinName));
.join(t1, v -> v, (v1, v2) -> v2, TableJoined.as(joinName));
return builder.build(config);
}