KAFKA-3705 Added a foreignKeyJoin implementation for KTable. (#5527)

https://issues.apache.org/jira/browse/KAFKA-3705

Allows a KTable to map its value to a given foreign key and join on another KTable keyed on that foreign key. Applies the joiner, then returns the tuples keyed on the original key. This supports updates from both sides of the join.
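For example, the new operator can be used as follows (a sketch; the Order/Customer types, topic names, and store name are illustrative):

    KTable<String, Order> orders = builder.table("orders");
    KTable<String, Customer> customers = builder.table("customers");

    KTable<String, EnrichedOrder> enriched = orders.join(
        customers,
        order -> order.getCustomerId(),                          // foreignKeyExtractor: this table's value -> other table's key
        (order, customer) -> new EnrichedOrder(order, customer), // joiner; result is keyed on the original order key
        Materialized.as("enriched-orders-store")
    );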

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>,  John Roesler <john@confluent.io>, Boyang Chen <boyang@confluent.io>, Christopher Pettitt <cpettitt@confluent.io>, Bill Bejeck <bbejeck@gmail.com>, Jan Filipiak <Jan.Filipiak@trivago.com>, pgwhalen, Alexei Daniline
This commit is contained in:
Adam Bellemare 2019-10-03 18:59:31 -04:00 committed by Bill Bejeck
parent ded1fb8c4d
commit c87fe9402c
39 changed files with 3141 additions and 32 deletions

View File: build.gradle

@ -1224,6 +1224,12 @@ project(':streams') {
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "streams_config.html").newOutputStream()
}
test {
// The suites are for running sets of tests in IDEs.
// Gradle will run each test class, so we exclude the suites to avoid running the tests twice.
exclude '**/*Suite.class'
}
}
project(':streams:streams-scala') {

View File: suppressions.xml

@ -1,4 +1,4 @@
<?xml version="1.0"?>
<!DOCTYPE suppressions PUBLIC
"-//Puppy Crawl//DTD Suppressions 1.1//EN"
@ -67,6 +67,8 @@
<suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
files="CoordinatorClient.java"/>
<suppress checks="(UnnecessaryParentheses|BooleanExpressionComplexity|CyclomaticComplexity|WhitespaceAfter|LocalVariableName)"
files="Murmur3.java"/>
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|LocalVariableName|MemberName|MethodLength|JavaNCSS)"
files="clients[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
@ -90,6 +92,9 @@
<suppress checks="NPathComplexity"
files="MemoryRecordsTest|MetricsTest"/>
<suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>
<!-- Connect -->
<suppress checks="ClassFanOutComplexity"
files="DistributedHerder(|Test).java"/>
@ -149,7 +154,7 @@
files="(TopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>
<suppress checks="MethodLength"
files="StreamsPartitionAssignor.java"/>
files="(KTableImpl|StreamsPartitionAssignor.java)"/>
<suppress checks="ParameterNumber"
files="StreamTask.java"/>
@ -229,7 +234,8 @@
<suppress checks="CyclomaticComplexity"
files="(StreamsResetter|ProducerPerformance|Agent).java"/>
<suppress checks="BooleanExpressionComplexity"
files="StreamsResetter.java"/>
<suppress checks="NPathComplexity"
files="(ProducerPerformance|StreamsResetter|Agent).java"/>
<suppress checks="ImportControl"

View File: Bytes.java

@ -141,7 +141,33 @@ public class Bytes implements Comparable<Bytes> {
}
/**
* A byte array comparator based on lexicographic ordering.
* Increment the underlying byte array by adding 1. Throws an IndexOutOfBoundsException if incrementing would cause
* the underlying input byte array to overflow.
*
* @param input - The byte array to increment
* @return A new copy of the incremented byte array.
*/
public static Bytes increment(Bytes input) throws IndexOutOfBoundsException {
byte[] inputArr = input.get();
byte[] ret = new byte[inputArr.length];
int carry = 1;
for (int i = inputArr.length - 1; i >= 0; i--) {
if (inputArr[i] == (byte) 0xFF && carry == 1) {
ret[i] = (byte) 0x00;
} else {
ret[i] = (byte) (inputArr[i] + carry);
carry = 0;
}
}
if (carry == 0) {
return wrap(ret);
} else {
throw new IndexOutOfBoundsException();
}
}
/**
* A byte array comparator based on lexicographic ordering.
*/
public final static ByteArrayComparator BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator();
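Within this patch, increment() supplies the exclusive upper bound for the foreign-key prefix scans (a sketch; store and prefixBytes as used by ForeignJoinSubscriptionProcessorSupplier below):

    // every key that starts with prefixBytes sorts within [prefixBytes, increment(prefixBytes))
    store.range(prefixBytes, Bytes.increment(prefixBytes));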

View File: BytesTest.java

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.utils;
import org.junit.Test;
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertEquals;
public class BytesTest {
@Test
public void testIncrement() {
byte[] input = new byte[]{(byte) 0xAB, (byte) 0xCD, (byte) 0xFF};
byte[] expected = new byte[]{(byte) 0xAB, (byte) 0xCE, (byte) 0x00};
Bytes output = Bytes.increment(Bytes.wrap(input));
assertArrayEquals(output.get(), expected);
}
@Test
public void testIncrementUpperBoundary() {
byte[] input = new byte[]{(byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
assertThrows(IndexOutOfBoundsException.class, () -> Bytes.increment(Bytes.wrap(input)));
}
@Test
public void testIncrementWithSubmap() {
final NavigableMap<Bytes, byte[]> map = new TreeMap<>();
Bytes key1 = Bytes.wrap(new byte[]{(byte) 0xAA});
byte[] val = new byte[]{(byte) 0x00};
map.put(key1, val);
Bytes key2 = Bytes.wrap(new byte[]{(byte) 0xAA, (byte) 0xAA});
map.put(key2, val);
Bytes key3 = Bytes.wrap(new byte[]{(byte) 0xAA, (byte) 0x00, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF});
map.put(key3, val);
Bytes key4 = Bytes.wrap(new byte[]{(byte) 0xAB, (byte) 0x00});
map.put(key4, val);
Bytes key5 = Bytes.wrap(new byte[]{(byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x01});
map.put(key5, val);
Bytes prefix = key1;
Bytes prefixEnd = Bytes.increment(prefix);
Comparator<? super Bytes> comparator = map.comparator();
final int result = comparator == null ? prefix.compareTo(prefixEnd) : comparator.compare(prefix, prefixEnd);
NavigableMap<Bytes, byte[]> subMapResults;
if (result > 0) {
//Prefix increment would cause a wrap-around. Get the submap from toKey to the end of the map
subMapResults = map.tailMap(prefix, true);
} else {
subMapResults = map.subMap(prefix, true, prefixEnd, false);
}
NavigableMap<Bytes, byte[]> subMapExpected = new TreeMap<>();
subMapExpected.put(key1, val);
subMapExpected.put(key2, val);
subMapExpected.put(key3, val);
assertEquals(subMapExpected.keySet(), subMapResults.keySet());
}
}

View File: StreamsResetter.java

@ -675,7 +675,9 @@ public class StreamsResetter {
// Cf. https://issues.apache.org/jira/browse/KAFKA-7930
return !isInputTopic(topicName) && !isIntermediateTopic(topicName)
&& topicName.startsWith(options.valueOf(applicationIdOption) + "-")
&& (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
&& (topicName.endsWith("-changelog") || topicName.endsWith("-repartition")
|| topicName.endsWith("-subscription-registration-topic")
|| topicName.endsWith("-subscription-response-topic"));
}
public static void main(final String[] args) {

View File: KTable.java

@ -30,6 +30,8 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.function.Function;
/**
* {@code KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
* Each record in this changelog stream is an update on the primary-keyed table with the record key as the primary key.
@ -2117,6 +2119,92 @@ public interface KTable<K, V> {
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
*
* Join records of this {@code KTable} with another {@code KTable}'s records using a non-windowed inner join. Records from this
* table are joined with the record in the other table whose key equals the result of applying {@code foreignKeyExtractor} to this table's value.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V)
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
*
* Join records of this {@code KTable} with another {@code KTable}'s records using a non-windowed inner join. Records from this
* table are joined with the record in the other table whose key equals the result of applying {@code foreignKeyExtractor} to this table's value.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V)
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
*
* Join records of this {@code KTable} with another {@code KTable}'s records using a non-windowed left join. Records from this
* table are joined with the record in the other table whose key equals the result of applying {@code foreignKeyExtractor} to this table's value.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* resultant foreignKey is null, the record will not propagate to the output.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
*
* Join records of this {@code KTable} with another {@code KTable}'s records using a non-windowed left join. Records from this
* table are joined with the record in the other table whose key equals the result of applying {@code foreignKeyExtractor} to this table's value.
*
* @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
* resultant foreignKey is null, the record will not propagate to the output.
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code null}
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
* @return a {@code KTable} that contains the result of joining this table with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Get the name of the local state store used that can be used to query this {@code KTable}.
*

View File: KTableImpl.java

@ -17,8 +17,10 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
@ -27,15 +29,29 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKey;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKeySchema;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionJoinForeignProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapper;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapperSerde;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde;
import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
@ -43,17 +59,22 @@ import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcess
import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import static org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchUtil.findAndVerifyWindowGrace;
@ -89,6 +110,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";
private static final String FK_JOIN_STATE_STORE_NAME = "KTABLE-INTERNAL-SUBSCRIPTION-STATE-STORE-";
private static final String SUBSCRIPTION_REGISTRATION = "KTABLE-SUBSCRIPTION-REGISTRATION-";
private static final String SUBSCRIPTION_RESPONSE = "KTABLE-SUBSCRIPTION-RESPONSE-";
private static final String SUBSCRIPTION_PROCESSOR = "KTABLE-SUBSCRIPTION-PROCESSOR-";
private static final String SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR = "KTABLE-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-";
private static final String FK_JOIN_OUTPUT_PROCESSOR = "KTABLE-OUTPUT-PROCESSOR-";
private static final String TOPIC_SUFFIX = "-topic";
private static final String SINK_NAME = "KTABLE-SINK-";
private final ProcessorSupplier<?, ?> processorSupplier;
private final String queryableStoreName;
@ -803,4 +833,224 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
return (ProcessorParameters<K, VR>) kObjectProcessorParameters;
}
@Override
public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named, new MaterializedInternal<>(materialized), false);
}
@Override
public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, NamedInternal.empty(), new MaterializedInternal<>(materialized), false);
}
@Override
public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named, new MaterializedInternal<>(materialized), true);
}
@Override
public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, NamedInternal.empty(), new MaterializedInternal<>(materialized), true);
}
@SuppressWarnings("unchecked")
private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> foreignKeyTable,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named joinName,
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
final boolean leftJoin) {
Objects.requireNonNull(foreignKeyTable, "foreignKeyTable can't be null");
Objects.requireNonNull(foreignKeyExtractor, "foreignKeyExtractor can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joinName, "joinName can't be null");
Objects.requireNonNull(materializedInternal, "materialized can't be null");
//Old values are a useful optimization. The old values from the foreignKeyTable are compared to the new values,
//such that identical values do not cause a prefixScan. PrefixScan and propagation can be expensive and should
//not be done needlessly.
((KTableImpl<?, ?, ?>) foreignKeyTable).enableSendingOldValues();
//Old values must be sent such that the ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the correct node.
//This occurs whenever the extracted foreignKey changes values.
enableSendingOldValues();
final Serde<KO> foreignKeySerde = ((KTableImpl<KO, VO, ?>) foreignKeyTable).keySerde;
final Serde<SubscriptionWrapper<K>> subscriptionWrapperSerde = new SubscriptionWrapperSerde<>(keySerde);
final SubscriptionResponseWrapperSerde<VO> responseWrapperSerde =
new SubscriptionResponseWrapperSerde<>(((KTableImpl<KO, VO, VO>) foreignKeyTable).valSerde);
final NamedInternal renamed = new NamedInternal(joinName);
final String subscriptionTopicName = renamed.suffixWithOrElseGet("-subscription-registration", builder, SUBSCRIPTION_REGISTRATION) + TOPIC_SUFFIX;
builder.internalTopologyBuilder.addInternalTopic(subscriptionTopicName);
final CombinedKeySchema<KO, K> combinedKeySchema = new CombinedKeySchema<>(subscriptionTopicName, foreignKeySerde, keySerde);
final ProcessorGraphNode<K, Change<V>> subscriptionNode = new ProcessorGraphNode<>(
new ProcessorParameters<>(
new ForeignJoinSubscriptionSendProcessorSupplier<>(
foreignKeyExtractor,
foreignKeySerde,
subscriptionTopicName,
valSerde.serializer(),
leftJoin
),
renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION)
)
);
builder.addGraphNode(streamsGraphNode, subscriptionNode);
final StreamSinkNode<KO, SubscriptionWrapper<K>> subscriptionSink = new StreamSinkNode<>(
renamed.suffixWithOrElseGet("-subscription-registration-sink", builder, SINK_NAME),
new StaticTopicNameExtractor<>(subscriptionTopicName),
new ProducedInternal<>(Produced.with(foreignKeySerde, subscriptionWrapperSerde))
);
builder.addGraphNode(subscriptionNode, subscriptionSink);
final StreamSourceNode<KO, SubscriptionWrapper<K>> subscriptionSource = new StreamSourceNode<>(
renamed.suffixWithOrElseGet("-subscription-registration-source", builder, SOURCE_NAME),
Collections.singleton(subscriptionTopicName),
new ConsumedInternal<>(Consumed.with(foreignKeySerde, subscriptionWrapperSerde))
);
builder.addGraphNode(subscriptionSink, subscriptionSource);
// The subscription source is the source node on the *receiving* end *after* the repartition.
// This topic needs to be copartitioned with the Foreign Key table.
final Set<String> copartitionedRepartitionSources =
new HashSet<>(((KTableImpl<?, ?, ?>) foreignKeyTable).sourceNodes);
copartitionedRepartitionSources.add(subscriptionSource.nodeName());
builder.internalTopologyBuilder.copartitionSources(copartitionedRepartitionSources);
final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> subscriptionStore =
Stores.timestampedKeyValueStoreBuilder(
Stores.persistentTimestampedKeyValueStore(
renamed.suffixWithOrElseGet("-subscription-store", builder, FK_JOIN_STATE_STORE_NAME)
),
new Serdes.BytesSerde(),
subscriptionWrapperSerde
);
builder.addStateStore(subscriptionStore);
final StatefulProcessorNode<KO, SubscriptionWrapper<K>> subscriptionReceiveNode =
new StatefulProcessorNode<>(
new ProcessorParameters<>(
new SubscriptionStoreReceiveProcessorSupplier<>(subscriptionStore, combinedKeySchema),
renamed.suffixWithOrElseGet("-subscription-receive", builder, SUBSCRIPTION_PROCESSOR)
),
Collections.singleton(subscriptionStore),
Collections.emptySet()
);
builder.addGraphNode(subscriptionSource, subscriptionReceiveNode);
final StatefulProcessorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinForeignNode =
new StatefulProcessorNode<>(
new ProcessorParameters<>(
new SubscriptionJoinForeignProcessorSupplier<>(
((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier()
),
renamed.suffixWithOrElseGet("-subscription-join-foreign", builder, SUBSCRIPTION_PROCESSOR)
),
Collections.emptySet(),
Collections.singleton(((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier())
);
builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinForeignNode);
final StatefulProcessorNode<KO, Change<Object>> foreignJoinSubscriptionNode = new StatefulProcessorNode<>(
new ProcessorParameters<>(
new ForeignJoinSubscriptionProcessorSupplier<>(subscriptionStore, combinedKeySchema),
renamed.suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR)
),
Collections.singleton(subscriptionStore),
Collections.emptySet()
);
builder.addGraphNode(((KTableImpl<KO, VO, ?>) foreignKeyTable).streamsGraphNode, foreignJoinSubscriptionNode);
final String finalRepartitionTopicName = renamed.suffixWithOrElseGet("-subscription-response", builder, SUBSCRIPTION_RESPONSE) + TOPIC_SUFFIX;
builder.internalTopologyBuilder.addInternalTopic(finalRepartitionTopicName);
final StreamSinkNode<K, SubscriptionResponseWrapper<VO>> foreignResponseSink =
new StreamSinkNode<>(
renamed.suffixWithOrElseGet("-subscription-response-sink", builder, SINK_NAME),
new StaticTopicNameExtractor<>(finalRepartitionTopicName),
new ProducedInternal<>(Produced.with(keySerde, responseWrapperSerde))
);
builder.addGraphNode(subscriptionJoinForeignNode, foreignResponseSink);
builder.addGraphNode(foreignJoinSubscriptionNode, foreignResponseSink);
final StreamSourceNode<K, SubscriptionResponseWrapper<VO>> foreignResponseSource = new StreamSourceNode<>(
renamed.suffixWithOrElseGet("-subscription-response-source", builder, SOURCE_NAME),
Collections.singleton(finalRepartitionTopicName),
new ConsumedInternal<>(Consumed.with(keySerde, responseWrapperSerde))
);
builder.addGraphNode(foreignResponseSink, foreignResponseSource);
// the response topic has to be copartitioned with the left (primary) side of the join
final Set<String> resultSourceNodes = new HashSet<>(this.sourceNodes);
resultSourceNodes.add(foreignResponseSource.nodeName());
builder.internalTopologyBuilder.copartitionSources(resultSourceNodes);
final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
final StatefulProcessorNode<K, SubscriptionResponseWrapper<VO>> resolverNode = new StatefulProcessorNode<>(
new ProcessorParameters<>(
new SubscriptionResolverJoinProcessorSupplier<>(
primaryKeyValueGetter,
valueSerde().serializer(),
joiner,
leftJoin
),
renamed.suffixWithOrElseGet("-subscription-response-resolver", builder, SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR)
),
Collections.emptySet(),
Collections.singleton(primaryKeyValueGetter)
);
builder.addGraphNode(foreignResponseSource, resolverNode);
final String resultProcessorName = renamed.suffixWithOrElseGet("-result", builder, FK_JOIN_OUTPUT_PROCESSOR);
final KTableSource<K, VR> resultProcessorSupplier = new KTableSource<>(materializedInternal.storeName(), materializedInternal.queryableStoreName());
final StoreBuilder<TimestampedKeyValueStore<K, VR>> resultStore =
materializedInternal.queryableStoreName() == null
? null
: new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize();
final TableProcessorNode<K, VR> resultNode = new TableProcessorNode<>(
resultProcessorName,
new ProcessorParameters<>(
resultProcessorSupplier,
resultProcessorName
),
resultStore
);
builder.addGraphNode(resolverNode, resultNode);
return new KTableImpl<K, V, VR>(
resultProcessorName,
keySerde,
materializedInternal.valueSerde(),
resultSourceNodes,
materializedInternal.storeName(),
resultProcessorSupplier,
resultNode,
builder
);
}
}
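In outline, doJoinOnForeignKey() assembles the following topology (a sketch; generated processor names abbreviated):

    this KTable -> subscription-send processor -> subscription-registration topic (keyed by FK)
    subscription-registration topic -> subscription-receive processor -> subscription store -> subscription-join-foreign processor
    foreign KTable -> foreign-join-subscription processor (prefix scan over the subscription store)
    subscription-join-foreign + foreign-join-subscription -> subscription-response topic (keyed by the original PK)
    subscription-response topic -> subscription-response-resolver processor -> result processor and store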

View File: KTableSourceValueGetterSupplier.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
@ -23,7 +24,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
private final String storeName;
KTableSourceValueGetterSupplier(final String storeName) {
public KTableSourceValueGetterSupplier(final String storeName) {
this.storeName = storeName;
}
@ -49,6 +50,7 @@ public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterS
}
@Override
public void close() {}
public void close() {
}
}
}

View File: CombinedKey.java

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import java.util.Objects;
public class CombinedKey<KF, KP> {
private final KF foreignKey;
private final KP primaryKey;
CombinedKey(final KF foreignKey, final KP primaryKey) {
Objects.requireNonNull(foreignKey, "foreignKey can't be null");
Objects.requireNonNull(primaryKey, "primaryKey can't be null");
this.foreignKey = foreignKey;
this.primaryKey = primaryKey;
}
public KF getForeignKey() {
return foreignKey;
}
public KP getPrimaryKey() {
return primaryKey;
}
public boolean equals(final KF foreignKey, final KP primaryKey) {
if (this.primaryKey == null) {
return false;
}
return this.foreignKey.equals(foreignKey) && this.primaryKey.equals(primaryKey);
}
@Override
public String toString() {
return "CombinedKey{" +
"foreignKey=" + foreignKey +
", primaryKey=" + primaryKey +
'}';
}
}

View File: CombinedKeySchema.java

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.ProcessorContext;
import java.nio.ByteBuffer;
/**
* Factory for creating CombinedKey serializers / deserializers.
*/
public class CombinedKeySchema<KO, K> {
private final String serdeTopic;
private Serializer<K> primaryKeySerializer;
private Deserializer<K> primaryKeyDeserializer;
private Serializer<KO> foreignKeySerializer;
private Deserializer<KO> foreignKeyDeserializer;
public CombinedKeySchema(final String serdeTopic, final Serde<KO> foreignKeySerde, final Serde<K> primaryKeySerde) {
this.serdeTopic = serdeTopic;
primaryKeySerializer = primaryKeySerde.serializer();
primaryKeyDeserializer = primaryKeySerde.deserializer();
foreignKeyDeserializer = foreignKeySerde.deserializer();
foreignKeySerializer = foreignKeySerde.serializer();
}
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
primaryKeySerializer = primaryKeySerializer == null ? (Serializer<K>) context.keySerde().serializer() : primaryKeySerializer;
primaryKeyDeserializer = primaryKeyDeserializer == null ? (Deserializer<K>) context.keySerde().deserializer() : primaryKeyDeserializer;
foreignKeySerializer = foreignKeySerializer == null ? (Serializer<KO>) context.keySerde().serializer() : foreignKeySerializer;
foreignKeyDeserializer = foreignKeyDeserializer == null ? (Deserializer<KO>) context.keySerde().deserializer() : foreignKeyDeserializer;
}
Bytes toBytes(final KO foreignKey, final K primaryKey) {
//The serialization format - note that primaryKeySerialized may be null, such as when a prefixScan
//key is being created.
//{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
final byte[] foreignKeySerializedData = foreignKeySerializer.serialize(serdeTopic, foreignKey);
//? bytes
final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(serdeTopic, primaryKey);
final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + foreignKeySerializedData.length + primaryKeySerializedData.length);
buf.putInt(foreignKeySerializedData.length);
buf.put(foreignKeySerializedData);
buf.put(primaryKeySerializedData);
return Bytes.wrap(buf.array());
}
public CombinedKey<KO, K> fromBytes(final Bytes data) {
//{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
final byte[] dataArray = data.get();
final ByteBuffer dataBuffer = ByteBuffer.wrap(dataArray);
final int foreignKeyLength = dataBuffer.getInt();
final byte[] foreignKeyRaw = new byte[foreignKeyLength];
dataBuffer.get(foreignKeyRaw, 0, foreignKeyLength);
final KO foreignKey = foreignKeyDeserializer.deserialize(serdeTopic, foreignKeyRaw);
final byte[] primaryKeyRaw = new byte[dataArray.length - foreignKeyLength - Integer.BYTES];
dataBuffer.get(primaryKeyRaw, 0, primaryKeyRaw.length);
final K primaryKey = primaryKeyDeserializer.deserialize(serdeTopic, primaryKeyRaw);
return new CombinedKey<>(foreignKey, primaryKey);
}
Bytes prefixBytes(final KO key) {
//The serialization format. Note that primaryKeySerialized is not required/used in this function.
//{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
final byte[] foreignKeySerializedData = foreignKeySerializer.serialize(serdeTopic, key);
final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + foreignKeySerializedData.length);
buf.putInt(foreignKeySerializedData.length);
buf.put(foreignKeySerializedData);
return Bytes.wrap(buf.array());
}
}
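For illustration (a sketch, assuming Serdes.String() for both serdes), toBytes("fk", "pk") produces the big-endian layout

    {0x00, 0x00, 0x00, 0x02, 'f', 'k', 'p', 'k'}   // 4-byte FK length, FK bytes, PK bytes

while prefixBytes("fk") produces the same bytes minus the trailing primary key, so a range scan starting from it matches every CombinedKey that shares the foreign key.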

View File: ForeignJoinSubscriptionProcessorSupplier.java

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements ProcessorSupplier<KO, Change<VO>> {
private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionProcessorSupplier.class);
private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
private final CombinedKeySchema<KO, K> keySchema;
public ForeignJoinSubscriptionProcessorSupplier(
final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder,
final CombinedKeySchema<KO, K> keySchema) {
this.storeBuilder = storeBuilder;
this.keySchema = keySchema;
}
@Override
public Processor<KO, Change<VO>> get() {
return new KTableKTableJoinProcessor();
}
private final class KTableKTableJoinProcessor extends AbstractProcessor<KO, Change<VO>> {
private Sensor skippedRecordsSensor;
private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;
@Override
public void init(final ProcessorContext context) {
super.init(context);
final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(internalProcessorContext.metrics());
store = internalProcessorContext.getStateStore(storeBuilder);
}
/**
* @throws StreamsException if key is null
*/
@Override
public void process(final KO key, final Change<VO> value) {
// if the key is null, we cannot perform the prefix scan
// against the subscription store, so we skip the record
if (key == null) {
LOG.warn(
"Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
value, context().topic(), context().partition(), context().offset()
);
skippedRecordsSensor.record();
return;
}
final Bytes prefixBytes = keySchema.prefixBytes(key);
//Perform the prefixScan and propagate the results
try (final KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> prefixScanResults =
store.range(prefixBytes, Bytes.increment(prefixBytes))) {
while (prefixScanResults.hasNext()) {
final KeyValue<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> next = prefixScanResults.next();
// have to check the prefix because the range end is inclusive :(
if (prefixEquals(next.key.get(), prefixBytes.get())) {
final CombinedKey<KO, K> combinedKey = keySchema.fromBytes(next.key);
context().forward(
combinedKey.getPrimaryKey(),
new SubscriptionResponseWrapper<>(next.value.value().getHash(), value.newValue)
);
}
}
}
}
private boolean prefixEquals(final byte[] x, final byte[] y) {
final int min = Math.min(x.length, y.length);
final ByteBuffer xSlice = ByteBuffer.wrap(x, 0, min);
final ByteBuffer ySlice = ByteBuffer.wrap(y, 0, min);
return xSlice.equals(ySlice);
}
}
}

View File: ForeignJoinSubscriptionSendProcessorSupplier.java

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.internals.Murmur3;
import java.util.function.Function;
import java.util.Arrays;
import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE;
import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE;
public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSupplier<K, Change<V>> {
private final Function<V, KO> foreignKeyExtractor;
private final String repartitionTopicName;
private final Serializer<V> valueSerializer;
private final boolean leftJoin;
private Serializer<KO> foreignKeySerializer;
public ForeignJoinSubscriptionSendProcessorSupplier(final Function<V, KO> foreignKeyExtractor,
final Serde<KO> foreignKeySerde,
final String repartitionTopicName,
final Serializer<V> valueSerializer,
final boolean leftJoin) {
this.foreignKeyExtractor = foreignKeyExtractor;
this.valueSerializer = valueSerializer;
this.leftJoin = leftJoin;
this.repartitionTopicName = repartitionTopicName;
foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer();
}
@Override
public Processor<K, Change<V>> get() {
return new UnbindChangeProcessor();
}
private class UnbindChangeProcessor extends AbstractProcessor<K, Change<V>> {
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
// get default key serde if it wasn't supplied directly at construction
if (foreignKeySerializer == null) {
foreignKeySerializer = (Serializer<KO>) context.keySerde().serializer();
}
}
@Override
public void process(final K key, final Change<V> change) {
final long[] currentHash = change.newValue == null ?
null :
Murmur3.hash128(valueSerializer.serialize(repartitionTopicName, change.newValue));
if (change.oldValue != null) {
final KO oldForeignKey = foreignKeyExtractor.apply(change.oldValue);
if (change.newValue != null) {
final KO newForeignKey = foreignKeyExtractor.apply(change.newValue);
final byte[] serialOldForeignKey = foreignKeySerializer.serialize(repartitionTopicName, oldForeignKey);
final byte[] serialNewForeignKey = foreignKeySerializer.serialize(repartitionTopicName, newForeignKey);
if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) {
//Different Foreign Key - delete the old key value and propagate the new one.
//Delete it from the oldKey's state store
context().forward(oldForeignKey, new SubscriptionWrapper<>(currentHash, DELETE_KEY_NO_PROPAGATE, key));
//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
//since we must "unset" any output set by the previous FK-join. This is true for both INNER
//and LEFT join.
}
context().forward(newForeignKey, new SubscriptionWrapper<>(currentHash, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, key));
} else {
//A simple propagatable delete. Delete from the state store and propagate the delete onwards.
context().forward(oldForeignKey, new SubscriptionWrapper<>(currentHash, DELETE_KEY_AND_PROPAGATE, key));
}
} else if (change.newValue != null) {
//change.oldValue is null, which means it was deleted at least once before, or it is brand new.
//In either case, we only need to propagate if the FK_VAL is available, as the null from the delete would
//have been propagated otherwise.
final SubscriptionWrapper.Instruction instruction;
if (leftJoin) {
//Want to send info even if RHS is null.
instruction = PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
} else {
instruction = PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
}
context().forward(foreignKeyExtractor.apply(change.newValue), new SubscriptionWrapper<>(currentHash, instruction, key));
}
}
}
}
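In summary, process() above emits the following (per the branches; the instructions are defined in SubscriptionWrapper):

    oldValue != null, newValue != null, FK changed:   DELETE_KEY_NO_PROPAGATE to the old FK, then PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE to the new FK
    oldValue != null, newValue != null, FK unchanged: PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE to the same FK, refreshing the stored hash
    oldValue != null, newValue == null:               DELETE_KEY_AND_PROPAGATE to the old FK
    oldValue == null, newValue != null:               PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE (left join) or PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE (inner join)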

View File: SubscriptionJoinForeignProcessorSupplier.java

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.util.Objects;
/**
* Receives {@code SubscriptionWrapper<K>} events and processes them according to their Instruction.
* Depending on the results, {@code SubscriptionResponseWrapper}s are created, which will be propagated to
* the {@code SubscriptionResolverJoinProcessorSupplier} instance.
*
* @param <K> Type of primary keys
* @param <KO> Type of foreign key
* @param <VO> Type of foreign value
*/
public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
implements ProcessorSupplier<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> {
private final KTableValueGetterSupplier<KO, VO> foreignValueGetterSupplier;
public SubscriptionJoinForeignProcessorSupplier(final KTableValueGetterSupplier<KO, VO> foreignValueGetterSupplier) {
this.foreignValueGetterSupplier = foreignValueGetterSupplier;
}
@Override
public Processor<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> get() {
return new AbstractProcessor<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>>() {
private KTableValueGetter<KO, VO> foreignValues;
@Override
public void init(final ProcessorContext context) {
super.init(context);
foreignValues = foreignValueGetterSupplier.get();
foreignValues.init(context);
}
@Override
public void process(final CombinedKey<KO, K> combinedKey, final Change<ValueAndTimestamp<SubscriptionWrapper<K>>> change) {
Objects.requireNonNull(combinedKey, "This processor should never see a null key.");
Objects.requireNonNull(change, "This processor should never see a null value.");
final ValueAndTimestamp<SubscriptionWrapper<K>> valueAndTimestamp = change.newValue;
Objects.requireNonNull(valueAndTimestamp, "This processor should never see a null newValue.");
final SubscriptionWrapper<K> value = valueAndTimestamp.value();
if (value.getVersion() != SubscriptionWrapper.CURRENT_VERSION) {
//Guard against modifications to SubscriptionWrapper. Need to ensure that there is compatibility
//with previous versions to enable rolling upgrades. Must develop a strategy for upgrading
//from older SubscriptionWrapper versions to newer versions.
throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
}
final ValueAndTimestamp<VO> foreignValueAndTime = foreignValues.get(combinedKey.getForeignKey());
final long resultTimestamp =
foreignValueAndTime == null ?
valueAndTimestamp.timestamp() :
Math.max(valueAndTimestamp.timestamp(), foreignValueAndTime.timestamp());
switch (value.getInstruction()) {
case DELETE_KEY_AND_PROPAGATE:
context().forward(
combinedKey.getPrimaryKey(),
new SubscriptionResponseWrapper<VO>(value.getHash(), null),
To.all().withTimestamp(resultTimestamp)
);
break;
case PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE:
//This one needs to go through regardless of LEFT or INNER join, since the extracted FK was
//changed and there is no match for it. We must propagate the (key, null) to ensure that the
//downstream consumers are alerted to this fact.
final VO valueToSend = foreignValueAndTime == null ? null : foreignValueAndTime.value();
context().forward(
combinedKey.getPrimaryKey(),
new SubscriptionResponseWrapper<>(value.getHash(), valueToSend),
To.all().withTimestamp(resultTimestamp)
);
break;
case PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE:
if (foreignValueAndTime != null) {
context().forward(
combinedKey.getPrimaryKey(),
new SubscriptionResponseWrapper<>(value.getHash(), foreignValueAndTime.value()),
To.all().withTimestamp(resultTimestamp)
);
}
break;
case DELETE_KEY_NO_PROPAGATE:
break;
default:
throw new IllegalStateException("Unhandled instruction: " + value.getInstruction());
}
}
};
}
}

View File: SubscriptionResolverJoinProcessorSupplier.java

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Murmur3;
/**
* Receives {@code SubscriptionResponseWrapper<VO>} events and filters out those whose original-value hash no longer matches the
* current hash of the primary key's value. This eliminates race-condition results for rapidly-changing foreign keys for a given primary key.
* Applies the join and emits nulls according to LEFT/INNER rules.
*
* @param <K> Type of primary keys
* @param <V> Type of primary values
* @param <VO> Type of foreign values
* @param <VR> Type of joined result of primary and foreign values
*/
public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>> {
private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
private final Serializer<V> valueSerializer;
private final ValueJoiner<V, VO, VR> joiner;
private final boolean leftJoin;
public SubscriptionResolverJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> valueGetterSupplier,
final Serializer<V> valueSerializer,
final ValueJoiner<V, VO, VR> joiner,
final boolean leftJoin) {
this.valueGetterSupplier = valueGetterSupplier;
this.valueSerializer = valueSerializer;
this.joiner = joiner;
this.leftJoin = leftJoin;
}
@Override
public Processor<K, SubscriptionResponseWrapper<VO>> get() {
return new AbstractProcessor<K, SubscriptionResponseWrapper<VO>>() {
private KTableValueGetter<K, V> valueGetter;
@Override
public void init(final ProcessorContext context) {
super.init(context);
valueGetter = valueGetterSupplier.get();
valueGetter.init(context);
}
@Override
public void process(final K key, final SubscriptionResponseWrapper<VO> value) {
if (value.getVersion() != SubscriptionResponseWrapper.CURRENT_VERSION) {
//Guard against modifications to SubscriptionResponseWrapper. Need to ensure that there is
//compatibility with previous versions to enable rolling upgrades. Must develop a strategy for
//upgrading from older SubscriptionWrapper versions to newer versions.
throw new UnsupportedVersionException("SubscriptionResponseWrapper is of an incompatible version.");
}
final ValueAndTimestamp<V> currentValueWithTimestamp = valueGetter.get(key);
//We are unable to access the actual source topic name for the valueSerializer at runtime, without
//tightly coupling to KTableRepartitionProcessorSupplier.
//While we could use the source topic the events came from, we shouldn't serialize against it,
//as doing so causes problems with the Confluent Schema Registry, which requires that each topic
//have only a single registered schema.
final String dummySerializationTopic = context().topic() + "-join-resolver";
final long[] currentHash = currentValueWithTimestamp == null ?
null :
Murmur3.hash128(valueSerializer.serialize(dummySerializationTopic, currentValueWithTimestamp.value()));
final long[] messageHash = value.getOriginalValueHash();
//If this value doesn't match the current value from the original table, it is stale and should be discarded.
if (java.util.Arrays.equals(messageHash, currentHash)) {
final VR result;
if (value.getForeignValue() == null && (!leftJoin || currentValueWithTimestamp == null)) {
result = null; //Emit tombstone
} else {
result = joiner.apply(currentValueWithTimestamp == null ? null : currentValueWithTimestamp.value(), value.getForeignValue());
}
context().forward(key, result);
}
}
};
}
}
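A sketch of the race the hash comparison guards against: suppose primary key k holds v1 (extracted foreign key A) and is then quickly updated to v2 (foreign key B). The subscription response computed against A carries hash(v1); by the time it arrives, the local value getter already returns v2, so hash(v1) != hash(v2) and the stale response is discarded. Only the response carrying hash(v2) is joined and forwarded.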

View File: SubscriptionResponseWrapper.java

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import java.util.Arrays;
public class SubscriptionResponseWrapper<FV> {
final static byte CURRENT_VERSION = 0x00;
private final long[] originalValueHash;
private final FV foreignValue;
private final byte version;
public SubscriptionResponseWrapper(final long[] originalValueHash, final FV foreignValue) {
this(originalValueHash, foreignValue, CURRENT_VERSION);
}
public SubscriptionResponseWrapper(final long[] originalValueHash, final FV foreignValue, final byte version) {
if (version != CURRENT_VERSION) {
throw new UnsupportedVersionException("SubscriptionWrapper does not support version " + version);
}
this.originalValueHash = originalValueHash;
this.foreignValue = foreignValue;
this.version = version;
}
public long[] getOriginalValueHash() {
return originalValueHash;
}
public FV getForeignValue() {
return foreignValue;
}
public byte getVersion() {
return version;
}
@Override
public String toString() {
return "SubscriptionResponseWrapper{" +
"version=" + version +
", foreignValue=" + foreignValue +
", originalValueHash=" + Arrays.toString(originalValueHash) +
'}';
}
}

View File: SubscriptionResponseWrapperSerde.java

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.ByteBuffer;
public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionResponseWrapper<V>> {
private final SubscriptionResponseWrapperSerializer<V> serializer;
private final SubscriptionResponseWrapperDeserializer<V> deserializer;
public SubscriptionResponseWrapperSerde(final Serde<V> foreignValueSerde) {
serializer = new SubscriptionResponseWrapperSerializer<>(foreignValueSerde.serializer());
deserializer = new SubscriptionResponseWrapperDeserializer<>(foreignValueSerde.deserializer());
}
@Override
public Serializer<SubscriptionResponseWrapper<V>> serializer() {
return serializer;
}
@Override
public Deserializer<SubscriptionResponseWrapper<V>> deserializer() {
return deserializer;
}
private static final class SubscriptionResponseWrapperSerializer<V> implements Serializer<SubscriptionResponseWrapper<V>> {
private final Serializer<V> serializer;
private SubscriptionResponseWrapperSerializer(final Serializer<V> serializer) {
this.serializer = serializer;
}
@Override
public byte[] serialize(final String topic, final SubscriptionResponseWrapper<V> data) {
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data}
//7-bit (0x7F) maximum for data version.
if (Byte.compare((byte) 0x7F, data.getVersion()) < 0) {
throw new UnsupportedVersionException("SubscriptionResponseWrapper version is larger than maximum supported 0x7F");
}
final byte[] serializedData = serializer.serialize(topic, data.getForeignValue());
final int serializedDataLength = serializedData == null ? 0 : serializedData.length;
final long[] originalHash = data.getOriginalValueHash();
final int hashLength = originalHash == null ? 0 : 2 * Long.BYTES;
final ByteBuffer buf = ByteBuffer.allocate(1 + hashLength + serializedDataLength);
if (originalHash != null) {
buf.put(data.getVersion());
buf.putLong(originalHash[0]);
buf.putLong(originalHash[1]);
} else {
//Don't store hash as it's null.
buf.put((byte) (data.getVersion() | (byte) 0x80));
}
if (serializedData != null)
buf.put(serializedData);
return buf.array();
}
}
private static final class SubscriptionResponseWrapperDeserializer<V> implements Deserializer<SubscriptionResponseWrapper<V>> {
private final Deserializer<V> deserializer;
private SubscriptionResponseWrapperDeserializer(final Deserializer<V> deserializer) {
this.deserializer = deserializer;
}
@Override
public SubscriptionResponseWrapper<V> deserialize(final String topic, final byte[] data) {
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data}
final ByteBuffer buf = ByteBuffer.wrap(data);
final byte versionAndIsHashNull = buf.get();
final byte version = (byte) (0x7F & versionAndIsHashNull);
final boolean isHashNull = (0x80 & versionAndIsHashNull) == 0x80;
final long[] hash;
int lengthSum = 1; //The first byte
if (isHashNull) {
hash = null;
} else {
hash = new long[2];
hash[0] = buf.getLong();
hash[1] = buf.getLong();
lengthSum += 2 * Long.BYTES;
}
final byte[] serializedValue;
if (data.length - lengthSum > 0) {
serializedValue = new byte[data.length - lengthSum];
buf.get(serializedValue, 0, serializedValue.length);
} else
serializedValue = null;
final V value = deserializer.deserialize(topic, serializedValue);
return new SubscriptionResponseWrapper<>(hash, value, version);
}
}
}
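
Usage sketch (not part of this diff), assuming a String foreign value and this package: round-tripping a wrapper shows the header packing, with the high bit of the first byte flagging a null hash and the low seven bits carrying the version.

package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.serialization.Serdes;

public class SubscriptionResponseWrapperSerdeSketch {
    public static void main(final String[] args) {
        final SubscriptionResponseWrapperSerde<String> serde = new SubscriptionResponseWrapperSerde<>(Serdes.String());
        final byte[] withHash = serde.serializer().serialize("topic",
            new SubscriptionResponseWrapper<>(new long[]{12L, 34L}, "foreign-value"));
        //First byte: version 0 with the isHashNull bit clear.
        if (withHash[0] != 0x00) {
            throw new AssertionError("expected version byte 0x00");
        }
        if (!"foreign-value".equals(serde.deserializer().deserialize("topic", withHash).getForeignValue())) {
            throw new AssertionError("foreign value did not round-trip");
        }
        final byte[] withoutHash = serde.serializer().serialize("topic",
            new SubscriptionResponseWrapper<>(null, "foreign-value"));
        //Same version, isHashNull bit set.
        if ((withoutHash[0] & 0x80) != 0x80) {
            throw new AssertionError("expected the isHashNull bit to be set");
        }
    }
}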

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
implements ProcessorSupplier<KO, SubscriptionWrapper<K>> {
private static final Logger LOG = LoggerFactory.getLogger(SubscriptionStoreReceiveProcessorSupplier.class);
private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
private final CombinedKeySchema<KO, K> keySchema;
public SubscriptionStoreReceiveProcessorSupplier(
final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder,
final CombinedKeySchema<KO, K> keySchema) {
this.storeBuilder = storeBuilder;
this.keySchema = keySchema;
}
@Override
public Processor<KO, SubscriptionWrapper<K>> get() {
return new AbstractProcessor<KO, SubscriptionWrapper<K>>() {
private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;
private StreamsMetricsImpl metrics;
private Sensor skippedRecordsSensor;
@Override
public void init(final ProcessorContext context) {
super.init(context);
final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
metrics = internalProcessorContext.metrics();
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
store = internalProcessorContext.getStateStore(storeBuilder);
}
@Override
public void process(final KO key, final SubscriptionWrapper<K> value) {
if (key == null) {
LOG.warn(
"Skipping record due to null foreign key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
value, context().topic(), context().partition(), context().offset()
);
skippedRecordsSensor.record();
return;
}
if (value.getVersion() != SubscriptionWrapper.CURRENT_VERSION) {
//Guard against modifications to SubscriptionWrapper. Need to ensure that there is compatibility
//with previous versions to enable rolling upgrades. Must develop a strategy for upgrading
//from older SubscriptionWrapper versions to newer versions.
throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
}
final Bytes subscriptionKey = keySchema.toBytes(key, value.getPrimaryKey());
final ValueAndTimestamp<SubscriptionWrapper<K>> newValue = ValueAndTimestamp.make(value, context().timestamp());
final ValueAndTimestamp<SubscriptionWrapper<K>> oldValue = store.get(subscriptionKey);
//If the SubscriptionWrapper's hash is null, the subscription must be deleted from the state store.
//This store is used by the prefix scanner in ForeignJoinSubscriptionProcessorSupplier
if (value.getHash() == null) {
store.delete(subscriptionKey);
} else {
store.put(subscriptionKey, newValue);
}
final Change<ValueAndTimestamp<SubscriptionWrapper<K>>> change = new Change<>(newValue, oldValue);
// note: key is non-nullable
// note: newValue is non-nullable
context().forward(
new CombinedKey<>(key, value.getPrimaryKey()),
change,
To.all().withTimestamp(newValue.timestamp())
);
}
};
}
}
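
Illustrative note (not part of this diff): the store written above is keyed by CombinedKey bytes so that all subscriptions for one foreign key sort contiguously, which is what the prefix scan in ForeignJoinSubscriptionProcessorSupplier relies on. Conceptually (the real encoding lives in CombinedKeySchema):

//With combined keys laid out as {foreign-key bytes}{primary-key bytes}, e.g.:
//  fk1|pkA, fk1|pkB, fk2|pkA, ...
//a range scan seeded with prefix fk1 visits exactly fk1|pkA and fk1|pkB and
//stops at the first key that no longer shares the prefix.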

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import java.util.Arrays;
import java.util.Objects;
public class SubscriptionWrapper<K> {
static final byte CURRENT_VERSION = 0;
private final long[] hash;
private final Instruction instruction;
private final byte version;
private final K primaryKey;
public enum Instruction {
//Send nothing. Do not propagate.
DELETE_KEY_NO_PROPAGATE((byte) 0x00),
//Send (k, null)
DELETE_KEY_AND_PROPAGATE((byte) 0x01),
//(changing foreign key, but FK+Val may not exist)
//Send (k, fk-val) OR
//Send (k, null) if fk-val does not exist
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE((byte) 0x02),
//(first time ever sending key)
//Send (k, fk-val) only if fk-val exists.
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE((byte) 0x03);
private final byte value;
Instruction(final byte value) {
this.value = value;
}
public byte getValue() {
return value;
}
public static Instruction fromValue(final byte value) {
for (final Instruction i: values()) {
if (i.value == value) {
return i;
}
}
throw new IllegalArgumentException("Unknown instruction byte value = " + value);
}
}
public SubscriptionWrapper(final long[] hash, final Instruction instruction, final K primaryKey) {
this(hash, instruction, primaryKey, CURRENT_VERSION);
}
public SubscriptionWrapper(final long[] hash, final Instruction instruction, final K primaryKey, final byte version) {
Objects.requireNonNull(instruction, "instruction cannot be null. Required by downstream processor.");
Objects.requireNonNull(primaryKey, "primaryKey cannot be null. Required by downstream processor.");
if (version != CURRENT_VERSION) {
throw new UnsupportedVersionException("SubscriptionWrapper does not support version " + version);
}
this.instruction = instruction;
this.hash = hash;
this.primaryKey = primaryKey;
this.version = version;
}
public Instruction getInstruction() {
return instruction;
}
public long[] getHash() {
return hash;
}
public K getPrimaryKey() {
return primaryKey;
}
public byte getVersion() {
return version;
}
@Override
public String toString() {
return "SubscriptionWrapper{" +
"version=" + version +
", primaryKey=" + primaryKey +
", instruction=" + instruction +
", hash=" + Arrays.toString(hash) +
'}';
}
}
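
Sketch (not part of this diff) of the instruction byte round trip; fromValue fails fast on unknown bytes rather than silently misreading them.

import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction;

public class InstructionRoundTripSketch {
    public static void main(final String[] args) {
        final Instruction inst = Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
        final byte wire = inst.getValue(); //0x02 on the wire
        if (Instruction.fromValue(wire) != inst) {
            throw new AssertionError("round trip failed");
        }
        try {
            Instruction.fromValue((byte) 0x7E); //not a defined instruction
            throw new AssertionError("should have thrown");
        } catch (final IllegalArgumentException expected) {
            //fail-fast behavior guards against wire-format drift between versions
        }
    }
}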

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.ByteBuffer;
public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>> {
private final SubscriptionWrapperSerializer<K> serializer;
private final SubscriptionWrapperDeserializer<K> deserializer;
public SubscriptionWrapperSerde(final Serde<K> primaryKeySerde) {
serializer = new SubscriptionWrapperSerializer<>(primaryKeySerde.serializer());
deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerde.deserializer());
}
@Override
public Serializer<SubscriptionWrapper<K>> serializer() {
return serializer;
}
@Override
public Deserializer<SubscriptionWrapper<K>> deserializer() {
return deserializer;
}
private static class SubscriptionWrapperSerializer<K> implements Serializer<SubscriptionWrapper<K>> {
private final Serializer<K> primaryKeySerializer;
SubscriptionWrapperSerializer(final Serializer<K> primaryKeySerializer) {
this.primaryKeySerializer = primaryKeySerializer;
}
@Override
public byte[] serialize(final String topic, final SubscriptionWrapper<K> data) {
//{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}
//7-bit (0x7F) maximum for data version.
if (Byte.compare((byte) 0x7F, data.getVersion()) < 0) {
throw new UnsupportedVersionException("SubscriptionWrapper version is larger than maximum supported 0x7F");
}
final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(topic, data.getPrimaryKey());
final ByteBuffer buf;
if (data.getHash() != null) {
buf = ByteBuffer.allocate(2 + 2 * Long.BYTES + primaryKeySerializedData.length);
buf.put(data.getVersion());
} else {
//Don't store hash as it's null.
buf = ByteBuffer.allocate(2 + primaryKeySerializedData.length);
buf.put((byte) (data.getVersion() | (byte) 0x80));
}
buf.put(data.getInstruction().getValue());
final long[] elem = data.getHash();
if (data.getHash() != null) {
buf.putLong(elem[0]);
buf.putLong(elem[1]);
}
buf.put(primaryKeySerializedData);
return buf.array();
}
}
private static class SubscriptionWrapperDeserializer<K> implements Deserializer<SubscriptionWrapper<K>> {
private final Deserializer<K> primaryKeyDeserializer;
SubscriptionWrapperDeserializer(final Deserializer<K> primaryKeyDeserializer) {
this.primaryKeyDeserializer = primaryKeyDeserializer;
}
@Override
public SubscriptionWrapper<K> deserialize(final String topic, final byte[] data) {
//{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}
final ByteBuffer buf = ByteBuffer.wrap(data);
final byte versionAndIsHashNull = buf.get();
final byte version = (byte) (0x7F & versionAndIsHashNull);
final boolean isHashNull = (0x80 & versionAndIsHashNull) == 0x80;
final SubscriptionWrapper.Instruction inst = SubscriptionWrapper.Instruction.fromValue(buf.get());
final long[] hash;
int lengthSum = 2; //The first 2 bytes
if (isHashNull) {
hash = null;
} else {
hash = new long[2];
hash[0] = buf.getLong();
hash[1] = buf.getLong();
lengthSum += 2 * Long.BYTES;
}
final byte[] primaryKeyRaw = new byte[data.length - lengthSum]; //The remaining data is the serialized pk
buf.get(primaryKeyRaw, 0, primaryKeyRaw.length);
final K primaryKey = primaryKeyDeserializer.deserialize(topic, primaryKeyRaw);
return new SubscriptionWrapper<>(hash, inst, primaryKey, version);
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
public abstract class BaseRepartitionNode<K, V> extends StreamsGraphNode {
@ -29,7 +30,7 @@ public abstract class BaseRepartitionNode<K, V> extends StreamsGraphNode {
protected final String sourceName;
protected final String repartitionTopic;
protected final ProcessorParameters processorParameters;
protected final StreamPartitioner<K, V> partitioner;
BaseRepartitionNode(final String nodeName,
final String sourceName,
@ -37,7 +38,8 @@ public abstract class BaseRepartitionNode<K, V> extends StreamsGraphNode {
final Serde<K> keySerde,
final Serde<V> valueSerde,
final String sinkName,
final String repartitionTopic) {
final String repartitionTopic,
final StreamPartitioner<K, V> partitioner) {
super(nodeName);
@ -47,6 +49,7 @@ public abstract class BaseRepartitionNode<K, V> extends StreamsGraphNode {
this.sourceName = sourceName;
this.repartitionTopic = repartitionTopic;
this.processorParameters = processorParameters;
this.partitioner = partitioner;
}
abstract Serializer<V> getValueSerializer();
@ -61,7 +64,8 @@ public abstract class BaseRepartitionNode<K, V> extends StreamsGraphNode {
", sinkName='" + sinkName + '\'' +
", sourceName='" + sourceName + '\'' +
", repartitionTopic='" + repartitionTopic + '\'' +
", processorParameters=" + processorParameters +
", processorParameters=" + processorParameters + '\'' +
", partitioner=" + partitioner +
"} " + super.toString();
}
}

View File

@ -43,7 +43,8 @@ public class GroupedTableOperationRepartitionNode<K, V> extends BaseRepartitionN
keySerde,
valueSerde,
sinkName,
repartitionTopic
repartitionTopic,
null
);
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapper;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
/**
* This join carries too much case-specific wiring to express with the generic graph nodes, so the
* foreign-key KTable-KTable join requires a dedicated node.
*/
public class KTableKTableForeignKeyJoinResolutionNode<K, V, KO, VO> extends StreamsGraphNode {
private final ProcessorParameters<KO, SubscriptionWrapper<K>> joinOneToOneProcessorParameters;
private final ProcessorParameters<KO, Change<VO>> joinByPrefixProcessorParameters;
private final ProcessorParameters<K, SubscriptionResponseWrapper<VO>> resolverProcessorParameters;
private final String finalRepartitionTopicName;
private final String finalRepartitionSinkName;
private final String finalRepartitionSourceName;
private final Serde<K> keySerde;
private final Serde<SubscriptionResponseWrapper<VO>> subResponseSerde;
private final KTableValueGetterSupplier<K, V> originalValueGetter;
public KTableKTableForeignKeyJoinResolutionNode(final String nodeName,
final ProcessorParameters<KO, SubscriptionWrapper<K>> joinOneToOneProcessorParameters,
final ProcessorParameters<KO, Change<VO>> joinByPrefixProcessorParameters,
final ProcessorParameters<K, SubscriptionResponseWrapper<VO>> resolverProcessorParameters,
final String finalRepartitionTopicName,
final String finalRepartitionSinkName,
final String finalRepartitionSourceName,
final Serde<K> keySerde,
final Serde<SubscriptionResponseWrapper<VO>> subResponseSerde,
final KTableValueGetterSupplier<K, V> originalValueGetter
) {
super(nodeName);
this.joinOneToOneProcessorParameters = joinOneToOneProcessorParameters;
this.joinByPrefixProcessorParameters = joinByPrefixProcessorParameters;
this.resolverProcessorParameters = resolverProcessorParameters;
this.finalRepartitionTopicName = finalRepartitionTopicName;
this.finalRepartitionSinkName = finalRepartitionSinkName;
this.finalRepartitionSourceName = finalRepartitionSourceName;
this.keySerde = keySerde;
this.subResponseSerde = subResponseSerde;
this.originalValueGetter = originalValueGetter;
}
@Override
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
topologyBuilder.addInternalTopic(finalRepartitionTopicName);
//Repartition back to the original partitioning structure
topologyBuilder.addSink(finalRepartitionSinkName, finalRepartitionTopicName,
keySerde.serializer(), subResponseSerde.serializer(),
null,
joinByPrefixProcessorParameters.processorName(), joinOneToOneProcessorParameters.processorName());
topologyBuilder.addSource(null, finalRepartitionSourceName, new FailOnInvalidTimestamp(),
keySerde.deserializer(), subResponseSerde.deserializer(), finalRepartitionTopicName);
//Connect the resolver processor to the source, and connect it to the original table's state stores.
topologyBuilder.addProcessor(resolverProcessorParameters.processorName(), resolverProcessorParameters.processorSupplier(), finalRepartitionSourceName);
topologyBuilder.connectProcessorAndStateStores(resolverProcessorParameters.processorName(), originalValueGetter.storeNames());
}
}
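
Conceptual shape of the wiring above (names illustrative, not part of this diff):

//  joinOneToOne --\
//                  +--> sink(finalRepartitionTopicName) --> source --> resolver
//  joinByPrefix --/
//The repartition restores the original key's partitioning; the resolver is then
//connected to the original table's value-getter stores for the staleness check.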

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode<K, V> {
@ -32,7 +33,8 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode<K, V>
final Serde<K> keySerde,
final Serde<V> valueSerde,
final String sinkName,
final String repartitionTopic) {
final String repartitionTopic,
final StreamPartitioner<K, V> partitioner) {
super(
nodeName,
@ -41,9 +43,9 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode<K, V>
keySerde,
valueSerde,
sinkName,
repartitionTopic
repartitionTopic,
partitioner
);
}
public Serde<K> keySerde() {
@ -91,7 +93,7 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode<K, V>
repartitionTopic,
keySerializer,
getValueSerializer(),
null,
partitioner,
processorParameters.processorName()
);
@ -120,6 +122,7 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode<K, V>
private String sinkName;
private String sourceName;
private String repartitionTopic;
private StreamPartitioner<K, V> partitioner;
private OptimizableRepartitionNodeBuilder() {
}
@ -160,6 +163,11 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode<K, V>
return this;
}
public OptimizableRepartitionNodeBuilder<K, V> withPartitioner(final StreamPartitioner<K, V> partitioner) {
this.partitioner = partitioner;
return this;
}
public OptimizableRepartitionNode<K, V> build() {
return new OptimizableRepartitionNode<>(
@ -169,7 +177,8 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode<K, V>
keySerde,
valueSerde,
sinkName,
repartitionTopic
repartitionTopic,
partitioner
);
}

View File

@ -28,6 +28,13 @@ public class ProcessorGraphNode<K, V> extends StreamsGraphNode {
private final ProcessorParameters<K, V> processorParameters;
public ProcessorGraphNode(final ProcessorParameters<K, V> processorParameters) {
super(processorParameters.processorName());
this.processorParameters = processorParameters;
}
public ProcessorGraphNode(final String nodeName,
final ProcessorParameters<K, V> processorParameters) {

View File

@ -18,21 +18,36 @@
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Stream;
public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
private final String[] storeNames;
private final StoreBuilder<? extends StateStore> storeBuilder;
/**
* Create a node representing a stateful processor, where the named stores have already been registered.
*/
public StatefulProcessorNode(final ProcessorParameters<K, V> processorParameters,
final Set<StoreBuilder<? extends StateStore>> preRegisteredStores,
final Set<KTableValueGetterSupplier<?, ?>> valueGetterSuppliers) {
super(processorParameters.processorName(), processorParameters);
final Stream<String> registeredStoreNames = preRegisteredStores.stream().map(StoreBuilder::name);
final Stream<String> valueGetterStoreNames = valueGetterSuppliers.stream().flatMap(s -> Arrays.stream(s.storeNames()));
storeNames = Stream.concat(registeredStoreNames, valueGetterStoreNames).toArray(String[]::new);
storeBuilder = null;
}
/**
* Create a node representing a stateful processor, where the named store has already been registered.
* Create a node representing a stateful processor, where the named stores have already been registered.
*/
public StatefulProcessorNode(final String nodeName,
final ProcessorParameters<K, V> processorParameters,
@ -80,5 +95,6 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
if (storeBuilder != null) {
topologyBuilder.addStateStore(storeBuilder, processorName);
}
}
}

View File

@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
public class StreamSinkNode<K, V> extends StreamsGraphNode {
@ -60,6 +61,9 @@ public class StreamSinkNode<K, V> extends StreamsGraphNode {
@SuppressWarnings("unchecked")
final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>((WindowedSerializer) keySerializer);
topologyBuilder.addSink(nodeName(), topicNameExtractor, keySerializer, valSerializer, windowedPartitioner, parentNames);
} else if (topicNameExtractor instanceof StaticTopicNameExtractor) {
final String topicName = ((StaticTopicNameExtractor) topicNameExtractor).topicName;
topologyBuilder.addSink(nodeName(), topicName, keySerializer, valSerializer, partitioner, parentNames);
} else {
topologyBuilder.addSink(nodeName(), topicNameExtractor, keySerializer, valSerializer, partitioner, parentNames);
}

View File

@ -18,7 +18,9 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.ThreadCache;
/**
@ -66,4 +68,15 @@ public interface InternalProcessorContext extends ProcessorContext {
* Mark this context as being uninitialized
*/
void uninitialize();
/**
* Get a correctly typed state store, given a handle on the original builder.
* @param builder the builder that was used to register the store with the topology
* @param <T> the concrete state store type
* @return the registered store, cast to {@code T}
*/
@SuppressWarnings("unchecked")
default <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
return (T) getStateStore(builder.name());
}
}
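
Usage sketch (not part of this diff), assuming a caller that holds the StoreBuilder it registered: the helper keeps the single unchecked cast inside the default method instead of at every call site.

import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;

final class TypedStoreLookupSketch {
    //Equivalent to: (KeyValueStore<String, String>) context.getStateStore(builder.name())
    static KeyValueStore<String, String> lookup(final InternalProcessorContext context,
                                                final StoreBuilder<KeyValueStore<String, String>> builder) {
        return context.getStateStore(builder);
    }
}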

View File

@ -29,8 +29,8 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -50,6 +50,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class InternalTopologyBuilder {
@ -1149,21 +1150,45 @@ public class InternalTopologyBuilder {
}
public synchronized Collection<Set<String>> copartitionGroups() {
final List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size());
for (final Set<String> nodeNames : copartitionSourceGroups) {
final Set<String> copartitionGroup = new HashSet<>();
for (final String node : nodeNames) {
final List<String> topics = nodeToSourceTopics.get(node);
if (topics != null) {
copartitionGroup.addAll(maybeDecorateInternalSourceTopics(topics));
// compute transitive closures of copartitionGroups to relieve registering code to know all members
// of a copartitionGroup at the same time
final List<Set<String>> copartitionSourceTopics =
copartitionSourceGroups
.stream()
.map(sourceGroup ->
sourceGroup
.stream()
.flatMap(node -> maybeDecorateInternalSourceTopics(nodeToSourceTopics.get(node)).stream())
.collect(Collectors.toSet())
).collect(Collectors.toList());
final Map<String, Set<String>> topicsToCopartitionGroup = new LinkedHashMap<>();
for (final Set<String> topics : copartitionSourceTopics) {
if (topics != null) {
Set<String> copartitionGroup = null;
for (final String topic : topics) {
copartitionGroup = topicsToCopartitionGroup.get(topic);
if (copartitionGroup != null) {
break;
}
}
if (copartitionGroup == null) {
copartitionGroup = new HashSet<>();
}
copartitionGroup.addAll(maybeDecorateInternalSourceTopics(topics));
for (final String topic : topics) {
topicsToCopartitionGroup.put(topic, copartitionGroup);
}
}
list.add(Collections.unmodifiableSet(copartitionGroup));
}
return Collections.unmodifiableList(list);
final Set<Set<String>> uniqueCopartitionGroups = new HashSet<>(topicsToCopartitionGroup.values());
return Collections.unmodifiableList(new ArrayList<>(uniqueCopartitionGroups));
}
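
A worked trace (not part of this diff) of the merge performed above:

//Given copartitionSourceGroups resolving to [{A, B}, {B, C}]:
//  pass 1: no topic of {A, B} is mapped yet -> new group G = {A, B}; map A -> G, B -> G
//  pass 2: B already maps to G -> G.addAll({B, C}) makes G = {A, B, C}; map B -> G, C -> G
//uniqueCopartitionGroups then contains the single, transitively closed group {A, B, C}.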
private List<String> maybeDecorateInternalSourceTopics(final Collection<String> sourceTopics) {
if (sourceTopics == null) {
return Collections.emptyList();
}
final List<String> decoratedTopics = new ArrayList<>();
for (final String topic : sourceTopics) {
if (internalTopicNames.contains(topic)) {

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.rocksdb.RocksIterator;
import java.util.Set;
class RocksDBPrefixIterator extends RocksDbIterator {
private byte[] rawPrefix;
RocksDBPrefixIterator(final String name,
final RocksIterator newIterator,
final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
final Bytes prefix) {
super(name, newIterator, openIterators);
rawPrefix = prefix.get();
newIterator.seek(rawPrefix);
}
@Override
public synchronized boolean hasNext() {
if (!super.hasNext()) {
return false;
}
final byte[] rawNextKey = super.peekNextKey().get();
for (int i = 0; i < rawPrefix.length; i++) {
if (i == rawNextKey.length) {
throw new IllegalStateException("Unexpected RocksDB Key Value. Should have been skipped with seek.");
}
if (rawNextKey[i] != rawPrefix[i]) {
return false;
}
}
return true;
}
}
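
Standalone sketch (not part of this diff) of the prefix test above. Because RocksDB iterates keys in lexicographic order and the constructor seeks to the prefix, the first non-matching key ends the scan.

final class PrefixCheckSketch {
    //Mirrors hasNext(): true while rawNextKey still starts with rawPrefix.
    //(The store version throws on a shorter key instead, since seek(prefix)
    //makes such a key unreachable.)
    static boolean hasPrefix(final byte[] rawPrefix, final byte[] rawNextKey) {
        for (int i = 0; i < rawPrefix.length; i++) {
            if (i == rawNextKey.length || rawNextKey[i] != rawPrefix[i]) {
                return false;
            }
        }
        return true;
    }
}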

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
import org.apache.kafka.common.utils.BytesTest;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKeySchemaTest;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapperSerdeTest;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerdeTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
/**
* This suite runs all the tests related to the KTable-KTable foreign key join feature.
*
* It can be used from an IDE to run only these tests while developing code related to the KTable-KTable
* foreign key join.
*
* If desired, it can also be added to a Gradle build task, although this isn't strictly necessary, since all
* these tests are already included in the `:streams:test` task.
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
BytesTest.class,
KTableKTableForeignKeyInnerJoinMultiIntegrationTest.class,
KTableKTableForeignKeyJoinIntegrationTest.class,
CombinedKeySchemaTest.class,
SubscriptionWrapperSerdeTest.class,
SubscriptionResponseWrapperSerdeTest.class
})
public class ForeignKeyJoinSuite {
}

View File

@ -0,0 +1,254 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.FloatSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import static org.junit.Assert.assertEquals;
@Category({IntegrationTest.class})
public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
private final static int NUM_BROKERS = 1;
@ClassRule
public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private final static MockTime MOCK_TIME = CLUSTER.time;
private final static String TABLE_1 = "table1";
private final static String TABLE_2 = "table2";
private final static String TABLE_3 = "table3";
private final static String OUTPUT = "output-";
private static Properties streamsConfig;
private KafkaStreams streams;
private KafkaStreams streamsTwo;
private KafkaStreams streamsThree;
private final static Properties CONSUMER_CONFIG = new Properties();
private final static Properties PRODUCER_CONFIG_1 = new Properties();
private final static Properties PRODUCER_CONFIG_2 = new Properties();
private final static Properties PRODUCER_CONFIG_3 = new Properties();
@BeforeClass
public static void beforeTest() throws Exception {
//Use multiple partitions to ensure distribution of keys.
CLUSTER.createTopic(TABLE_1, 3, 1);
CLUSTER.createTopic(TABLE_2, 5, 1);
CLUSTER.createTopic(TABLE_3, 7, 1);
CLUSTER.createTopic(OUTPUT, 11, 1);
PRODUCER_CONFIG_1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
PRODUCER_CONFIG_1.put(ProducerConfig.ACKS_CONFIG, "all");
PRODUCER_CONFIG_1.put(ProducerConfig.RETRIES_CONFIG, 0);
PRODUCER_CONFIG_1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
PRODUCER_CONFIG_1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, FloatSerializer.class);
PRODUCER_CONFIG_2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
PRODUCER_CONFIG_2.put(ProducerConfig.ACKS_CONFIG, "all");
PRODUCER_CONFIG_2.put(ProducerConfig.RETRIES_CONFIG, 0);
PRODUCER_CONFIG_2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
PRODUCER_CONFIG_2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
PRODUCER_CONFIG_3.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
PRODUCER_CONFIG_3.put(ProducerConfig.ACKS_CONFIG, "all");
PRODUCER_CONFIG_3.put(ProducerConfig.RETRIES_CONFIG, 0);
PRODUCER_CONFIG_3.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
PRODUCER_CONFIG_3.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
final List<KeyValue<Integer, Float>> table1 = Arrays.asList(
new KeyValue<>(1, 1.33f),
new KeyValue<>(2, 2.22f),
new KeyValue<>(3, -1.22f), //Won't be joined in yet.
new KeyValue<>(4, -2.22f) //Won't be joined in at all.
);
//Partitions pre-computed using the default Murmur2 hash, just to ensure that records are spread over multiple partitions.
final List<KeyValue<String, Long>> table2 = Arrays.asList(
new KeyValue<>("0", 0L), //partition 2
new KeyValue<>("1", 10L), //partition 0
new KeyValue<>("2", 20L), //partition 2
new KeyValue<>("3", 30L), //partition 2
new KeyValue<>("4", 40L), //partition 1
new KeyValue<>("5", 50L), //partition 0
new KeyValue<>("6", 60L), //partition 1
new KeyValue<>("7", 70L), //partition 0
new KeyValue<>("8", 80L), //partition 0
new KeyValue<>("9", 90L) //partition 2
);
//A single record for table3; its key (10) matches the foreign key that joinedTableKeyExtractor returns for joined rows containing "value2=10".
final List<KeyValue<Integer, String>> table3 = Arrays.asList(
new KeyValue<>(10, "waffle")
);
IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, PRODUCER_CONFIG_1, MOCK_TIME);
IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, PRODUCER_CONFIG_2, MOCK_TIME);
IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_3, table3, PRODUCER_CONFIG_3, MOCK_TIME);
CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
}
@Before
public void before() throws IOException {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
}
@After
public void after() throws IOException {
if (streams != null) {
streams.close();
streams = null;
}
if (streamsTwo != null) {
streamsTwo.close();
streamsTwo = null;
}
if (streamsThree != null) {
streamsThree.close();
streamsThree = null;
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
}
private enum JoinType {
INNER
}
@Test
public void shouldInnerJoinMultiPartitionQueryable() throws Exception {
final Set<KeyValue<Integer, String>> expectedOne = new HashSet<>();
expectedOne.add(new KeyValue<>(1, "value1=1.33,value2=10,value3=waffle"));
verifyKTableKTableJoin(JoinType.INNER, expectedOne, true);
}
private void verifyKTableKTableJoin(final JoinType joinType,
final Set<KeyValue<Integer, String>> expectedResult,
final boolean verifyQueryableState) throws Exception {
final String queryableName = verifyQueryableState ? joinType + "-store1" : null;
final String queryableNameTwo = verifyQueryableState ? joinType + "-store2" : null;
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType + queryableName);
streams = prepareTopology(queryableName, queryableNameTwo);
streamsTwo = prepareTopology(queryableName, queryableNameTwo);
streamsThree = prepareTopology(queryableName, queryableNameTwo);
streams.start();
streamsTwo.start();
streamsThree.start();
final Set<KeyValue<Integer, String>> result = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expectedResult.size()));
assertEquals(expectedResult, result);
}
private KafkaStreams prepareTopology(final String queryableName, final String queryableNameTwo) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Integer, Float> table1 = builder.table(TABLE_1, Consumed.with(Serdes.Integer(), Serdes.Float()));
final KTable<String, Long> table2 = builder.table(TABLE_2, Consumed.with(Serdes.String(), Serdes.Long()));
final KTable<Integer, String> table3 = builder.table(TABLE_3, Consumed.with(Serdes.Integer(), Serdes.String()));
final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materialized;
if (queryableName != null) {
materialized = Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(queryableName)
.withKeySerde(Serdes.Integer())
.withValueSerde(Serdes.String())
.withCachingDisabled();
} else {
throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store");
}
final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materializedTwo;
if (queryableNameTwo != null) {
materializedTwo = Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(queryableNameTwo)
.withKeySerde(Serdes.Integer())
.withValueSerde(Serdes.String())
.withCachingDisabled();
} else {
throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store");
}
final Function<Float, String> tableOneKeyExtractor = value -> Integer.toString((int) value.floatValue());
final Function<String, Integer> joinedTableKeyExtractor = value -> {
//Hardwired to return the desired foreign key as a test shortcut
if (value.contains("value2=10"))
return 10;
else
return 0;
};
final ValueJoiner<Float, Long, String> joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2;
final ValueJoiner<String, String, String> joinerTwo = (value1, value2) -> value1 + ",value3=" + value2;
table1.join(table2, tableOneKeyExtractor, joiner, materialized)
.join(table3, joinedTableKeyExtractor, joinerTwo, materializedTwo)
.toStream()
.to(OUTPUT, Produced.with(Serdes.Integer(), Serdes.String()));
return new KafkaStreams(builder.build(streamsConfig), streamsConfig);
}
}

View File

@ -0,0 +1,699 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.FloatSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertArrayEquals;
@Category({IntegrationTest.class})
public class KTableKTableForeignKeyJoinIntegrationTest {
private final static int NUM_BROKERS = 1;
@ClassRule
public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private final static MockTime MOCK_TIME = CLUSTER.time;
private final static String LEFT_TABLE = "left_table";
private final static String RIGHT_TABLE = "right_table";
private final static String OUTPUT = "output-topic";
private static Properties streamsConfig;
private KafkaStreams streams;
private KafkaStreams streamsTwo;
private KafkaStreams streamsThree;
private static final Properties CONSUMER_CONFIG = new Properties();
private static final Properties LEFT_PROD_CONF = new Properties();
private static final Properties RIGHT_PROD_CONF = new Properties();
@BeforeClass
public static void beforeTest() {
//Use multiple partitions to ensure distribution of keys.
LEFT_PROD_CONF.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
LEFT_PROD_CONF.put(ProducerConfig.ACKS_CONFIG, "all");
LEFT_PROD_CONF.put(ProducerConfig.RETRIES_CONFIG, 0);
LEFT_PROD_CONF.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
LEFT_PROD_CONF.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, FloatSerializer.class);
RIGHT_PROD_CONF.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
RIGHT_PROD_CONF.put(ProducerConfig.ACKS_CONFIG, "all");
RIGHT_PROD_CONF.put(ProducerConfig.RETRIES_CONFIG, 0);
RIGHT_PROD_CONF.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
RIGHT_PROD_CONF.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
streamsConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
}
@Before
public void before() throws IOException, InterruptedException {
CLUSTER.deleteTopicsAndWait(LEFT_TABLE);
CLUSTER.deleteTopicsAndWait(RIGHT_TABLE);
CLUSTER.deleteTopicsAndWait(OUTPUT);
CLUSTER.createTopic(LEFT_TABLE, 3, 1);
CLUSTER.createTopic(RIGHT_TABLE, 3, 1);
CLUSTER.createTopic(OUTPUT, 3, 1);
IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
}
@After
public void after() throws IOException {
if (streams != null) {
streams.close();
streams = null;
}
if (streamsTwo != null) {
streamsTwo.close();
streamsTwo = null;
}
if (streamsThree != null) {
streamsThree.close();
streamsThree = null;
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
}
@Test
public void doInnerJoinFromLeftThenDeleteLeftEntity() throws Exception {
final List<KeyValue<String, Long>> rightTableEvents = Arrays.asList(new KeyValue<>("1", 10L), new KeyValue<>("2", 20L)); //partition 0
IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableEvents, RIGHT_PROD_CONF, MOCK_TIME);
final String currentMethodName = new Object() { }
.getClass()
.getEnclosingMethod()
.getName();
createAndStartStreamsApplication(currentMethodName, false);
final List<KeyValue<Integer, Float>> leftTableEvents = Arrays.asList(new KeyValue<>(1, 1.33f), new KeyValue<>(2, 2.77f));
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
final Set<KeyValue<Integer, String>> expected = new HashSet<>();
expected.add(new KeyValue<>(1, "value1=1.33,value2=10"));
expected.add(new KeyValue<>(2, "value1=2.77,value2=20"));
final Set<KeyValue<Integer, String>> result = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expected.size()));
assertEquals(expected, result);
//Now delete one LHS entity such that one delete is propagated down to the output.
final Set<KeyValue<Integer, String>> expectedDeleted = new HashSet<>();
expectedDeleted.add(new KeyValue<>(1, null));
final List<KeyValue<Integer, Float>> rightTableDeleteEvents = Arrays.asList(new KeyValue<>(1, null));
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, rightTableDeleteEvents, LEFT_PROD_CONF, MOCK_TIME);
final Set<KeyValue<Integer, String>> resultDeleted = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expectedDeleted.size()));
assertEquals(expectedDeleted, resultDeleted);
//Ensure the state stores have the correct values within:
final Set<KeyValue<Integer, String>> expMatResults = new HashSet<>();
expMatResults.add(new KeyValue<>(2, "value1=2.77,value2=20"));
validateQueryableStoresContainExpectedKeyValues(expMatResults, currentMethodName);
}
@Test
public void doLeftJoinFromLeftThenDeleteLeftEntity() throws Exception {
final List<KeyValue<String, Long>> rightTableEvents = Arrays.asList(new KeyValue<>("1", 10L), new KeyValue<>("2", 20L)); //partition 0
IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableEvents, RIGHT_PROD_CONF, MOCK_TIME);
final String currentMethodName = new Object() { }
.getClass()
.getEnclosingMethod()
.getName();
createAndStartStreamsApplication(currentMethodName, true);
final List<KeyValue<Integer, Float>> leftTableEvents = Arrays.asList(new KeyValue<>(1, 1.33f), new KeyValue<>(2, 2.77f));
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
final Set<KeyValue<Integer, String>> expected = new HashSet<>();
expected.add(new KeyValue<>(1, "value1=1.33,value2=10"));
expected.add(new KeyValue<>(2, "value1=2.77,value2=20"));
final Set<KeyValue<Integer, String>> result = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expected.size()));
assertEquals(expected, result);
//Now delete one LHS entity such that one delete is propagated down to the output.
final Set<KeyValue<Integer, String>> expectedDeleted = new HashSet<>();
expectedDeleted.add(new KeyValue<>(1, null));
final List<KeyValue<Integer, Float>> rightTableDeleteEvents = Arrays.asList(new KeyValue<>(1, null));
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, rightTableDeleteEvents, LEFT_PROD_CONF, MOCK_TIME);
final Set<KeyValue<Integer, String>> resultDeleted = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expectedDeleted.size()));
assertEquals(expectedDeleted, resultDeleted);
//Ensure the state stores have the correct values within:
final Set<KeyValue<Integer, String>> expMatResults = new HashSet<>();
expMatResults.add(new KeyValue<>(2, "value1=2.77,value2=20"));
validateQueryableStoresContainExpectedKeyValues(expMatResults, currentMethodName);
}
@Test
public void doInnerJoinFromRightThenDeleteRightEntity() throws Exception {
final List<KeyValue<Integer, Float>> leftTableEvents = Arrays.asList(
new KeyValue<>(1, 1.33f),
new KeyValue<>(2, 1.77f),
new KeyValue<>(3, 3.77f));
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
final String currentMethodName = new Object() { }
.getClass()
.getEnclosingMethod()
.getName();
createAndStartStreamsApplication(currentMethodName, false);
final List<KeyValue<String, Long>> rightTableEvents = Arrays.asList(
new KeyValue<>("1", 10L), //partition 0
new KeyValue<>("2", 20L), //partition 2
new KeyValue<>("3", 30L)); //partition 2
IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableEvents, RIGHT_PROD_CONF, MOCK_TIME);
//Ensure that the joined values exist in the output
final Set<KeyValue<Integer, String>> expected = new HashSet<>();
expected.add(new KeyValue<>(1, "value1=1.33,value2=10")); //Will be deleted.
expected.add(new KeyValue<>(2, "value1=1.77,value2=10")); //Will be deleted.
expected.add(new KeyValue<>(3, "value1=3.77,value2=30")); //Will not be deleted.
final Set<KeyValue<Integer, String>> result = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expected.size()));
assertEquals(expected, result);
//Now delete the RHS entity such that all matching keys have deletes propagated.
final Set<KeyValue<Integer, String>> expectedDeleted = new HashSet<>();
expectedDeleted.add(new KeyValue<>(1, null));
expectedDeleted.add(new KeyValue<>(2, null));
final List<KeyValue<String, Long>> rightTableDeleteEvents = Arrays.asList(new KeyValue<>("1", null));
IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableDeleteEvents, RIGHT_PROD_CONF, MOCK_TIME);
final Set<KeyValue<Integer, String>> resultDeleted = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expectedDeleted.size()));
assertEquals(expectedDeleted, resultDeleted);
//Ensure the state stores have the correct values within:
final Set<KeyValue<Integer, String>> expMatResults = new HashSet<>();
expMatResults.add(new KeyValue<>(3, "value1=3.77,value2=30"));
validateQueryableStoresContainExpectedKeyValues(expMatResults, currentMethodName);
}
@Test
public void doLeftJoinFromRightThenDeleteRightEntity() throws Exception {
final List<KeyValue<Integer, Float>> leftTableEvents = Arrays.asList(
new KeyValue<>(1, 1.33f),
new KeyValue<>(2, 1.77f),
new KeyValue<>(3, 3.77f));
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
final String currentMethodName = new Object() { }
.getClass()
.getEnclosingMethod()
.getName();
createAndStartStreamsApplication(currentMethodName, true);
final List<KeyValue<String, Long>> rightTableEvents = Arrays.asList(
new KeyValue<>("1", 10L), //partition 0
new KeyValue<>("2", 20L), //partition 2
new KeyValue<>("3", 30L)); //partition 2
IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableEvents, RIGHT_PROD_CONF, MOCK_TIME);
//Ensure that the joined values exist in the output
final Set<KeyValue<Integer, String>> expected = new HashSet<>();
expected.add(new KeyValue<>(1, "value1=1.33,value2=10")); //Will be deleted.
expected.add(new KeyValue<>(2, "value1=1.77,value2=10")); //Will be deleted.
expected.add(new KeyValue<>(3, "value1=3.77,value2=30")); //Will not be deleted.
final Set<KeyValue<Integer, String>> result = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expected.size()));
assertEquals(expected, result);
//Now delete the RHS entity such that all matching keys have deletes propagated.
//This will exercise the joiner with the RHS value == null.
final Set<KeyValue<Integer, String>> expectedDeleted = new HashSet<>();
expectedDeleted.add(new KeyValue<>(1, "value1=1.33,value2=null"));
expectedDeleted.add(new KeyValue<>(2, "value1=1.77,value2=null"));
final List<KeyValue<String, Long>> rightTableDeleteEvents = Arrays.asList(new KeyValue<>("1", null));
IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableDeleteEvents, RIGHT_PROD_CONF, MOCK_TIME);
final Set<KeyValue<Integer, String>> resultDeleted = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expectedDeleted.size()));
assertEquals(expectedDeleted, resultDeleted);
//Ensure the state stores have the correct values within:
final Set<KeyValue<Integer, String>> expMatResults = new HashSet<>();
expMatResults.add(new KeyValue<>(1, "value1=1.33,value2=null"));
expMatResults.add(new KeyValue<>(2, "value1=1.77,value2=null"));
expMatResults.add(new KeyValue<>(3, "value1=3.77,value2=30"));
validateQueryableStoresContainExpectedKeyValues(expMatResults, currentMethodName);
}
@Test
public void doInnerJoinProduceNullsWhenValueHasNonMatchingForeignKey() throws Exception {
//There is no matching extracted foreign-key of 8 anywhere. Should not produce any output for the INNER JOIN,
//because the state is transitioning from oldValue=null -> newValue=8.33, so there is no prior result to retract.
List<KeyValue<Integer, Float>> leftTableEvents = Arrays.asList(new KeyValue<>(1, 8.33f));
final List<KeyValue<String, Long>> rightTableEvents = Arrays.asList(new KeyValue<>("1", 10L)); //partition 0
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableEvents, RIGHT_PROD_CONF, MOCK_TIME);
final String currentMethodName = new Object() { }
.getClass()
.getEnclosingMethod()
.getName();
createAndStartStreamsApplication(currentMethodName, false);
//There is also no matching extracted foreign-key for 18 anywhere. This WILL produce a null output for the INNER JOIN,
//since we do not maintain state recording that FK=8 already produced a null result.
leftTableEvents = Arrays.asList(new KeyValue<>(1, 18.00f));
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
final List<KeyValue<Integer, String>> expected = new LinkedList<>();
expected.add(new KeyValue<>(1, null));
final List<KeyValue<Integer, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expected.size());
assertEquals(expected, result);
//Another change to FK that has no match on the RHS will result in another null
leftTableEvents = Arrays.asList(new KeyValue<>(1, 100.00f));
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
//Consume the next event - note that we are using the same consumerGroupId, so this will consume a new event.
final List<KeyValue<Integer, String>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expected.size());
assertEquals(expected, result2);
//Now set the LHS event FK to match the rightTableEvents key-value.
leftTableEvents = Arrays.asList(new KeyValue<>(1, 1.11f));
final List<KeyValue<Integer, String>> expected3 = new LinkedList<>();
expected3.add(new KeyValue<>(1, "value1=1.11,value2=10"));
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
final List<KeyValue<Integer, String>> result3 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expected3.size());
assertEquals(expected3, result3);
//Ensure the state stores have the correct values within:
final Set<KeyValue<Integer, String>> expMatResults = new HashSet<>();
expMatResults.add(new KeyValue<>(1, "value1=1.11,value2=10"));
validateQueryableStoresContainExpectedKeyValues(expMatResults, currentMethodName);
}
@Test
public void doLeftJoinProduceJoinedResultsWhenValueHasNonMatchingForeignKey() throws Exception {
//There is no matching extracted foreign-key of 8 anywhere.
//However, it will still run the join function since this is a LEFT join.
List<KeyValue<Integer, Float>> leftTableEvents = Arrays.asList(new KeyValue<>(1, 8.33f));
final List<KeyValue<String, Long>> rightTableEvents = Arrays.asList(new KeyValue<>("1", 10L)); //partition 0
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableEvents, RIGHT_PROD_CONF, MOCK_TIME);
final String currentMethodName = new Object() { }
.getClass()
.getEnclosingMethod()
.getName();
createAndStartStreamsApplication(currentMethodName, true);
final List<KeyValue<Integer, String>> expected = new LinkedList<>();
expected.add(new KeyValue<>(1, "value1=8.33,value2=null"));
final List<KeyValue<Integer, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expected.size());
assertEquals(expected, result);
//There is also no matching extracted foreign-key for 18 anywhere.
//However, it will still run the join function since this is a LEFT join.
leftTableEvents = Arrays.asList(new KeyValue<>(1, 18.0f));
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
final List<KeyValue<Integer, String>> expected2 = new LinkedList<>();
expected2.add(new KeyValue<>(1, "value1=18.0,value2=null"));
final List<KeyValue<Integer, String>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expected2.size());
assertEquals(expected2, result2);
leftTableEvents = Arrays.asList(new KeyValue<>(1, 1.11f));
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
final List<KeyValue<Integer, String>> expected3 = new LinkedList<>();
expected3.add(new KeyValue<>(1, "value1=1.11,value2=10"));
final List<KeyValue<Integer, String>> result3 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expected3.size());
assertEquals(expected3, result3);
//Ensure the state stores have the correct values within:
final Set<KeyValue<Integer, String>> expMatResults = new HashSet<>();
expMatResults.add(new KeyValue<>(1, "value1=1.11,value2=10"));
validateQueryableStoresContainExpectedKeyValues(expMatResults, currentMethodName);
}
@Test
public void doInnerJoinFilterOutRapidlyChangingForeignKeyValues() throws Exception {
final List<KeyValue<Integer, Float>> leftTableEvents = Arrays.asList(
new KeyValue<>(1, 1.33f),
new KeyValue<>(2, 2.22f),
new KeyValue<>(3, -1.22f), //Won't be joined in
new KeyValue<>(4, -2.22f), //Won't be joined in
new KeyValue<>(5, 2.22f)
);
//Partitions pre-computed using the default Murmur2 hash, just to ensure that all 3 partitions will be exercised.
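//(With Kafka's default partitioner that is Utils.toPositive(Utils.murmur2(serializedKeyBytes)) % numPartitions,
// with the three partitions used by these test topics.)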
final List<KeyValue<String, Long>> rightTableEvents = Arrays.asList(
new KeyValue<>("0", 0L), //partition 2
new KeyValue<>("1", 10L), //partition 0
new KeyValue<>("2", 20L), //partition 2
new KeyValue<>("3", 30L), //partition 2
new KeyValue<>("4", 40L), //partition 1
new KeyValue<>("5", 50L), //partition 0
new KeyValue<>("6", 60L), //partition 1
new KeyValue<>("7", 70L), //partition 0
new KeyValue<>("8", 80L), //partition 0
new KeyValue<>("9", 90L) //partition 2
);
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableEvents, RIGHT_PROD_CONF, MOCK_TIME);
final Set<KeyValue<Integer, String>> expected = new HashSet<>();
expected.add(new KeyValue<>(1, "value1=1.33,value2=10"));
expected.add(new KeyValue<>(2, "value1=2.22,value2=20"));
expected.add(new KeyValue<>(5, "value1=2.22,value2=20"));
final String currentMethodName = new Object() { }
.getClass()
.getEnclosingMethod()
.getName();
createAndStartStreamsApplication(currentMethodName, false);
final Set<KeyValue<Integer, String>> result = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expected.size()));
assertEquals(expected, result);
//Rapidly change the foreign key, to validate that the hashing prevents incorrect results from being output,
//and that eventually the correct value is output.
final List<KeyValue<Integer, Float>> table1ForeignKeyChange = Arrays.asList(
new KeyValue<>(3, 2.22f), //Partition 2
new KeyValue<>(3, 3.33f), //Partition 2
new KeyValue<>(3, 4.44f), //Partition 1
new KeyValue<>(3, 5.55f), //Partition 0
new KeyValue<>(3, 9.99f), //Partition 2
new KeyValue<>(3, 8.88f), //Partition 0
new KeyValue<>(3, 0.23f), //Partition 2
new KeyValue<>(3, 7.77f), //Partition 0
new KeyValue<>(3, 6.66f), //Partition 1
new KeyValue<>(3, 1.11f) //Partition 0 - This will be the final result.
);
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, table1ForeignKeyChange, LEFT_PROD_CONF, MOCK_TIME);
final List<KeyValue<Integer, String>> resultTwo = IntegrationTestUtils.readKeyValues(OUTPUT, CONSUMER_CONFIG, 15 * 1000L, Integer.MAX_VALUE);
final List<KeyValue<Integer, String>> expectedTwo = new LinkedList<>();
expectedTwo.add(new KeyValue<>(3, "value1=1.11,value2=10"));
assertArrayEquals(expectedTwo.toArray(), resultTwo.toArray());
//Ensure the state stores have the correct values within:
final Set<KeyValue<Integer, String>> expMatResults = new HashSet<>();
expMatResults.addAll(expected);
expMatResults.addAll(expectedTwo);
validateQueryableStoresContainExpectedKeyValues(expMatResults, currentMethodName);
}
@Test
public void doLeftJoinFilterOutRapidlyChangingForeignKeyValues() throws Exception {
final List<KeyValue<Integer, Float>> leftTableEvents = Arrays.asList(
new KeyValue<>(1, 1.33f),
new KeyValue<>(2, 2.22f)
);
//Partitions pre-computed using the default Murmur2 hash, just to ensure that all 3 partitions will be exercised.
final List<KeyValue<String, Long>> rightTableEvents = Arrays.asList(
new KeyValue<>("0", 0L), //partition 2
new KeyValue<>("1", 10L), //partition 0
new KeyValue<>("2", 20L), //partition 2
new KeyValue<>("3", 30L), //partition 2
new KeyValue<>("4", 40L), //partition 1
new KeyValue<>("5", 50L), //partition 0
new KeyValue<>("6", 60L), //partition 1
new KeyValue<>("7", 70L), //partition 0
new KeyValue<>("8", 80L), //partition 0
new KeyValue<>("9", 90L) //partition 2
);
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTableEvents, LEFT_PROD_CONF, MOCK_TIME);
IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTableEvents, RIGHT_PROD_CONF, MOCK_TIME);
final Set<KeyValue<Integer, String>> expected = new HashSet<>();
expected.add(new KeyValue<>(1, "value1=1.33,value2=10"));
expected.add(new KeyValue<>(2, "value1=2.22,value2=20"));
final String currentMethodName = new Object() { }
.getClass()
.getEnclosingMethod()
.getName();
createAndStartStreamsApplication(currentMethodName, false);
final Set<KeyValue<Integer, String>> result = new HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
CONSUMER_CONFIG,
OUTPUT,
expected.size()));
assertEquals(expected, result);
//Rapidly change the foreign key, to validate that the hashing prevents incorrect results from being output,
//and that eventually the correct value is output.
final List<KeyValue<Integer, Float>> table1ForeignKeyChange = Arrays.asList(
new KeyValue<>(3, 2.22f), //Partition 2
new KeyValue<>(3, 3.33f), //Partition 2
new KeyValue<>(3, 4.44f), //Partition 1
new KeyValue<>(3, 5.55f), //Partition 0
new KeyValue<>(3, 9.99f), //Partition 2
new KeyValue<>(3, 8.88f), //Partition 0
new KeyValue<>(3, 0.23f), //Partition 2
new KeyValue<>(3, 7.77f), //Partition 0
new KeyValue<>(3, 6.66f), //Partition 1
new KeyValue<>(3, 1.11f) //Partition 0 - This will be the final result.
);
IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, table1ForeignKeyChange, LEFT_PROD_CONF, MOCK_TIME);
final List<KeyValue<Integer, String>> resultTwo = IntegrationTestUtils.readKeyValues(OUTPUT, CONSUMER_CONFIG, 15 * 1000L, Integer.MAX_VALUE);
final List<KeyValue<Integer, String>> expectedTwo = new LinkedList<>();
expectedTwo.add(new KeyValue<>(3, "value1=1.11,value2=10"));
assertArrayEquals(expectedTwo.toArray(), resultTwo.toArray());
//Ensure the state stores have the correct values within:
final Set<KeyValue<Integer, String>> expMatResults = new HashSet<>();
expMatResults.addAll(expected);
expMatResults.addAll(expectedTwo);
validateQueryableStoresContainExpectedKeyValues(expMatResults, currentMethodName);
}
private void createAndStartStreamsApplication(final String queryableStoreName, final boolean leftJoin) {
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey-" + queryableStoreName);
streams = prepareTopology(queryableStoreName, leftJoin);
streamsTwo = prepareTopology(queryableStoreName, leftJoin);
streamsThree = prepareTopology(queryableStoreName, leftJoin);
streams.start();
streamsTwo.start();
streamsThree.start();
}
// These are hardwired into the test logic for readability's sake.
// Do not change unless you want to change all the test results as well.
private ValueJoiner<Float, Long, String> joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2;
//Do not change. See above comment.
private Function<Float, String> tableOneKeyExtractor = value -> Integer.toString((int) value.floatValue());
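//Example composition: a left record (1, 1.33f) extracts foreign key Integer.toString((int) 1.33f) == "1",
//which joins against the right record ("1", 10L) to produce "value1=1.33,value2=10".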
private void validateQueryableStoresContainExpectedKeyValues(final Set<KeyValue<Integer, String>> expectedResult,
final String queryableStoreName) {
final ReadOnlyKeyValueStore<Integer, String> myJoinStoreOne = streams.store(queryableStoreName,
QueryableStoreTypes.keyValueStore());
final ReadOnlyKeyValueStore<Integer, String> myJoinStoreTwo = streamsTwo.store(queryableStoreName,
QueryableStoreTypes.keyValueStore());
final ReadOnlyKeyValueStore<Integer, String> myJoinStoreThree = streamsThree.store(queryableStoreName,
QueryableStoreTypes.keyValueStore());
// The store only keeps the latest value per key, not the entire stream of value changes.
final Map<Integer, String> expectedInStore = new HashMap<>();
for (final KeyValue<Integer, String> expected : expectedResult) {
expectedInStore.put(expected.key, expected.value);
}
// depending on partition assignment, the values will be in one of the three stream clients.
for (final Map.Entry<Integer, String> expected : expectedInStore.entrySet()) {
final String one = myJoinStoreOne.get(expected.getKey());
final String two = myJoinStoreTwo.get(expected.getKey());
final String three = myJoinStoreThree.get(expected.getKey());
String result;
if (one != null)
result = one;
else if (two != null)
result = two;
else if (three != null)
result = three;
else
throw new RuntimeException("Cannot find key " + expected.getKey() + " in any of the state stores");
assertEquals(expected.getValue(), result);
}
//Merge all the iterators together to ensure that everything they contain is within the expected set.
final KeyValueIterator<Integer, String> allOne = myJoinStoreOne.all();
final KeyValueIterator<Integer, String> allTwo = myJoinStoreTwo.all();
final KeyValueIterator<Integer, String> allThree = myJoinStoreThree.all();
final List<KeyValue<Integer, String>> all = new LinkedList<>();
while (allOne.hasNext()) {
all.add(allOne.next());
}
while (allTwo.hasNext()) {
all.add(allTwo.next());
}
while (allThree.hasNext()) {
all.add(allThree.next());
}
allOne.close();
allTwo.close();
allThree.close();
for (final KeyValue<Integer, String> elem : all) {
assertTrue(expectedResult.contains(elem));
}
}
private KafkaStreams prepareTopology(final String queryableStoreName, final boolean leftJoin) {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Integer, Float> left = builder.table(LEFT_TABLE, Consumed.with(Serdes.Integer(), Serdes.Float()));
final KTable<String, Long> right = builder.table(RIGHT_TABLE, Consumed.with(Serdes.String(), Serdes.Long()));
final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materialized;
if (queryableStoreName != null) {
materialized = Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(queryableStoreName)
.withKeySerde(Serdes.Integer())
.withValueSerde(Serdes.String())
.withCachingDisabled();
} else {
throw new RuntimeException("Current implementation of join on foreign key requires a materialized store");
}
if (leftJoin)
left.leftJoin(right, tableOneKeyExtractor, joiner, Named.as("customName"), materialized)
.toStream()
.to(OUTPUT, Produced.with(Serdes.Integer(), Serdes.String()));
else
left.join(right, tableOneKeyExtractor, joiner, materialized)
.toStream()
.to(OUTPUT, Produced.with(Serdes.Integer(), Serdes.String()));
final Topology topology = builder.build(streamsConfig);
return new KafkaStreams(topology, streamsConfig);
}
}
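For a quicker feedback loop than these integration tests, the same topology shape can be exercised with TopologyTestDriver. Below is a minimal sketch, assuming hypothetical topic and store names ("left", "right", "output", "sketch-store") and that the test driver can service the join's internal subscription topics; the extractor and joiner mirror the hardwired ones above.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class ForeignKeyJoinSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Integer, Float> left = builder.table("left", Consumed.with(Serdes.Integer(), Serdes.Float()));
        final KTable<String, Long> right = builder.table("right", Consumed.with(Serdes.String(), Serdes.Long()));
        left.join(right,
                  v -> Integer.toString((int) v.floatValue()),   // foreign-key extractor, as in the test above
                  (v1, v2) -> "value1=" + v1 + ",value2=" + v2,  // joiner, as in the test above
                  Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as("sketch-store")
                          .withKeySerde(Serdes.Integer())
                          .withValueSerde(Serdes.String()))
                .toStream()
                .to("output", Produced.with(Serdes.Integer(), Serdes.String()));
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fk-join-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // never contacted by the driver
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final ConsumerRecordFactory<String, Long> rightFactory =
                    new ConsumerRecordFactory<>(Serdes.String().serializer(), Serdes.Long().serializer());
            final ConsumerRecordFactory<Integer, Float> leftFactory =
                    new ConsumerRecordFactory<>(Serdes.Integer().serializer(), Serdes.Float().serializer());
            driver.pipeInput(rightFactory.create("right", "1", 10L));
            driver.pipeInput(leftFactory.create("left", 1, 1.33f));
            // Expected: key=1, value="value1=1.33,value2=10" on the output topic.
            System.out.println(driver.readOutput("output", Serdes.Integer().deserializer(), Serdes.String().deserializer()));
        }
    }
}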

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.junit.Test;
import java.nio.ByteBuffer;
import static org.junit.Assert.assertEquals;
public class CombinedKeySchemaTest {
@Test
public void nonNullPrimaryKeySerdeTest() {
final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("someTopic", Serdes.String(), Serdes.Integer());
final Integer primary = -999;
final Bytes result = cks.toBytes("foreignKey", primary);
final CombinedKey<String, Integer> deserializedKey = cks.fromBytes(result);
assertEquals("foreignKey", deserializedKey.getForeignKey());
assertEquals(primary, deserializedKey.getPrimaryKey());
}
@Test(expected = NullPointerException.class)
public void nullPrimaryKeySerdeTest() {
final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("someTopic", Serdes.String(), Serdes.Integer());
cks.toBytes("foreignKey", null);
}
@Test(expected = NullPointerException.class)
public void nullForeignKeySerdeTest() {
final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("someTopic", Serdes.String(), Serdes.Integer());
cks.toBytes(null, 10);
}
@Test
public void prefixKeySerdeTest() {
final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("someTopic", Serdes.String(), Serdes.Integer());
final String foreignKey = "someForeignKey";
final byte[] foreignKeySerializedData = Serdes.String().serializer().serialize("someTopic", foreignKey);
final Bytes prefix = cks.prefixBytes(foreignKey);
final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + foreignKeySerializedData.length);
buf.putInt(foreignKeySerializedData.length);
buf.put(foreignKeySerializedData);
final Bytes expectedPrefixBytes = Bytes.wrap(buf.array());
assertEquals(expectedPrefixBytes, prefix);
}
@Test(expected = NullPointerException.class)
public void nullPrefixKeySerdeTest() {
final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("someTopic", Serdes.String(), Serdes.Integer());
final String foreignKey = null;
cks.prefixBytes(foreignKey);
}
}
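The prefix asserted above is a 4-byte length followed by the serialized foreign key. Below is a hedged sketch of the full combined-key layout, assuming (this test does not assert it) that the serialized primary key simply follows the prefix, which is what makes per-foreign-key prefix scans possible.

import java.nio.ByteBuffer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;

public class CombinedKeyLayoutSketch {
    public static void main(final String[] args) {
        // Serialize both halves of the combined key, as CombinedKeySchema presumably does.
        final byte[] fk = Serdes.String().serializer().serialize("someTopic", "someForeignKey");
        final byte[] pk = Serdes.Integer().serializer().serialize("someTopic", -999);
        // Prefix: 4-byte foreign-key length, then the foreign-key bytes (matches prefixKeySerdeTest).
        // Assumption: the primary-key bytes follow directly after the prefix.
        final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + fk.length + pk.length);
        buf.putInt(fk.length);
        buf.put(fk);
        buf.put(pk);
        final Bytes combined = Bytes.wrap(buf.array());
        // Every combined key for "someForeignKey" starts with the same prefix bytes,
        // so a range scan over that prefix finds all of its subscribed primary keys.
        System.out.println("combined key length = " + combined.get().length);
    }
}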

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.internals.Murmur3;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNull;
public class SubscriptionResponseWrapperSerdeTest {
@Test
@SuppressWarnings("unchecked")
public void shouldSerdeWithNonNullsTest() {
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A, (byte) 0xFF, (byte) 0x00});
final String foreignValue = "foreignValue";
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue);
final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde(Serdes.String());
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = (SubscriptionResponseWrapper<String>) srwSerde.deserializer().deserialize(null, serResponse);
assertArrayEquals(hashedValue, result.getOriginalValueHash());
assertEquals(foreignValue, result.getForeignValue());
}
@Test
@SuppressWarnings("unchecked")
public void shouldSerdeWithNullForeignValueTest() {
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A, (byte) 0xFF, (byte) 0x00});
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, null);
final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde(Serdes.String());
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = (SubscriptionResponseWrapper<String>) srwSerde.deserializer().deserialize(null, serResponse);
assertArrayEquals(hashedValue, result.getOriginalValueHash());
assertNull(result.getForeignValue());
}
@Test
@SuppressWarnings("unchecked")
public void shouldSerdeWithNullHashTest() {
final long[] hashedValue = null;
final String foreignValue = "foreignValue";
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue);
final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde(Serdes.String());
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = (SubscriptionResponseWrapper<String>) srwSerde.deserializer().deserialize(null, serResponse);
assertArrayEquals(hashedValue, result.getOriginalValueHash());
assertEquals(foreignValue, result.getForeignValue());
}
@Test
@SuppressWarnings("unchecked")
public void shouldSerdeWithNullsTest() {
final long[] hashedValue = null;
final String foreignValue = null;
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue);
final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde(Serdes.String());
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = (SubscriptionResponseWrapper<String>) srwSerde.deserializer().deserialize(null, serResponse);
assertArrayEquals(hashedValue, result.getOriginalValueHash());
assertEquals(foreignValue, result.getForeignValue());
}
@Test (expected = UnsupportedVersionException.class)
@SuppressWarnings("unchecked")
public void shouldThrowExceptionWithBadVersionTest() {
final long[] hashedValue = null;
new SubscriptionResponseWrapper<>(hashedValue, "foreignValue", (byte) 0xFF);
}
}
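As getOriginalValueHash suggests, the wrapper carries a Murmur3 128-bit hash of the subscribing side's serialized value so that responses computed against a stale value can be discarded. Below is a minimal sketch of that comparison, assuming only the hash128(byte[]) overload exercised by these tests.

import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.internals.Murmur3;

public class StaleResponseCheckSketch {
    public static void main(final String[] args) {
        // Hash the serialized value when the subscription is sent...
        final long[] sentHash = Murmur3.hash128(Serdes.String().serializer().serialize(null, "valueAtSendTime"));
        // ...and hash the current value when the response comes back.
        final long[] currentHash = Murmur3.hash128(Serdes.String().serializer().serialize(null, "valueAtSendTime"));
        // If the hashes differ, the value changed in flight and the response is stale.
        final boolean stale = !Arrays.equals(sentHash, currentHash);
        System.out.println(stale ? "discard stale response" : "apply response"); // prints "apply response"
    }
}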

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.internals.Murmur3;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@SuppressWarnings({"unchecked", "rawtypes"})
public class SubscriptionWrapperSerdeTest {
@Test
@SuppressWarnings("unchecked")
public void shouldSerdeTest() {
final String originalKey = "originalKey";
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(Serdes.String());
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, originalKey);
final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer().deserialize(null, serialized);
assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, deserialized.getInstruction());
assertArrayEquals(hashedValue, deserialized.getHash());
assertEquals(originalKey, deserialized.getPrimaryKey());
}
@Test
@SuppressWarnings("unchecked")
public void shouldSerdeNullHashTest() {
final String originalKey = "originalKey";
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(Serdes.String());
final long[] hashedValue = null;
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey);
final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer().deserialize(null, serialized);
assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, deserialized.getInstruction());
assertArrayEquals(hashedValue, deserialized.getHash());
assertEquals(originalKey, deserialized.getPrimaryKey());
}
@Test (expected = NullPointerException.class)
@SuppressWarnings("unchecked")
public void shouldThrowExceptionOnNullKeyTest() {
final String originalKey = null;
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(Serdes.String());
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey);
swSerde.serializer().serialize(null, wrapper);
}
@Test (expected = NullPointerException.class)
@SuppressWarnings("unchecked")
public void shouldThrowExceptionOnNullInstructionTest() {
final String originalKey = "originalKey";
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(Serdes.String());
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, null, originalKey);
swSerde.serializer().serialize(null, wrapper);
}
@Test (expected = UnsupportedVersionException.class)
public void shouldThrowExceptionOnUnsupportedVersionTest() {
final String originalKey = "originalKey";
final long[] hashedValue = null;
new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey, (byte) 0x80);
}
}

View File

@ -87,7 +87,7 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
}
@Test
public void shouldCloseOpenIteratorsWhenStoreClosedAndThrowInvalidStateStoreOnHasNextAndNext() {
public void shouldCloseOpenRangeIteratorsWhenStoreClosedAndThrowInvalidStateStoreOnHasNextAndNext() {
context.setTime(1L);
store.put(1, "hi");
store.put(2, "goodbye");
@ -127,5 +127,4 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
// ok
}
}
}

View File

@ -557,7 +557,7 @@ public class RocksDBStoreTest {
rocksDBStore.init(context, rocksDBStore);
try {
rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2")));
fail("Should have thrown NullPointerException on deleting null key");
fail("Should have thrown NullPointerException on null range key");
} catch (final NullPointerException e) {
// this is good
}

View File

@ -61,6 +61,12 @@ private[scala] object FunctionsCompatConversions {
}
}
implicit class FunctionFromFunction[V, VR](val f: V => VR) extends AnyVal {
def asJavaFunction: java.util.function.Function[V, VR] = new java.util.function.Function[V, VR] {
override def apply(value: V): VR = f(value)
}
}
implicit class ValueMapperFromFunction[V, VR](val f: V => VR) extends AnyVal {
def asValueMapper: ValueMapper[V, VR] = new ValueMapper[V, VR] {
override def apply(value: V): VR = f(value)

View File

@ -21,7 +21,7 @@ package org.apache.kafka.streams.scala
package kstream
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{Suppressed, ValueTransformerWithKeySupplier, KTable => KTableJ}
import org.apache.kafka.streams.kstream.{Suppressed, ValueJoiner, ValueTransformerWithKeySupplier, KTable => KTableJ}
import org.apache.kafka.streams.scala.FunctionsCompatConversions._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.state.KeyValueStore
@ -318,6 +318,42 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
): KTable[K, VR] =
inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using a non-windowed inner join. Records from this
* table are joined against records of the other KTable whose key equals the result of applying keyExtractor to this table's value.
*
* @param other the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor
* @param keyExtractor a function that extracts the foreign key from this table's value
* @param joiner a function that computes the join result for a pair of matching records
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
* should be materialized.
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
* one for each matched record pair
*/
def join[VR, KO, VO](other: KTable[KO, VO],
keyExtractor: Function[V, KO],
joiner: ValueJoiner[V, VO, VR],
materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]): KTable[K, VR] =
inner.join(other.inner, keyExtractor.asJavaFunction, joiner, materialized)
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using a non-windowed left join. Records from this
* table are joined against records of the other KTable whose key equals the result of applying keyExtractor to this table's value.
*
* @param other the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor
* @param keyExtractor a function that extracts the foreign key from this table's value
* @param joiner a function that computes the join result for a pair of matching records
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
* should be materialized.
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
* one for each matched record pair
*/
def leftJoin[VR, KO, VO](other: KTable[KO, VO],
keyExtractor: Function[V, KO],
joiner: ValueJoiner[V, VO, VR],
materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]): KTable[K, VR] =
inner.leftJoin(other.inner, keyExtractor.asJavaFunction, joiner, materialized)
/**
* Get the name of the local state store used that can be used to query this [[KTable]].
*