mirror of https://github.com/apache/kafka.git
KAFKA-17893: Support record keys in the foreignKeyExtractor argument of KTable foreign join (#17756)
Currently, KTable foreign key joins only allow extracting the foreign key from the value of the source record. This forces users to duplicate data that might already exist in the key into the value when the foreign key needs to be derived from both the key and value. This leads to: - Data duplication - Additional storage overhead - Potential data inconsistency if the duplicated data gets out of sync - Less intuitive API when the foreign key is naturally derived from both key and value This change allows user to extract the foreign key from the key and value of the source record. Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
parent
a8cdbaf4b3
commit
c76fb5cb9b
|
@ -31,6 +31,7 @@ import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
|
|||
import org.apache.kafka.streams.state.KeyValueStore;
|
||||
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
|
||||
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
|
@ -2111,6 +2112,24 @@ public interface KTable<K, V> {
|
|||
final Function<V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner);
|
||||
|
||||
/**
|
||||
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
|
||||
* <p>
|
||||
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
|
||||
*
|
||||
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
|
||||
* @param foreignKeyExtractor a {@link BiFunction} that extracts the key (KO) from this table's key and value (K, V). If the
|
||||
* result is null, the update is ignored as invalid.
|
||||
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
|
||||
* @param <VR> the value type of the result {@code KTable}
|
||||
* @param <KO> the key type of the other {@code KTable}
|
||||
* @param <VO> the value type of the other {@code KTable}
|
||||
* @return a {@code KTable} that contains the result of joining this table with {@code other}
|
||||
*/
|
||||
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
|
||||
final BiFunction<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner);
|
||||
|
||||
/**
|
||||
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join,
|
||||
* using the {@link TableJoined} instance for optional configurations including
|
||||
|
@ -2134,6 +2153,28 @@ public interface KTable<K, V> {
|
|||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final TableJoined<K, KO> tableJoined);
|
||||
|
||||
/**
|
||||
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join,
|
||||
* using the {@link TableJoined} instance for optional configurations including
|
||||
* {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
|
||||
* and also the base name for components of the join.
|
||||
* <p>
|
||||
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
|
||||
*
|
||||
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
|
||||
* @param foreignKeyExtractor a {@link BiFunction} that extracts the key (KO) from this table's key and value (K, V). If the
|
||||
* result is null, the update is ignored as invalid.
|
||||
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
|
||||
* @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores
|
||||
* @param <VR> the value type of the result {@code KTable}
|
||||
* @param <KO> the key type of the other {@code KTable}
|
||||
* @param <VO> the value type of the other {@code KTable}
|
||||
* @return a {@code KTable} that contains the result of joining this table with {@code other}
|
||||
*/
|
||||
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
|
||||
final BiFunction<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final TableJoined<K, KO> tableJoined);
|
||||
/**
|
||||
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
|
||||
* <p>
|
||||
|
@ -2155,6 +2196,27 @@ public interface KTable<K, V> {
|
|||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
|
||||
|
||||
/**
|
||||
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
|
||||
* <p>
|
||||
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
|
||||
*
|
||||
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
|
||||
* @param foreignKeyExtractor a {@link BiFunction} that extracts the key (KO) from this table's key and value (K, V). If the
|
||||
* result is null, the update is ignored as invalid.
|
||||
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
|
||||
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
|
||||
* should be materialized. Cannot be {@code null}
|
||||
* @param <VR> the value type of the result {@code KTable}
|
||||
* @param <KO> the key type of the other {@code KTable}
|
||||
* @param <VO> the value type of the other {@code KTable}
|
||||
* @return a {@code KTable} that contains the result of joining this table with {@code other}
|
||||
*/
|
||||
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
|
||||
final BiFunction<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
|
||||
|
||||
/**
|
||||
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join,
|
||||
* using the {@link TableJoined} instance for optional configurations including
|
||||
|
@ -2181,6 +2243,32 @@ public interface KTable<K, V> {
|
|||
final TableJoined<K, KO> tableJoined,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
|
||||
|
||||
/**
|
||||
* Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join,
|
||||
* using the {@link TableJoined} instance for optional configurations including
|
||||
* {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
|
||||
* and also the base name for components of the join.
|
||||
* <p>
|
||||
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
|
||||
*
|
||||
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
|
||||
* @param foreignKeyExtractor a {@link BiFunction} that extracts the key (KO) from this table's key and value (K, V). If the
|
||||
* result is null, the update is ignored as invalid.
|
||||
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
|
||||
* @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores
|
||||
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
|
||||
* should be materialized. Cannot be {@code null}
|
||||
* @param <VR> the value type of the result {@code KTable}
|
||||
* @param <KO> the key type of the other {@code KTable}
|
||||
* @param <VO> the value type of the other {@code KTable}
|
||||
* @return a {@code KTable} that contains the result of joining this table with {@code other}
|
||||
*/
|
||||
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
|
||||
final BiFunction<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final TableJoined<K, KO> tableJoined,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
|
||||
|
||||
/**
|
||||
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
|
||||
* <p>
|
||||
|
@ -2199,6 +2287,24 @@ public interface KTable<K, V> {
|
|||
final Function<V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner);
|
||||
|
||||
/**
|
||||
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
|
||||
* <p>
|
||||
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
|
||||
*
|
||||
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
|
||||
* @param foreignKeyExtractor a {@link BiFunction} that extracts the key (KO) from this table's key and value (K, V). If the
|
||||
* extract is null, then the right hand side of the result will be null.
|
||||
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
|
||||
* @param <VR> the value type of the result {@code KTable}
|
||||
* @param <KO> the key type of the other {@code KTable}
|
||||
* @param <VO> the value type of the other {@code KTable}
|
||||
* @return a {@code KTable} that contains only those records that satisfy the given predicate
|
||||
*/
|
||||
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
|
||||
final BiFunction<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner);
|
||||
|
||||
/**
|
||||
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join,
|
||||
* using the {@link TableJoined} instance for optional configurations including
|
||||
|
@ -2221,6 +2327,28 @@ public interface KTable<K, V> {
|
|||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final TableJoined<K, KO> tableJoined);
|
||||
|
||||
/**
|
||||
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join,
|
||||
* using the {@link TableJoined} instance for optional configurations including
|
||||
* {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
|
||||
* and also the base name for components of the join.
|
||||
* <p>
|
||||
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
|
||||
*
|
||||
* @param foreignKeyExtractor a {@link BiFunction} that extracts the key (KO) from this table's key and value (K, V). If the
|
||||
* extract is null, then the right hand side of the result will be null.
|
||||
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
|
||||
* @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores
|
||||
* @param <VR> the value type of the result {@code KTable}
|
||||
* @param <KO> the key type of the other {@code KTable}
|
||||
* @param <VO> the value type of the other {@code KTable}
|
||||
* @return a {@code KTable} that contains the result of joining this table with {@code other}
|
||||
*/
|
||||
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
|
||||
final BiFunction<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final TableJoined<K, KO> tableJoined);
|
||||
|
||||
/**
|
||||
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
|
||||
* <p>
|
||||
|
@ -2242,6 +2370,27 @@ public interface KTable<K, V> {
|
|||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
|
||||
|
||||
/**
|
||||
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
|
||||
* <p>
|
||||
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
|
||||
*
|
||||
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
|
||||
* @param foreignKeyExtractor a {@link BiFunction} that extracts the key (KO) from this table's key and value (K, V). If the
|
||||
* extract is null, then the right hand side of the result will be null.
|
||||
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
|
||||
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
|
||||
* should be materialized. Cannot be {@code null}
|
||||
* @param <VR> the value type of the result {@code KTable}
|
||||
* @param <KO> the key type of the other {@code KTable}
|
||||
* @param <VO> the value type of the other {@code KTable}
|
||||
* @return a {@code KTable} that contains the result of joining this table with {@code other}
|
||||
*/
|
||||
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
|
||||
final BiFunction<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
|
||||
|
||||
/**
|
||||
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join,
|
||||
* using the {@link TableJoined} instance for optional configurations including
|
||||
|
@ -2268,6 +2417,32 @@ public interface KTable<K, V> {
|
|||
final TableJoined<K, KO> tableJoined,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
|
||||
|
||||
/**
|
||||
* Join records of this {@code KTable} with another {@code KTable} using non-windowed left join,
|
||||
* using the {@link TableJoined} instance for optional configurations including
|
||||
* {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
|
||||
* and also the base name for components of the join.
|
||||
* <p>
|
||||
* This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
|
||||
*
|
||||
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
|
||||
* @param foreignKeyExtractor a {@link BiFunction} that extracts the key (KO) from this table's key and value (K, V). If the
|
||||
* extract is null, then the right hand side of the result will be null.
|
||||
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
|
||||
* @param tableJoined a {@link TableJoined} used to configure partitioners and names of internal topics and stores
|
||||
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
|
||||
* should be materialized. Cannot be {@code null}
|
||||
* @param <VR> the value type of the result {@code KTable}
|
||||
* @param <KO> the key type of the other {@code KTable}
|
||||
* @param <VO> the value type of the other {@code KTable}
|
||||
* @return a {@code KTable} that contains the result of joining this table with {@code other}
|
||||
*/
|
||||
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
|
||||
final BiFunction<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final TableJoined<K, KO> tableJoined,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
|
||||
|
||||
/**
|
||||
* Get the name of the local state store used that can be used to query this {@code KTable}.
|
||||
*
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
|
|||
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
|
||||
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKey;
|
||||
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKeySchema;
|
||||
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignKeyExtractor;
|
||||
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignTableJoinProcessorSupplier;
|
||||
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ResponseJoinProcessorSupplier;
|
||||
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionJoinProcessorSupplier;
|
||||
|
@ -86,6 +87,7 @@ import java.util.Map;
|
|||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
@ -906,9 +908,25 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
|
|||
public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
|
||||
final Function<V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner) {
|
||||
final ForeignKeyExtractor<K, V, KO> adaptedExtractor = ForeignKeyExtractor.fromFunction(foreignKeyExtractor);
|
||||
return doJoinOnForeignKey(
|
||||
other,
|
||||
foreignKeyExtractor,
|
||||
adaptedExtractor,
|
||||
joiner,
|
||||
TableJoined.with(null, null),
|
||||
Materialized.with(null, null),
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
|
||||
final BiFunction<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner) {
|
||||
final ForeignKeyExtractor<K, V, KO> adaptedExtractor = ForeignKeyExtractor.fromBiFunction(foreignKeyExtractor);
|
||||
return doJoinOnForeignKey(
|
||||
other,
|
||||
adaptedExtractor,
|
||||
joiner,
|
||||
TableJoined.with(null, null),
|
||||
Materialized.with(null, null),
|
||||
|
@ -921,9 +939,26 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
|
|||
final Function<V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final TableJoined<K, KO> tableJoined) {
|
||||
final ForeignKeyExtractor<K, V, KO> adaptedExtractor = ForeignKeyExtractor.fromFunction(foreignKeyExtractor);
|
||||
return doJoinOnForeignKey(
|
||||
other,
|
||||
foreignKeyExtractor,
|
||||
adaptedExtractor,
|
||||
joiner,
|
||||
tableJoined,
|
||||
Materialized.with(null, null),
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
|
||||
final BiFunction<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final TableJoined<K, KO> tableJoined) {
|
||||
final ForeignKeyExtractor<K, V, KO> adaptedExtractor = ForeignKeyExtractor.fromBiFunction(foreignKeyExtractor);
|
||||
return doJoinOnForeignKey(
|
||||
other,
|
||||
adaptedExtractor,
|
||||
joiner,
|
||||
tableJoined,
|
||||
Materialized.with(null, null),
|
||||
|
@ -936,7 +971,17 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
|
|||
final Function<V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
|
||||
return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), materialized, false);
|
||||
final ForeignKeyExtractor<K, V, KO> adaptedExtractor = ForeignKeyExtractor.fromFunction(foreignKeyExtractor);
|
||||
return doJoinOnForeignKey(other, adaptedExtractor, joiner, TableJoined.with(null, null), materialized, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
|
||||
final BiFunction<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
|
||||
final ForeignKeyExtractor<K, V, KO> adaptedExtractor = ForeignKeyExtractor.fromBiFunction(foreignKeyExtractor);
|
||||
return doJoinOnForeignKey(other, adaptedExtractor, joiner, TableJoined.with(null, null), materialized, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -945,9 +990,27 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
|
|||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final TableJoined<K, KO> tableJoined,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
|
||||
final ForeignKeyExtractor<K, V, KO> adaptedExtractor = ForeignKeyExtractor.fromFunction(foreignKeyExtractor);
|
||||
return doJoinOnForeignKey(
|
||||
other,
|
||||
foreignKeyExtractor,
|
||||
adaptedExtractor,
|
||||
joiner,
|
||||
tableJoined,
|
||||
materialized,
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
|
||||
final BiFunction<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final TableJoined<K, KO> tableJoined,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
|
||||
final ForeignKeyExtractor<K, V, KO> adaptedExtractor = ForeignKeyExtractor.fromBiFunction(foreignKeyExtractor);
|
||||
return doJoinOnForeignKey(
|
||||
other,
|
||||
adaptedExtractor,
|
||||
joiner,
|
||||
tableJoined,
|
||||
materialized,
|
||||
|
@ -959,9 +1022,25 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
|
|||
public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
|
||||
final Function<V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner) {
|
||||
final ForeignKeyExtractor<K, V, KO> adaptedExtractor = ForeignKeyExtractor.fromFunction(foreignKeyExtractor);
|
||||
return doJoinOnForeignKey(
|
||||
other,
|
||||
foreignKeyExtractor,
|
||||
adaptedExtractor,
|
||||
joiner,
|
||||
TableJoined.with(null, null),
|
||||
Materialized.with(null, null),
|
||||
true
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
|
||||
final BiFunction<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner) {
|
||||
final ForeignKeyExtractor<K, V, KO> adaptedExtractor = ForeignKeyExtractor.fromBiFunction(foreignKeyExtractor);
|
||||
return doJoinOnForeignKey(
|
||||
other,
|
||||
adaptedExtractor,
|
||||
joiner,
|
||||
TableJoined.with(null, null),
|
||||
Materialized.with(null, null),
|
||||
|
@ -974,9 +1053,26 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
|
|||
final Function<V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final TableJoined<K, KO> tableJoined) {
|
||||
final ForeignKeyExtractor<K, V, KO> adaptedExtractor = ForeignKeyExtractor.fromFunction(foreignKeyExtractor);
|
||||
return doJoinOnForeignKey(
|
||||
other,
|
||||
foreignKeyExtractor,
|
||||
adaptedExtractor,
|
||||
joiner,
|
||||
tableJoined,
|
||||
Materialized.with(null, null),
|
||||
true
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
|
||||
final BiFunction<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final TableJoined<K, KO> tableJoined) {
|
||||
final ForeignKeyExtractor<K, V, KO> adaptedExtractor = ForeignKeyExtractor.fromBiFunction(foreignKeyExtractor);
|
||||
return doJoinOnForeignKey(
|
||||
other,
|
||||
adaptedExtractor,
|
||||
joiner,
|
||||
tableJoined,
|
||||
Materialized.with(null, null),
|
||||
|
@ -990,9 +1086,26 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
|
|||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final TableJoined<K, KO> tableJoined,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
|
||||
final ForeignKeyExtractor<K, V, KO> adaptedExtractor = ForeignKeyExtractor.fromFunction(foreignKeyExtractor);
|
||||
return doJoinOnForeignKey(
|
||||
other,
|
||||
foreignKeyExtractor,
|
||||
adaptedExtractor,
|
||||
joiner,
|
||||
tableJoined,
|
||||
materialized,
|
||||
true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
|
||||
final BiFunction<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final TableJoined<K, KO> tableJoined,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
|
||||
final ForeignKeyExtractor<K, V, KO> adaptedExtractor = ForeignKeyExtractor.fromBiFunction(foreignKeyExtractor);
|
||||
return doJoinOnForeignKey(
|
||||
other,
|
||||
adaptedExtractor,
|
||||
joiner,
|
||||
tableJoined,
|
||||
materialized,
|
||||
|
@ -1004,7 +1117,17 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
|
|||
final Function<V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
|
||||
return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), materialized, true);
|
||||
final ForeignKeyExtractor<K, V, KO> adaptedExtractor = ForeignKeyExtractor.fromFunction(foreignKeyExtractor);
|
||||
return doJoinOnForeignKey(other, adaptedExtractor, joiner, TableJoined.with(null, null), materialized, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
|
||||
final BiFunction<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
|
||||
final ForeignKeyExtractor<K, V, KO> adaptedExtractor = ForeignKeyExtractor.fromBiFunction(foreignKeyExtractor);
|
||||
return doJoinOnForeignKey(other, adaptedExtractor, joiner, TableJoined.with(null, null), materialized, true);
|
||||
}
|
||||
|
||||
private final Function<Optional<Set<Integer>>, Optional<Set<Integer>>> getPartition = maybeMulticastPartitions -> {
|
||||
|
@ -1020,7 +1143,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
|
|||
|
||||
@SuppressWarnings({"unchecked", "deprecation"})
|
||||
private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> foreignKeyTable,
|
||||
final Function<V, KO> foreignKeyExtractor,
|
||||
final ForeignKeyExtractor<K, V, KO> foreignKeyExtractor,
|
||||
final ValueJoiner<V, VO, VR> joiner,
|
||||
final TableJoined<K, KO> tableJoined,
|
||||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
|
||||
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* An interface for extracting foreign keys from input records during foreign key joins in Kafka Streams.
|
||||
* This extractor is used to determine the key of the foreign table to join with based on the primary
|
||||
* table's record key and value.
|
||||
* <p>
|
||||
* The interface provides two factory methods:
|
||||
* <ul>
|
||||
* <li>{@link #fromFunction(Function)} - when the foreign key depends only on the value</li>
|
||||
* <li>{@link #fromBiFunction(BiFunction)} - when the foreign key depends on both key and value</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param <K> Type of primary table's key
|
||||
* @param <V> Type of primary table's value
|
||||
* @param <KO> Type of the foreign key to extract
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface ForeignKeyExtractor<K, V, KO> {
|
||||
KO extract(K key, V value);
|
||||
|
||||
static <K, V, KO> ForeignKeyExtractor<K, V, KO> fromFunction(Function<V, KO> function) {
|
||||
return (key, value) -> function.apply(value);
|
||||
}
|
||||
|
||||
static <K, V, KO> ForeignKeyExtractor<K, V, KO> fromBiFunction(BiFunction<K, V, KO> biFunction) {
|
||||
return biFunction::apply;
|
||||
}
|
||||
}
|
|
@ -35,7 +35,6 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction;
|
||||
|
@ -47,7 +46,7 @@ import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.Subscrip
|
|||
public class SubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSupplier<K, Change<V>, KO, SubscriptionWrapper<K>> {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SubscriptionSendProcessorSupplier.class);
|
||||
|
||||
private final Function<V, KO> foreignKeyExtractor;
|
||||
private final ForeignKeyExtractor<K, V, KO> foreignKeyExtractor;
|
||||
private final Supplier<String> foreignKeySerdeTopicSupplier;
|
||||
private final Supplier<String> valueSerdeTopicSupplier;
|
||||
private final boolean leftJoin;
|
||||
|
@ -55,7 +54,7 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSup
|
|||
private Serializer<V> valueSerializer;
|
||||
private boolean useVersionedSemantics;
|
||||
|
||||
public SubscriptionSendProcessorSupplier(final Function<V, KO> foreignKeyExtractor,
|
||||
public SubscriptionSendProcessorSupplier(final ForeignKeyExtractor<K, V, KO> foreignKeyExtractor,
|
||||
final Supplier<String> foreignKeySerdeTopicSupplier,
|
||||
final Supplier<String> valueSerdeTopicSupplier,
|
||||
final Serde<KO> foreignKeySerde,
|
||||
|
@ -129,27 +128,27 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSup
|
|||
|
||||
private void leftJoinInstructions(final Record<K, Change<V>> record) {
|
||||
if (record.value().oldValue != null) {
|
||||
final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
|
||||
final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
|
||||
final KO oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue);
|
||||
final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
|
||||
if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
|
||||
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
|
||||
}
|
||||
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
|
||||
} else if (record.value().newValue != null) {
|
||||
final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
|
||||
final KO newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue);
|
||||
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
|
||||
}
|
||||
}
|
||||
|
||||
private void defaultJoinInstructions(final Record<K, Change<V>> record) {
|
||||
if (record.value().oldValue != null) {
|
||||
final KO oldForeignKey = record.value().oldValue == null ? null : foreignKeyExtractor.apply(record.value().oldValue);
|
||||
final KO oldForeignKey = record.value().oldValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().oldValue);
|
||||
if (oldForeignKey == null) {
|
||||
logSkippedRecordDueToNullForeignKey();
|
||||
return;
|
||||
}
|
||||
if (record.value().newValue != null) {
|
||||
final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
|
||||
final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
|
||||
if (newForeignKey == null) {
|
||||
logSkippedRecordDueToNullForeignKey();
|
||||
return;
|
||||
|
@ -167,7 +166,7 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSup
|
|||
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
|
||||
}
|
||||
} else if (record.value().newValue != null) {
|
||||
final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
|
||||
final KO newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue);
|
||||
if (newForeignKey == null) {
|
||||
logSkippedRecordDueToNullForeignKey();
|
||||
} else {
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.kafka.test.TestUtils;
|
|||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
@ -233,6 +234,137 @@ public class KTableKTableForeignKeyJoinScenarioTest {
|
|||
)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldWorkWithCompositeKeyAndProducerIdInValue() {
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
// Left table keyed by <producer_id, product_id>
|
||||
final KTable<String, String> leftTable = builder.table(
|
||||
"left_table",
|
||||
Consumed.with(Serdes.String(), Serdes.String())
|
||||
);
|
||||
|
||||
// Right table keyed by producer_id
|
||||
final KTable<String, String> rightTable = builder.table(
|
||||
"right_table",
|
||||
Consumed.with(Serdes.String(), Serdes.String())
|
||||
);
|
||||
|
||||
// Have to include producer_id in value since foreignKeyExtractor only gets value
|
||||
final KTable<String, String> joined = leftTable.join(
|
||||
rightTable,
|
||||
value -> value.split("\\|")[0], // extract producer_id from value
|
||||
(leftValue, rightValue) -> "(" + leftValue + "," + rightValue + ")",
|
||||
Materialized.as("store")
|
||||
);
|
||||
|
||||
joined.toStream().to("output");
|
||||
|
||||
try (final TopologyTestDriver driver = createTopologyTestDriver(builder)) {
|
||||
final TestInputTopic<String, String> leftInput = driver.createInputTopic(
|
||||
"left_table",
|
||||
new StringSerializer(),
|
||||
new StringSerializer()
|
||||
);
|
||||
final TestInputTopic<String, String> rightInput = driver.createInputTopic(
|
||||
"right_table",
|
||||
new StringSerializer(),
|
||||
new StringSerializer()
|
||||
);
|
||||
final TestOutputTopic<String, String> output = driver.createOutputTopic(
|
||||
"output",
|
||||
new StringDeserializer(),
|
||||
new StringDeserializer()
|
||||
);
|
||||
|
||||
// Key format: "producerId:productId"
|
||||
// Left value format: "producerId|productData"
|
||||
leftInput.pipeInput("producer1:product1", "producer1|product1-data");
|
||||
leftInput.pipeInput("producer1:product2", "producer1|product2-data");
|
||||
leftInput.pipeInput("producer2:product1", "producer2|product1-data");
|
||||
|
||||
rightInput.pipeInput("producer1", "producer1-data");
|
||||
rightInput.pipeInput("producer2", "producer2-data");
|
||||
|
||||
final Map<String, String> expectedOutput = new HashMap<>();
|
||||
expectedOutput.put("producer1:product1", "(producer1|product1-data,producer1-data)");
|
||||
expectedOutput.put("producer1:product2", "(producer1|product2-data,producer1-data)");
|
||||
expectedOutput.put("producer2:product1", "(producer2|product1-data,producer2-data)");
|
||||
|
||||
assertThat(output.readKeyValuesToMap(), is(expectedOutput));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldWorkWithCompositeKeyAndBiFunctionExtractor() {
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
// Left table keyed by <producer_id, product_id>
|
||||
final KTable<String, String> leftTable = builder.table(
|
||||
"left_table",
|
||||
Consumed.with(Serdes.String(), Serdes.String())
|
||||
);
|
||||
|
||||
// Right table keyed by producer_id
|
||||
final KTable<String, String> rightTable = builder.table(
|
||||
"right_table",
|
||||
Consumed.with(Serdes.String(), Serdes.String())
|
||||
);
|
||||
|
||||
// Can extract producer_id from composite key using BiFunction
|
||||
final KTable<String, String> joined = leftTable.join(
|
||||
rightTable,
|
||||
(key, value) -> key.split(":")[0], // extract producer_id from key
|
||||
(leftValue, rightValue) -> "(" + leftValue + "," + rightValue + ")",
|
||||
Materialized.as("store")
|
||||
);
|
||||
|
||||
joined.toStream().to("output");
|
||||
|
||||
try (final TopologyTestDriver driver = createTopologyTestDriver(builder)) {
|
||||
final TestInputTopic<String, String> leftInput = driver.createInputTopic(
|
||||
"left_table",
|
||||
new StringSerializer(),
|
||||
new StringSerializer()
|
||||
);
|
||||
final TestInputTopic<String, String> rightInput = driver.createInputTopic(
|
||||
"right_table",
|
||||
new StringSerializer(),
|
||||
new StringSerializer()
|
||||
);
|
||||
final TestOutputTopic<String, String> output = driver.createOutputTopic(
|
||||
"output",
|
||||
new StringDeserializer(),
|
||||
new StringDeserializer()
|
||||
);
|
||||
|
||||
// Now we don't need producer_id in the value
|
||||
leftInput.pipeInput("producer1:product1", "product1-data");
|
||||
leftInput.pipeInput("producer1:product2", "product2-data");
|
||||
leftInput.pipeInput("producer2:product1", "product1-data");
|
||||
|
||||
rightInput.pipeInput("producer1", "producer1-data");
|
||||
rightInput.pipeInput("producer2", "producer2-data");
|
||||
|
||||
final Map<String, String> expectedOutput = new HashMap<>();
|
||||
expectedOutput.put("producer1:product1", "(product1-data,producer1-data)");
|
||||
expectedOutput.put("producer1:product2", "(product2-data,producer1-data)");
|
||||
expectedOutput.put("producer2:product1", "(product1-data,producer2-data)");
|
||||
|
||||
assertThat(output.readKeyValuesToMap(), is(expectedOutput));
|
||||
}
|
||||
}
|
||||
|
||||
private TopologyTestDriver createTopologyTestDriver(final StreamsBuilder builder) {
|
||||
final Properties config = new Properties();
|
||||
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
|
||||
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
|
||||
config.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
|
||||
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
return new TopologyTestDriver(builder.build(), config);
|
||||
}
|
||||
|
||||
private void validateTopologyCanProcessData(final StreamsBuilder builder) {
|
||||
final Properties config = new Properties();
|
||||
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
|
||||
|
|
|
@ -45,7 +45,7 @@ public class SubscriptionSendProcessorSupplierTest {
|
|||
|
||||
private final Processor<String, Change<LeftValue>, String, SubscriptionWrapper<String>> leftJoinProcessor =
|
||||
new SubscriptionSendProcessorSupplier<String, String, LeftValue>(
|
||||
LeftValue::getForeignKey,
|
||||
ForeignKeyExtractor.fromFunction(LeftValue::getForeignKey),
|
||||
() -> "subscription-topic-fk",
|
||||
() -> "value-serde-topic",
|
||||
Serdes.String(),
|
||||
|
@ -55,7 +55,7 @@ public class SubscriptionSendProcessorSupplierTest {
|
|||
|
||||
private final Processor<String, Change<LeftValue>, String, SubscriptionWrapper<String>> innerJoinProcessor =
|
||||
new SubscriptionSendProcessorSupplier<String, String, LeftValue>(
|
||||
LeftValue::getForeignKey,
|
||||
ForeignKeyExtractor.fromFunction(LeftValue::getForeignKey),
|
||||
() -> "subscription-topic-fk",
|
||||
() -> "value-serde-topic",
|
||||
Serdes.String(),
|
||||
|
@ -327,6 +327,306 @@ public class SubscriptionSendProcessorSupplierTest {
|
|||
assertThat(context.forwarded(), empty());
|
||||
}
|
||||
|
||||
// Bi-function tests: inner join, left join
|
||||
private final Processor<String, Change<LeftValue>, String, SubscriptionWrapper<String>> biFunctionLeftJoinProcessor =
|
||||
new SubscriptionSendProcessorSupplier<String, String, LeftValue>(
|
||||
ForeignKeyExtractor.fromBiFunction((key, value) -> value.getForeignKey() == null ? null : key + value.getForeignKey()),
|
||||
() -> "subscription-topic-fk",
|
||||
() -> "value-serde-topic",
|
||||
Serdes.String(),
|
||||
new LeftValueSerializer(),
|
||||
true
|
||||
).get();
|
||||
|
||||
private final Processor<String, Change<LeftValue>, String, SubscriptionWrapper<String>> biFunctionInnerJoinProcessor =
|
||||
new SubscriptionSendProcessorSupplier<String, String, LeftValue>(
|
||||
ForeignKeyExtractor.fromBiFunction((key, value) -> value.getForeignKey() == null ? null : key + value.getForeignKey()),
|
||||
() -> "subscription-topic-fk",
|
||||
() -> "value-serde-topic",
|
||||
Serdes.String(),
|
||||
new LeftValueSerializer(),
|
||||
false
|
||||
).get();
|
||||
|
||||
// Bi-function tests: left join
|
||||
@Test
|
||||
public void biFunctionLeftJoinShouldPropagateNewPrimaryKeyWithNonNullFK() {
|
||||
final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
|
||||
biFunctionLeftJoinProcessor.init(context);
|
||||
context.setRecordMetadata("topic", 0, 0);
|
||||
|
||||
final LeftValue leftRecordValue = new LeftValue(fk1);
|
||||
|
||||
biFunctionLeftJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, null), 0));
|
||||
|
||||
final String compositeKey = pk + fk1;
|
||||
|
||||
assertThat(context.forwarded().size(), is(1));
|
||||
assertThat(
|
||||
context.forwarded().get(0).record(),
|
||||
is(new Record<>(compositeKey, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void biFunctionLeftJoinShouldPropagateNewPrimaryKeyWithNullFK() {
|
||||
final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
|
||||
biFunctionLeftJoinProcessor.init(context);
|
||||
context.setRecordMetadata("topic", 0, 0);
|
||||
|
||||
final LeftValue leftRecordValue = new LeftValue(null);
|
||||
|
||||
biFunctionLeftJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, null), 0));
|
||||
|
||||
assertThat(context.forwarded().size(), is(1));
|
||||
assertThat(
|
||||
context.forwarded().get(0).record(),
|
||||
is(new Record<>(null, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void biFunctionLeftJoinShouldPropagateChangeOfFKFromNonNullToNonNullValue() {
|
||||
final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
|
||||
biFunctionLeftJoinProcessor.init(context);
|
||||
context.setRecordMetadata("topic", 0, 0);
|
||||
|
||||
final LeftValue leftRecordValue = new LeftValue(fk2);
|
||||
|
||||
biFunctionLeftJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, new LeftValue(fk1)), 0));
|
||||
|
||||
final String compositeKey = pk + fk2;
|
||||
|
||||
assertThat(context.forwarded().size(), is(2));
|
||||
assertThat(
|
||||
context.forwarded().get(1).record(),
|
||||
is(new Record<>(compositeKey, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void biFunctionLeftJoinShouldPropagateNewRecordOfUnchangedFK() {
|
||||
final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
|
||||
biFunctionLeftJoinProcessor.init(context);
|
||||
context.setRecordMetadata("topic", 0, 0);
|
||||
|
||||
final LeftValue leftRecordValue = new LeftValue(fk1);
|
||||
|
||||
biFunctionLeftJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, leftRecordValue), 0));
|
||||
|
||||
final String compositeKey = pk + fk1;
|
||||
|
||||
assertThat(context.forwarded().size(), is(1));
|
||||
assertThat(
|
||||
context.forwarded().get(0).record(),
|
||||
is(new Record<>(compositeKey, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void biFunctionLeftJoinShouldPropagateChangeOfFKFromNonNullToNullValue() {
|
||||
final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
|
||||
biFunctionLeftJoinProcessor.init(context);
|
||||
context.setRecordMetadata("topic", 0, 0);
|
||||
|
||||
final LeftValue leftRecordValue = new LeftValue(null);
|
||||
|
||||
biFunctionLeftJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, new LeftValue(fk1)), 0));
|
||||
|
||||
final String compositeKey = pk + fk1;
|
||||
|
||||
assertThat(context.forwarded().size(), greaterThan(0));
|
||||
assertThat(
|
||||
context.forwarded().get(0).record(),
|
||||
is(new Record<>(compositeKey, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void biFunctionLeftJoinShouldPropagateChangeFromNullFKToNonNullFKValue() {
|
||||
final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
|
||||
biFunctionLeftJoinProcessor.init(context);
|
||||
context.setRecordMetadata("topic", 0, 0);
|
||||
|
||||
final LeftValue leftRecordValue = new LeftValue(fk1);
|
||||
|
||||
final String compositeKey = pk + fk1;
|
||||
|
||||
biFunctionLeftJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, new LeftValue(null)), 0));
|
||||
|
||||
assertThat(context.forwarded().size(), is(1));
|
||||
assertThat(
|
||||
context.forwarded().get(0).record(),
|
||||
is(new Record<>(compositeKey, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void biFunctionLeftJoinShouldPropagateChangeFromNullFKToNullFKValue() {
|
||||
final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
|
||||
biFunctionLeftJoinProcessor.init(context);
|
||||
context.setRecordMetadata("topic", 0, 0);
|
||||
|
||||
final LeftValue leftRecordValue = new LeftValue(null);
|
||||
|
||||
biFunctionLeftJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, leftRecordValue), 0));
|
||||
|
||||
assertThat(context.forwarded().size(), is(1));
|
||||
assertThat(
|
||||
context.forwarded().get(0).record(),
|
||||
is(new Record<>(null, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void biFunctionLeftJoinShouldPropagateDeletionOfAPrimaryKey() {
|
||||
final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
|
||||
biFunctionLeftJoinProcessor.init(context);
|
||||
context.setRecordMetadata("topic", 0, 0);
|
||||
|
||||
biFunctionLeftJoinProcessor.process(new Record<>(pk, new Change<>(null, new LeftValue(fk1)), 0));
|
||||
|
||||
final String compositeKey = pk + fk1;
|
||||
|
||||
assertThat(context.forwarded().size(), greaterThan(0));
|
||||
assertThat(
|
||||
context.forwarded().get(0).record(),
|
||||
is(new Record<>(compositeKey, new SubscriptionWrapper<>(null, DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void biFunctionLeftJoinShouldPropagateDeletionOfAPrimaryKeyThatHadNullFK() {
|
||||
final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
|
||||
biFunctionLeftJoinProcessor.init(context);
|
||||
context.setRecordMetadata("topic", 0, 0);
|
||||
|
||||
biFunctionLeftJoinProcessor.process(new Record<>(pk, new Change<>(null, new LeftValue(null)), 0));
|
||||
|
||||
assertThat(context.forwarded().size(), is(1));
|
||||
assertThat(
|
||||
context.forwarded().get(0).record(),
|
||||
is(new Record<>(null, new SubscriptionWrapper<>(null, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void biFunctionLeftJoinShouldPropagateNothingWhenOldAndNewLeftValueIsNull() {
|
||||
final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
|
||||
biFunctionLeftJoinProcessor.init(context);
|
||||
context.setRecordMetadata("topic", 0, 0);
|
||||
|
||||
biFunctionLeftJoinProcessor.process(new Record<>(pk, new Change<>(null, null), 0));
|
||||
|
||||
assertThat(context.forwarded(), empty());
|
||||
}
|
||||
|
||||
// Bi-function tests: inner join
|
||||
@Test
|
||||
public void biFunctionInnerJoinShouldPropagateNewPrimaryKey() {
|
||||
final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
|
||||
biFunctionInnerJoinProcessor.init(context);
|
||||
context.setRecordMetadata("topic", 0, 0);
|
||||
|
||||
final LeftValue leftRecordValue = new LeftValue(fk1);
|
||||
|
||||
biFunctionInnerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, null), 0));
|
||||
|
||||
final String compositeKey = pk + fk1;
|
||||
|
||||
assertThat(context.forwarded().size(), is(1));
|
||||
assertThat(
|
||||
context.forwarded().get(0).record(),
|
||||
is(new Record<>(compositeKey, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void biFunctionInnerJoinShouldNotPropagateNewPrimaryKeyWithNullFK() {
|
||||
final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
|
||||
biFunctionInnerJoinProcessor.init(context);
|
||||
context.setRecordMetadata("topic", 0, 0);
|
||||
|
||||
final LeftValue leftRecordValue = new LeftValue(null);
|
||||
|
||||
biFunctionInnerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, null), 0));
|
||||
|
||||
assertThat(context.forwarded(), empty());
|
||||
|
||||
// test dropped-records sensors
|
||||
assertEquals(1.0, getDroppedRecordsTotalMetric(context));
|
||||
assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void biFunctionInnerJoinShouldDeleteOldAndPropagateNewFK() {
|
||||
final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
|
||||
biFunctionInnerJoinProcessor.init(context);
|
||||
context.setRecordMetadata("topic", 0, 0);
|
||||
|
||||
final LeftValue leftRecordValue = new LeftValue(fk2);
|
||||
|
||||
biFunctionInnerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, new LeftValue(fk1)), 0));
|
||||
|
||||
final String compositeKey1 = pk + fk1;
|
||||
final String compositeKey2 = pk + fk2;
|
||||
|
||||
assertThat(context.forwarded().size(), is(2));
|
||||
assertThat(
|
||||
context.forwarded().get(0).record(),
|
||||
is(new Record<>(compositeKey1, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
|
||||
);
|
||||
assertThat(
|
||||
context.forwarded().get(1).record(),
|
||||
is(new Record<>(compositeKey2, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void biFunctionInnerJoinShouldPropagateNothingWhenOldAndNewFKIsNull() {
|
||||
final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
|
||||
biFunctionInnerJoinProcessor.init(context);
|
||||
context.setRecordMetadata("topic", 0, 0);
|
||||
|
||||
final LeftValue leftRecordValue = new LeftValue(null);
|
||||
|
||||
biFunctionInnerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, leftRecordValue), 0));
|
||||
|
||||
assertThat(context.forwarded(), empty());
|
||||
|
||||
// test dropped-records sensors
|
||||
assertEquals(1.0, getDroppedRecordsTotalMetric(context));
|
||||
assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void biFunctionInnerJoinShouldPropagateDeletionOfPrimaryKey() {
|
||||
final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
|
||||
biFunctionInnerJoinProcessor.init(context);
|
||||
context.setRecordMetadata("topic", 0, 0);
|
||||
|
||||
biFunctionInnerJoinProcessor.process(new Record<>(pk, new Change<>(null, new LeftValue(fk1)), 0));
|
||||
|
||||
final String compositeKey = pk + fk1;
|
||||
|
||||
assertThat(context.forwarded().size(), is(1));
|
||||
assertThat(
|
||||
context.forwarded().get(0).record(),
|
||||
is(new Record<>(compositeKey, new SubscriptionWrapper<>(null, DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void biFunctionInnerJoinShouldPropagateNothingWhenOldAndNewLeftValueIsNull() {
|
||||
final MockInternalNewProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
|
||||
biFunctionInnerJoinProcessor.init(context);
|
||||
context.setRecordMetadata("topic", 0, 0);
|
||||
|
||||
biFunctionInnerJoinProcessor.process(new Record<>(pk, new Change<>(null, null), 0));
|
||||
|
||||
assertThat(context.forwarded(), empty());
|
||||
}
|
||||
|
||||
private static class LeftValueSerializer implements Serializer<LeftValue> {
|
||||
@Override
|
||||
public byte[] serialize(final String topic, final LeftValue data) {
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.kafka.streams.scala
|
||||
package kstream
|
||||
|
||||
import scala.jdk.FunctionWrappers.AsJavaBiFunction
|
||||
import org.apache.kafka.common.utils.Bytes
|
||||
import org.apache.kafka.streams.kstream.{KTable => KTableJ, TableJoined, ValueJoiner, ValueTransformerWithKeySupplier}
|
||||
import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
|
||||
|
@ -643,6 +644,26 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
): KTable[K, VR] =
|
||||
new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, materialized))
|
||||
|
||||
/**
|
||||
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner join. Records from this
|
||||
* table are joined according to the result of keyExtractor on the other KTable.
|
||||
*
|
||||
* @param other the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor
|
||||
* @param keyExtractor a function that extracts the foreign key from this table's key and value
|
||||
* @param joiner a function that computes the join result for a pair of matching records
|
||||
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
|
||||
* should be materialized.
|
||||
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
|
||||
* one for each matched record-pair with the same key
|
||||
*/
|
||||
def join[VR, KO, VO](
|
||||
other: KTable[KO, VO],
|
||||
keyExtractor: (K, V) => KO,
|
||||
joiner: ValueJoiner[V, VO, VR],
|
||||
materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
|
||||
): KTable[K, VR] =
|
||||
new KTable(inner.join(other.inner, AsJavaBiFunction[K, V, KO](keyExtractor), joiner, materialized))
|
||||
|
||||
/**
|
||||
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner join. Records from this
|
||||
* table are joined according to the result of keyExtractor on the other KTable.
|
||||
|
@ -666,6 +687,29 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
): KTable[K, VR] =
|
||||
new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, tableJoined, materialized))
|
||||
|
||||
/**
|
||||
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner join. Records from this
|
||||
* table are joined according to the result of keyExtractor on the other KTable.
|
||||
*
|
||||
* @param other the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor
|
||||
* @param keyExtractor a function that extracts the foreign key from this table's key and value
|
||||
* @param joiner a function that computes the join result for a pair of matching records
|
||||
* @param tableJoined a `org.apache.kafka.streams.kstream.TableJoined` used to configure
|
||||
* partitioners and names of internal topics and stores
|
||||
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
|
||||
* should be materialized.
|
||||
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
|
||||
* one for each matched record-pair with the same key
|
||||
*/
|
||||
def join[VR, KO, VO](
|
||||
other: KTable[KO, VO],
|
||||
keyExtractor: (K, V) => KO,
|
||||
joiner: ValueJoiner[V, VO, VR],
|
||||
tableJoined: TableJoined[K, KO],
|
||||
materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
|
||||
): KTable[K, VR] =
|
||||
new KTable(inner.join(other.inner, AsJavaBiFunction[K, V, KO](keyExtractor), joiner, tableJoined, materialized))
|
||||
|
||||
/**
|
||||
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left join. Records from this
|
||||
* table are joined according to the result of keyExtractor on the other KTable.
|
||||
|
@ -686,6 +730,26 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
): KTable[K, VR] =
|
||||
new KTable(inner.leftJoin(other.inner, keyExtractor.asJavaFunction, joiner, materialized))
|
||||
|
||||
/**
|
||||
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left join. Records from this
|
||||
* table are joined according to the result of keyExtractor on the other KTable.
|
||||
*
|
||||
* @param other the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor
|
||||
* @param keyExtractor a function that extracts the foreign key from this table's key and value
|
||||
* @param joiner a function that computes the join result for a pair of matching records
|
||||
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
|
||||
* should be materialized.
|
||||
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
|
||||
* one for each matched record-pair with the same key
|
||||
*/
|
||||
def leftJoin[VR, KO, VO](
|
||||
other: KTable[KO, VO],
|
||||
keyExtractor: (K, V) => KO,
|
||||
joiner: ValueJoiner[V, VO, VR],
|
||||
materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
|
||||
): KTable[K, VR] =
|
||||
new KTable(inner.leftJoin(other.inner, AsJavaBiFunction[K, V, KO](keyExtractor), joiner, materialized))
|
||||
|
||||
/**
|
||||
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left join. Records from this
|
||||
* table are joined according to the result of keyExtractor on the other KTable.
|
||||
|
@ -709,6 +773,29 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
): KTable[K, VR] =
|
||||
new KTable(inner.leftJoin(other.inner, keyExtractor.asJavaFunction, joiner, tableJoined, materialized))
|
||||
|
||||
/**
|
||||
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left join. Records from this
|
||||
* table are joined according to the result of keyExtractor on the other KTable.
|
||||
*
|
||||
* @param other the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor
|
||||
* @param keyExtractor a function that extracts the foreign key from this table's key and value
|
||||
* @param joiner a function that computes the join result for a pair of matching records
|
||||
* @param tableJoined a `org.apache.kafka.streams.kstream.TableJoined` used to configure
|
||||
* partitioners and names of internal topics and stores
|
||||
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
|
||||
* should be materialized.
|
||||
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
|
||||
* one for each matched record-pair with the same key
|
||||
*/
|
||||
def leftJoin[VR, KO, VO](
|
||||
other: KTable[KO, VO],
|
||||
keyExtractor: (K, V) => KO,
|
||||
joiner: ValueJoiner[V, VO, VR],
|
||||
tableJoined: TableJoined[K, KO],
|
||||
materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
|
||||
): KTable[K, VR] =
|
||||
new KTable(inner.leftJoin(other.inner, AsJavaBiFunction[K, V, KO](keyExtractor), joiner, tableJoined, materialized))
|
||||
|
||||
/**
|
||||
* Get the name of the local state store used that can be used to query this [[KTable]].
|
||||
*
|
||||
|
|
|
@ -534,4 +534,84 @@ class KTableTest extends TestDriver {
|
|||
|
||||
testDriver.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
def testJoinWithBiFunctionKeyExtractor(): Unit = {
|
||||
val builder = new StreamsBuilder()
|
||||
val sourceTopic1 = "source1"
|
||||
val sourceTopic2 = "source2"
|
||||
val sinkTopic = "sink"
|
||||
|
||||
val table1 = builder.stream[String, String](sourceTopic1).toTable
|
||||
val table2 = builder.stream[String, String](sourceTopic2).toTable
|
||||
|
||||
table1
|
||||
.join[String, String, String](
|
||||
table2,
|
||||
(key: String, value: String) => s"$key-$value",
|
||||
joiner = (v1: String, v2: String) => s"$v1+$v2",
|
||||
materialized = Materialized.`with`[String, String, ByteArrayKeyValueStore]
|
||||
)
|
||||
.toStream
|
||||
.to(sinkTopic)
|
||||
|
||||
val testDriver = createTestDriver(builder)
|
||||
val testInput1 = testDriver.createInput[String, String](sourceTopic1)
|
||||
val testInput2 = testDriver.createInput[String, String](sourceTopic2)
|
||||
val testOutput = testDriver.createOutput[String, String](sinkTopic)
|
||||
|
||||
testInput1.pipeInput("k1", "v1")
|
||||
testInput2.pipeInput("k1-v1", "v2")
|
||||
|
||||
val record = testOutput.readKeyValue
|
||||
assertEquals("k1", record.key)
|
||||
assertEquals("v1+v2", record.value)
|
||||
|
||||
testDriver.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
def testLeftJoinWithBiFunctionKeyExtractor(): Unit = {
|
||||
val builder = new StreamsBuilder()
|
||||
val sourceTopic1 = "source1"
|
||||
val sourceTopic2 = "source2"
|
||||
val sinkTopic = "sink"
|
||||
|
||||
val table1 = builder.stream[String, String](sourceTopic1).toTable
|
||||
val table2 = builder.stream[String, String](sourceTopic2).toTable
|
||||
|
||||
table1
|
||||
.leftJoin[String, String, String](
|
||||
table2,
|
||||
(key: String, value: String) => s"$key-$value",
|
||||
joiner = (v1: String, v2: String) => s"${v1}+${Option(v2).getOrElse("null")}",
|
||||
materialized = Materialized.`with`[String, String, ByteArrayKeyValueStore]
|
||||
)
|
||||
.toStream
|
||||
.to(sinkTopic)
|
||||
|
||||
val testDriver = createTestDriver(builder)
|
||||
val testInput1 = testDriver.createInput[String, String](sourceTopic1)
|
||||
val testInput2 = testDriver.createInput[String, String](sourceTopic2)
|
||||
val testOutput = testDriver.createOutput[String, String](sinkTopic)
|
||||
|
||||
// First insert into the foreign key table (table2)
|
||||
testInput2.pipeInput("k1-v1", "v2")
|
||||
|
||||
// Then insert into the primary table (table1)
|
||||
testInput1.pipeInput("k1", "v1")
|
||||
|
||||
val record1 = testOutput.readKeyValue
|
||||
assertEquals("k1", record1.key)
|
||||
assertEquals("v1+v2", record1.value)
|
||||
|
||||
// Test with non-matching foreign key (should still output due to left join)
|
||||
testInput1.pipeInput("k2", "v3")
|
||||
|
||||
val record2 = testOutput.readKeyValue
|
||||
assertEquals("k2", record2.key)
|
||||
assertEquals("v3+null", record2.value)
|
||||
|
||||
testDriver.close()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue