diff --git a/build.gradle b/build.gradle
index 2b7083f2921..210112bb13d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1224,6 +1224,12 @@ project(':streams') {
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "streams_config.html").newOutputStream()
}
+
+ test {
+ // The suites are for running sets of tests in IDEs.
+ // Gradle will run each test class, so we exclude the suites to avoid redundantly running the tests twice.
+ exclude '**/*Suite.class'
+ }
}
project(':streams:streams-scala') {
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6a24ef94c29..7a04f8d5b94 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -1,4 +1,4 @@
-
+
+
@@ -90,6 +92,9 @@
+
+
@@ -149,7 +154,7 @@
files="(TopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>
+ files="(KTableImpl|StreamsPartitionAssignor.java)"/>
@@ -229,7 +234,8 @@
-
+
{
}
/**
- * A byte array comparator based on lexicographic ordering.
+ * Increment the underlying byte array by adding 1. Throws an IndexOutOfBoundsException if incrementing would cause
+ * the underlying input byte array to overflow.
+ *
+ * @param input - The byte array to increment
+ * @return A new copy of the incremented byte array.
+ */
+ public static Bytes increment(Bytes input) throws IndexOutOfBoundsException {
+ byte[] inputArr = input.get();
+ byte[] ret = new byte[inputArr.length];
+ int carry = 1;
+ for (int i = inputArr.length - 1; i >= 0; i--) {
+ if (inputArr[i] == (byte) 0xFF && carry == 1) {
+ ret[i] = (byte) 0x00;
+ } else {
+ ret[i] = (byte) (inputArr[i] + carry);
+ carry = 0;
+ }
+ }
+ if (carry == 0) {
+ return wrap(ret);
+ } else {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ /**
+ * A byte array comparator based on lexicograpic ordering.
*/
public final static ByteArrayComparator BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator();
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java b/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java
new file mode 100644
index 00000000000..bf7ec712ddc
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/BytesTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import java.util.Comparator;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertEquals;
+
+public class BytesTest {
+
+ @Test
+ public void testIncrement() {
+ byte[] input = new byte[]{(byte) 0xAB, (byte) 0xCD, (byte) 0xFF};
+ byte[] expected = new byte[]{(byte) 0xAB, (byte) 0xCE, (byte) 0x00};
+ Bytes output = Bytes.increment(Bytes.wrap(input));
+ assertArrayEquals(output.get(), expected);
+ }
+
+ @Test
+ public void testIncrementUpperBoundary() {
+ byte[] input = new byte[]{(byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
+ assertThrows(IndexOutOfBoundsException.class, () -> Bytes.increment(Bytes.wrap(input)));
+ }
+
+ @Test
+ public void testIncrementWithSubmap() {
+ final NavigableMap map = new TreeMap<>();
+ Bytes key1 = Bytes.wrap(new byte[]{(byte) 0xAA});
+ byte[] val = new byte[]{(byte) 0x00};
+ map.put(key1, val);
+
+ Bytes key2 = Bytes.wrap(new byte[]{(byte) 0xAA, (byte) 0xAA});
+ map.put(key2, val);
+
+ Bytes key3 = Bytes.wrap(new byte[]{(byte) 0xAA, (byte) 0x00, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF});
+ map.put(key3, val);
+
+ Bytes key4 = Bytes.wrap(new byte[]{(byte) 0xAB, (byte) 0x00});
+ map.put(key4, val);
+
+ Bytes key5 = Bytes.wrap(new byte[]{(byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x01});
+ map.put(key5, val);
+
+ Bytes prefix = key1;
+ Bytes prefixEnd = Bytes.increment(prefix);
+
+ Comparator super Bytes> comparator = map.comparator();
+ final int result = comparator == null ? prefix.compareTo(prefixEnd) : comparator.compare(prefix, prefixEnd);
+ NavigableMap subMapResults;
+ if (result > 0) {
+ //Prefix increment would cause a wrap-around. Get the submap from toKey to the end of the map
+ subMapResults = map.tailMap(prefix, true);
+ } else {
+ subMapResults = map.subMap(prefix, true, prefixEnd, false);
+ }
+
+ NavigableMap subMapExpected = new TreeMap<>();
+ subMapExpected.put(key1, val);
+ subMapExpected.put(key2, val);
+ subMapExpected.put(key3, val);
+
+ assertEquals(subMapExpected.keySet(), subMapResults.keySet());
+ }
+}
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 597b9d34c23..574e9c66e29 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -675,7 +675,9 @@ public class StreamsResetter {
// Cf. https://issues.apache.org/jira/browse/KAFKA-7930
return !isInputTopic(topicName) && !isIntermediateTopic(topicName)
&& topicName.startsWith(options.valueOf(applicationIdOption) + "-")
- && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
+ && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition")
+ || topicName.endsWith("-subscription-registration-topic")
+ || topicName.endsWith("-subscription-response-topic"));
}
public static void main(final String[] args) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index a2b9bafd9a7..21e42df688e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -30,6 +30,8 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import java.util.function.Function;
+
/**
* {@code KTable} is an abstraction of a changelog stream from a primary-keyed table.
* Each record in this changelog stream is an update on the primary-keyed table with the record key as the primary key.
@@ -2117,6 +2119,92 @@ public interface KTable {
final Named named,
final Materialized> materialized);
+ /**
+ *
+ * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner join. Records from this
+ * table are joined according to the result of keyExtractor on the other KTable.
+ *
+ * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
+ * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V)
+ * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+ * should be materialized. Cannot be {@code null}
+ * @param the value type of the result {@code KTable}
+ * @param the key type of the other {@code KTable}
+ * @param the value type of the other {@code KTable}
+ * @return
+ */
+ KTable join(final KTable other,
+ final Function foreignKeyExtractor,
+ final ValueJoiner joiner,
+ final Named named,
+ final Materialized> materialized);
+
+ /**
+ *
+ * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner join. Records from this
+ * table are joined according to the result of keyExtractor on the other KTable.
+ *
+ * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
+ * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V)
+ * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+ * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+ * should be materialized. Cannot be {@code null}
+ * @param the value type of the result {@code KTable}
+ * @param the key type of the other {@code KTable}
+ * @param the value type of the other {@code KTable}
+ * @return
+ */
+ KTable join(final KTable other,
+ final Function foreignKeyExtractor,
+ final ValueJoiner joiner,
+ final Materialized> materialized);
+
+ /**
+ *
+ * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left join. Records from this
+ * table are joined according to the result of keyExtractor on the other KTable.
+ *
+ * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
+ * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
+ * * resultant foreignKey is null, the record will not propagate to the output.
+ * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+ * should be materialized. Cannot be {@code null}
+ * @param the value type of the result {@code KTable}
+ * @param the key type of the other {@code KTable}
+ * @param the value type of the other {@code KTable}
+ * @return a {@code KTable} that contains only those records that satisfy the given predicate
+ */
+ KTable leftJoin(final KTable other,
+ final Function foreignKeyExtractor,
+ final ValueJoiner joiner,
+ final Named named,
+ final Materialized> materialized);
+
+ /**
+ *
+ * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left join. Records from this
+ * table are joined according to the result of keyExtractor on the other KTable.
+ *
+ * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
+ * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
+ * resultant foreignKey is null, the record will not propagate to the output.
+ * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+ * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+ * should be materialized. Cannot be {@code null}
+ * @param the value type of the result {@code KTable}
+ * @param the key type of the other {@code KTable}
+ * @param the value type of the other {@code KTable}
+ * @return a {@code KTable} that contains only those records that satisfy the given predicate
+ */
+ KTable leftJoin(final KTable other,
+ final Function foreignKeyExtractor,
+ final ValueJoiner joiner,
+ final Materialized> materialized);
+
/**
* Get the name of the local state store used that can be used to query this {@code KTable}.
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 4bc102a746d..05e04e8c291 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -17,8 +17,10 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
@@ -27,15 +29,29 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKey;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKeySchema;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionProcessorSupplier;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionJoinForeignProcessorSupplier;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplier;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapper;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapperSerde;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde;
import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
+import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
+import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
@@ -43,17 +59,22 @@ import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcess
import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
+import java.util.function.Function;
import static org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchUtil.findAndVerifyWindowGrace;
@@ -89,6 +110,15 @@ public class KTableImpl extends AbstractStream implements KTable<
private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";
+ private static final String FK_JOIN_STATE_STORE_NAME = "KTABLE-INTERNAL-SUBSCRIPTION-STATE-STORE-";
+ private static final String SUBSCRIPTION_REGISTRATION = "KTABLE-SUBSCRIPTION-REGISTRATION-";
+ private static final String SUBSCRIPTION_RESPONSE = "KTABLE-SUBSCRIPTION-RESPONSE-";
+ private static final String SUBSCRIPTION_PROCESSOR = "KTABLE-SUBSCRIPTION-PROCESSOR-";
+ private static final String SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR = "KTABLE-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-";
+ private static final String FK_JOIN_OUTPUT_PROCESSOR = "KTABLE-OUTPUT-PROCESSOR-";
+ private static final String TOPIC_SUFFIX = "-topic";
+ private static final String SINK_NAME = "KTABLE-SINK-";
+
private final ProcessorSupplier, ?> processorSupplier;
private final String queryableStoreName;
@@ -495,7 +525,7 @@ public class KTableImpl extends AbstractStream implements KTable<
storeName,
this
);
-
+
final ProcessorGraphNode> node = new StatefulProcessorNode<>(
name,
new ProcessorParameters<>(suppressionSupplier, name),
@@ -803,4 +833,224 @@ public class KTableImpl extends AbstractStream implements KTable<
return (ProcessorParameters) kObjectProcessorParameters;
}
+ @Override
+ public KTable join(final KTable other,
+ final Function foreignKeyExtractor,
+ final ValueJoiner joiner,
+ final Named named,
+ final Materialized> materialized) {
+
+ return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named, new MaterializedInternal<>(materialized), false);
+ }
+
+ @Override
+ public KTable join(final KTable other,
+ final Function foreignKeyExtractor,
+ final ValueJoiner joiner,
+ final Materialized> materialized) {
+
+ return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, NamedInternal.empty(), new MaterializedInternal<>(materialized), false);
+ }
+
+ @Override
+ public KTable leftJoin(final KTable other,
+ final Function foreignKeyExtractor,
+ final ValueJoiner joiner,
+ final Named named,
+ final Materialized> materialized) {
+ return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named, new MaterializedInternal<>(materialized), true);
+ }
+
+ @Override
+ public KTable leftJoin(final KTable other,
+ final Function foreignKeyExtractor,
+ final ValueJoiner joiner,
+ final Materialized> materialized) {
+
+ return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, NamedInternal.empty(), new MaterializedInternal<>(materialized), true);
+ }
+
+
+ @SuppressWarnings("unchecked")
+ private KTable doJoinOnForeignKey(final KTable foreignKeyTable,
+ final Function foreignKeyExtractor,
+ final ValueJoiner joiner,
+ final Named joinName,
+ final MaterializedInternal> materializedInternal,
+ final boolean leftJoin) {
+ Objects.requireNonNull(foreignKeyTable, "foreignKeyTable can't be null");
+ Objects.requireNonNull(foreignKeyExtractor, "foreignKeyExtractor can't be null");
+ Objects.requireNonNull(joiner, "joiner can't be null");
+ Objects.requireNonNull(joinName, "joinName can't be null");
+ Objects.requireNonNull(materializedInternal, "materialized can't be null");
+
+ //Old values are a useful optimization. The old values from the foreignKeyTable table are compared to the new values,
+ //such that identical values do not cause a prefixScan. PrefixScan and propagation can be expensive and should
+ //not be done needlessly.
+ ((KTableImpl, ?, ?>) foreignKeyTable).enableSendingOldValues();
+
+ //Old values must be sent such that the ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the correct node.
+ //This occurs whenever the extracted foreignKey changes values.
+ enableSendingOldValues();
+
+ final Serde foreignKeySerde = ((KTableImpl) foreignKeyTable).keySerde;
+ final Serde> subscriptionWrapperSerde = new SubscriptionWrapperSerde<>(keySerde);
+ final SubscriptionResponseWrapperSerde responseWrapperSerde =
+ new SubscriptionResponseWrapperSerde<>(((KTableImpl) foreignKeyTable).valSerde);
+
+
+ final NamedInternal renamed = new NamedInternal(joinName);
+ final String subscriptionTopicName = renamed.suffixWithOrElseGet("-subscription-registration", builder, SUBSCRIPTION_REGISTRATION) + TOPIC_SUFFIX;
+ builder.internalTopologyBuilder.addInternalTopic(subscriptionTopicName);
+ final CombinedKeySchema combinedKeySchema = new CombinedKeySchema<>(subscriptionTopicName, foreignKeySerde, keySerde);
+
+ final ProcessorGraphNode> subscriptionNode = new ProcessorGraphNode<>(
+ new ProcessorParameters<>(
+ new ForeignJoinSubscriptionSendProcessorSupplier<>(
+ foreignKeyExtractor,
+ foreignKeySerde,
+ subscriptionTopicName,
+ valSerde.serializer(),
+ leftJoin
+ ),
+ renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION)
+ )
+ );
+ builder.addGraphNode(streamsGraphNode, subscriptionNode);
+
+
+ final StreamSinkNode> subscriptionSink = new StreamSinkNode<>(
+ renamed.suffixWithOrElseGet("-subscription-registration-sink", builder, SINK_NAME),
+ new StaticTopicNameExtractor<>(subscriptionTopicName),
+ new ProducedInternal<>(Produced.with(foreignKeySerde, subscriptionWrapperSerde))
+ );
+ builder.addGraphNode(subscriptionNode, subscriptionSink);
+
+ final StreamSourceNode> subscriptionSource = new StreamSourceNode<>(
+ renamed.suffixWithOrElseGet("-subscription-registration-source", builder, SOURCE_NAME),
+ Collections.singleton(subscriptionTopicName),
+ new ConsumedInternal<>(Consumed.with(foreignKeySerde, subscriptionWrapperSerde))
+ );
+ builder.addGraphNode(subscriptionSink, subscriptionSource);
+
+ // The subscription source is the source node on the *receiving* end *after* the repartition.
+ // This topic needs to be copartitioned with the Foreign Key table.
+ final Set copartitionedRepartitionSources =
+ new HashSet<>(((KTableImpl, ?, ?>) foreignKeyTable).sourceNodes);
+ copartitionedRepartitionSources.add(subscriptionSource.nodeName());
+ builder.internalTopologyBuilder.copartitionSources(copartitionedRepartitionSources);
+
+
+ final StoreBuilder>> subscriptionStore =
+ Stores.timestampedKeyValueStoreBuilder(
+ Stores.persistentTimestampedKeyValueStore(
+ renamed.suffixWithOrElseGet("-subscription-store", builder, FK_JOIN_STATE_STORE_NAME)
+ ),
+ new Serdes.BytesSerde(),
+ subscriptionWrapperSerde
+ );
+ builder.addStateStore(subscriptionStore);
+
+ final StatefulProcessorNode> subscriptionReceiveNode =
+ new StatefulProcessorNode<>(
+ new ProcessorParameters<>(
+ new SubscriptionStoreReceiveProcessorSupplier<>(subscriptionStore, combinedKeySchema),
+ renamed.suffixWithOrElseGet("-subscription-receive", builder, SUBSCRIPTION_PROCESSOR)
+ ),
+ Collections.singleton(subscriptionStore),
+ Collections.emptySet()
+ );
+ builder.addGraphNode(subscriptionSource, subscriptionReceiveNode);
+
+ final StatefulProcessorNode, Change>>> subscriptionJoinForeignNode =
+ new StatefulProcessorNode<>(
+ new ProcessorParameters<>(
+ new SubscriptionJoinForeignProcessorSupplier<>(
+ ((KTableImpl) foreignKeyTable).valueGetterSupplier()
+ ),
+ renamed.suffixWithOrElseGet("-subscription-join-foreign", builder, SUBSCRIPTION_PROCESSOR)
+ ),
+ Collections.emptySet(),
+ Collections.singleton(((KTableImpl) foreignKeyTable).valueGetterSupplier())
+ );
+ builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinForeignNode);
+
+ final StatefulProcessorNode> foreignJoinSubscriptionNode = new StatefulProcessorNode<>(
+ new ProcessorParameters<>(
+ new ForeignJoinSubscriptionProcessorSupplier<>(subscriptionStore, combinedKeySchema),
+ renamed.suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR)
+ ),
+ Collections.singleton(subscriptionStore),
+ Collections.emptySet()
+ );
+ builder.addGraphNode(((KTableImpl) foreignKeyTable).streamsGraphNode, foreignJoinSubscriptionNode);
+
+
+ final String finalRepartitionTopicName = renamed.suffixWithOrElseGet("-subscription-response", builder, SUBSCRIPTION_RESPONSE) + TOPIC_SUFFIX;
+ builder.internalTopologyBuilder.addInternalTopic(finalRepartitionTopicName);
+
+ final StreamSinkNode> foreignResponseSink =
+ new StreamSinkNode<>(
+ renamed.suffixWithOrElseGet("-subscription-response-sink", builder, SINK_NAME),
+ new StaticTopicNameExtractor<>(finalRepartitionTopicName),
+ new ProducedInternal<>(Produced.with(keySerde, responseWrapperSerde))
+ );
+ builder.addGraphNode(subscriptionJoinForeignNode, foreignResponseSink);
+ builder.addGraphNode(foreignJoinSubscriptionNode, foreignResponseSink);
+
+ final StreamSourceNode> foreignResponseSource = new StreamSourceNode<>(
+ renamed.suffixWithOrElseGet("-subscription-response-source", builder, SOURCE_NAME),
+ Collections.singleton(finalRepartitionTopicName),
+ new ConsumedInternal<>(Consumed.with(keySerde, responseWrapperSerde))
+ );
+ builder.addGraphNode(foreignResponseSink, foreignResponseSource);
+
+ // the response topic has to be copartitioned with the left (primary) side of the join
+ final Set resultSourceNodes = new HashSet<>(this.sourceNodes);
+ resultSourceNodes.add(foreignResponseSource.nodeName());
+ builder.internalTopologyBuilder.copartitionSources(resultSourceNodes);
+
+ final KTableValueGetterSupplier primaryKeyValueGetter = valueGetterSupplier();
+ final StatefulProcessorNode> resolverNode = new StatefulProcessorNode<>(
+ new ProcessorParameters<>(
+ new SubscriptionResolverJoinProcessorSupplier<>(
+ primaryKeyValueGetter,
+ valueSerde().serializer(),
+ joiner,
+ leftJoin
+ ),
+ renamed.suffixWithOrElseGet("-subscription-response-resolver", builder, SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR)
+ ),
+ Collections.emptySet(),
+ Collections.singleton(primaryKeyValueGetter)
+ );
+ builder.addGraphNode(foreignResponseSource, resolverNode);
+
+ final String resultProcessorName = renamed.suffixWithOrElseGet("-result", builder, FK_JOIN_OUTPUT_PROCESSOR);
+ final KTableSource resultProcessorSupplier = new KTableSource<>(materializedInternal.storeName(), materializedInternal.queryableStoreName());
+ final StoreBuilder> resultStore =
+ materializedInternal.queryableStoreName() == null
+ ? null
+ : new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize();
+ final TableProcessorNode resultNode = new TableProcessorNode<>(
+ resultProcessorName,
+ new ProcessorParameters<>(
+ resultProcessorSupplier,
+ resultProcessorName
+ ),
+ resultStore
+ );
+ builder.addGraphNode(resolverNode, resultNode);
+
+ return new KTableImpl(
+ resultProcessorName,
+ keySerde,
+ materializedInternal.valueSerde(),
+ resultSourceNodes,
+ materializedInternal.storeName(),
+ resultProcessorSupplier,
+ resultNode,
+ builder
+ );
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
index 7083b88cc54..bed221387bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -23,7 +24,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
public class KTableSourceValueGetterSupplier implements KTableValueGetterSupplier {
private final String storeName;
- KTableSourceValueGetterSupplier(final String storeName) {
+ public KTableSourceValueGetterSupplier(final String storeName) {
this.storeName = storeName;
}
@@ -49,6 +50,7 @@ public class KTableSourceValueGetterSupplier implements KTableValueGetterS
}
@Override
- public void close() {}
+ public void close() {
+ }
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
new file mode 100644
index 00000000000..0dd60fc026d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import java.util.Objects;
+
+public class CombinedKey {
+ private final KF foreignKey;
+ private final KP primaryKey;
+
+ CombinedKey(final KF foreignKey, final KP primaryKey) {
+ Objects.requireNonNull(foreignKey, "foreignKey can't be null");
+ Objects.requireNonNull(primaryKey, "primaryKey can't be null");
+ this.foreignKey = foreignKey;
+ this.primaryKey = primaryKey;
+ }
+
+ public KF getForeignKey() {
+ return foreignKey;
+ }
+
+ public KP getPrimaryKey() {
+ return primaryKey;
+ }
+
+ public boolean equals(final KF foreignKey, final KP primaryKey) {
+ if (this.primaryKey == null) {
+ return false;
+ }
+ return this.foreignKey.equals(foreignKey) && this.primaryKey.equals(primaryKey);
+ }
+
+ @Override
+ public String toString() {
+ return "CombinedKey{" +
+ "foreignKey=" + foreignKey +
+ ", primaryKey=" + primaryKey +
+ '}';
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
new file mode 100644
index 00000000000..8abe583b0c0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Factory for creating CombinedKey serializers / deserializers.
+ */
+public class CombinedKeySchema {
+ private final String serdeTopic;
+ private Serializer primaryKeySerializer;
+ private Deserializer primaryKeyDeserializer;
+ private Serializer foreignKeySerializer;
+ private Deserializer foreignKeyDeserializer;
+
+ public CombinedKeySchema(final String serdeTopic, final Serde foreignKeySerde, final Serde primaryKeySerde) {
+ this.serdeTopic = serdeTopic;
+ primaryKeySerializer = primaryKeySerde.serializer();
+ primaryKeyDeserializer = primaryKeySerde.deserializer();
+ foreignKeyDeserializer = foreignKeySerde.deserializer();
+ foreignKeySerializer = foreignKeySerde.serializer();
+ }
+
+ @SuppressWarnings("unchecked")
+ public void init(final ProcessorContext context) {
+ primaryKeySerializer = primaryKeySerializer == null ? (Serializer) context.keySerde().serializer() : primaryKeySerializer;
+ primaryKeyDeserializer = primaryKeyDeserializer == null ? (Deserializer) context.keySerde().deserializer() : primaryKeyDeserializer;
+ foreignKeySerializer = foreignKeySerializer == null ? (Serializer) context.keySerde().serializer() : foreignKeySerializer;
+ foreignKeyDeserializer = foreignKeyDeserializer == null ? (Deserializer) context.keySerde().deserializer() : foreignKeyDeserializer;
+ }
+
+ Bytes toBytes(final KO foreignKey, final K primaryKey) {
+ //The serialization format - note that primaryKeySerialized may be null, such as when a prefixScan
+ //key is being created.
+ //{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
+ final byte[] foreignKeySerializedData = foreignKeySerializer.serialize(serdeTopic, foreignKey);
+
+ //? bytes
+ final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(serdeTopic, primaryKey);
+
+ final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + foreignKeySerializedData.length + primaryKeySerializedData.length);
+ buf.putInt(foreignKeySerializedData.length);
+ buf.put(foreignKeySerializedData);
+ buf.put(primaryKeySerializedData);
+ return Bytes.wrap(buf.array());
+ }
+
+
+ public CombinedKey fromBytes(final Bytes data) {
+ //{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
+ final byte[] dataArray = data.get();
+ final ByteBuffer dataBuffer = ByteBuffer.wrap(dataArray);
+ final int foreignKeyLength = dataBuffer.getInt();
+ final byte[] foreignKeyRaw = new byte[foreignKeyLength];
+ dataBuffer.get(foreignKeyRaw, 0, foreignKeyLength);
+ final KO foreignKey = foreignKeyDeserializer.deserialize(serdeTopic, foreignKeyRaw);
+
+ final byte[] primaryKeyRaw = new byte[dataArray.length - foreignKeyLength - Integer.BYTES];
+ dataBuffer.get(primaryKeyRaw, 0, primaryKeyRaw.length);
+ final K primaryKey = primaryKeyDeserializer.deserialize(serdeTopic, primaryKeyRaw);
+ return new CombinedKey<>(foreignKey, primaryKey);
+ }
+
+ Bytes prefixBytes(final KO key) {
+ //The serialization format. Note that primaryKeySerialized is not required/used in this function.
+ //{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
+
+ final byte[] foreignKeySerializedData = foreignKeySerializer.serialize(serdeTopic, key);
+
+ final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + foreignKeySerializedData.length);
+ buf.putInt(foreignKeySerializedData.length);
+ buf.put(foreignKeySerializedData);
+ return Bytes.wrap(buf.array());
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
new file mode 100644
index 00000000000..614b91f071e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+public class ForeignJoinSubscriptionProcessorSupplier implements ProcessorSupplier> {
+ private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionProcessorSupplier.class);
+ private final StoreBuilder>> storeBuilder;
+ private final CombinedKeySchema keySchema;
+
+ public ForeignJoinSubscriptionProcessorSupplier(
+ final StoreBuilder>> storeBuilder,
+ final CombinedKeySchema keySchema) {
+
+ this.storeBuilder = storeBuilder;
+ this.keySchema = keySchema;
+ }
+
+ @Override
+ public Processor> get() {
+ return new KTableKTableJoinProcessor();
+ }
+
+
+ private final class KTableKTableJoinProcessor extends AbstractProcessor> {
+ private Sensor skippedRecordsSensor;
+ private TimestampedKeyValueStore> store;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ super.init(context);
+ final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
+ skippedRecordsSensor = ThreadMetrics.skipRecordSensor(internalProcessorContext.metrics());
+ store = internalProcessorContext.getStateStore(storeBuilder);
+ }
+
+ /**
+ * @throws StreamsException if key is null
+ */
+ @Override
+ public void process(final KO key, final Change value) {
+ // if the key is null, we do not need proceed aggregating
+ // the record with the table
+ if (key == null) {
+ LOG.warn(
+ "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+ value, context().topic(), context().partition(), context().offset()
+ );
+ skippedRecordsSensor.record();
+ return;
+ }
+
+ final Bytes prefixBytes = keySchema.prefixBytes(key);
+
+ //Perform the prefixScan and propagate the results
+ try (final KeyValueIterator>> prefixScanResults =
+ store.range(prefixBytes, Bytes.increment(prefixBytes))) {
+
+ while (prefixScanResults.hasNext()) {
+ final KeyValue>> next = prefixScanResults.next();
+ // have to check the prefix because the range end is inclusive :(
+ if (prefixEquals(next.key.get(), prefixBytes.get())) {
+ final CombinedKey combinedKey = keySchema.fromBytes(next.key);
+ context().forward(
+ combinedKey.getPrimaryKey(),
+ new SubscriptionResponseWrapper<>(next.value.value().getHash(), value.newValue)
+ );
+ }
+ }
+ }
+ }
+
+ private boolean prefixEquals(final byte[] x, final byte[] y) {
+ final int min = Math.min(x.length, y.length);
+ final ByteBuffer xSlice = ByteBuffer.wrap(x, 0, min);
+ final ByteBuffer ySlice = ByteBuffer.wrap(y, 0, min);
+ return xSlice.equals(ySlice);
+ }
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
new file mode 100644
index 00000000000..f122258d930
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.internals.Murmur3;
+
+import java.util.function.Function;
+import java.util.Arrays;
+
+import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
+import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
+import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE;
+import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE;
+
+public class ForeignJoinSubscriptionSendProcessorSupplier implements ProcessorSupplier> {
+
+ private final Function foreignKeyExtractor;
+ private final String repartitionTopicName;
+ private final Serializer valueSerializer;
+ private final boolean leftJoin;
+ private Serializer foreignKeySerializer;
+
+ public ForeignJoinSubscriptionSendProcessorSupplier(final Function foreignKeyExtractor,
+ final Serde foreignKeySerde,
+ final String repartitionTopicName,
+ final Serializer valueSerializer,
+ final boolean leftJoin) {
+ this.foreignKeyExtractor = foreignKeyExtractor;
+ this.valueSerializer = valueSerializer;
+ this.leftJoin = leftJoin;
+ this.repartitionTopicName = repartitionTopicName;
+ foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer();
+ }
+
+ @Override
+ public Processor> get() {
+ return new UnbindChangeProcessor();
+ }
+
+ private class UnbindChangeProcessor extends AbstractProcessor> {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(final ProcessorContext context) {
+ super.init(context);
+ // get default key serde if it wasn't supplied directly at construction
+ if (foreignKeySerializer == null) {
+ foreignKeySerializer = (Serializer) context.keySerde().serializer();
+ }
+ }
+
+ @Override
+ public void process(final K key, final Change change) {
+ final long[] currentHash = change.newValue == null ?
+ null :
+ Murmur3.hash128(valueSerializer.serialize(repartitionTopicName, change.newValue));
+
+ if (change.oldValue != null) {
+ final KO oldForeignKey = foreignKeyExtractor.apply(change.oldValue);
+ if (change.newValue != null) {
+ final KO newForeignKey = foreignKeyExtractor.apply(change.newValue);
+
+ final byte[] serialOldForeignKey = foreignKeySerializer.serialize(repartitionTopicName, oldForeignKey);
+ final byte[] serialNewForeignKey = foreignKeySerializer.serialize(repartitionTopicName, newForeignKey);
+ if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) {
+ //Different Foreign Key - delete the old key value and propagate the new one.
+ //Delete it from the oldKey's state store
+ context().forward(oldForeignKey, new SubscriptionWrapper<>(currentHash, DELETE_KEY_NO_PROPAGATE, key));
+ //Add to the newKey's state store. Additionally, propagate null if no FK is found there,
+ //since we must "unset" any output set by the previous FK-join. This is true for both INNER
+ //and LEFT join.
+ }
+ context().forward(newForeignKey, new SubscriptionWrapper<>(currentHash, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, key));
+ } else {
+ //A simple propagatable delete. Delete from the state store and propagate the delete onwards.
+ context().forward(oldForeignKey, new SubscriptionWrapper<>(currentHash, DELETE_KEY_AND_PROPAGATE, key));
+ }
+ } else if (change.newValue != null) {
+ //change.oldValue is null, which means it was deleted at least once before, or it is brand new.
+ //In either case, we only need to propagate if the FK_VAL is available, as the null from the delete would
+ //have been propagated otherwise.
+
+ final SubscriptionWrapper.Instruction instruction;
+ if (leftJoin) {
+ //Want to send info even if RHS is null.
+ instruction = PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
+ } else {
+ instruction = PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
+ }
+ context().forward(foreignKeyExtractor.apply(change.newValue), new SubscriptionWrapper<>(currentHash, instruction, key));
+ }
+ }
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
new file mode 100644
index 00000000000..2544eb1856b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.util.Objects;
+
+/**
+ * Receives {@code SubscriptionWrapper} events and processes them according to their Instruction.
+ * Depending on the results, {@code SubscriptionResponseWrapper}s are created, which will be propagated to
+ * the {@code SubscriptionResolverJoinProcessorSupplier} instance.
+ *
+ * @param Type of primary keys
+ * @param Type of foreign key
+ * @param Type of foreign value
+ */
+public class SubscriptionJoinForeignProcessorSupplier
+ implements ProcessorSupplier, Change>>> {
+
+ private final KTableValueGetterSupplier foreignValueGetterSupplier;
+
+ public SubscriptionJoinForeignProcessorSupplier(final KTableValueGetterSupplier foreignValueGetterSupplier) {
+ this.foreignValueGetterSupplier = foreignValueGetterSupplier;
+ }
+
+ @Override
+ public Processor, Change>>> get() {
+
+ return new AbstractProcessor, Change>>>() {
+
+ private KTableValueGetter foreignValues;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ super.init(context);
+ foreignValues = foreignValueGetterSupplier.get();
+ foreignValues.init(context);
+ }
+
+ @Override
+ public void process(final CombinedKey combinedKey, final Change>> change) {
+ Objects.requireNonNull(combinedKey, "This processor should never see a null key.");
+ Objects.requireNonNull(change, "This processor should never see a null value.");
+ final ValueAndTimestamp> valueAndTimestamp = change.newValue;
+ Objects.requireNonNull(valueAndTimestamp, "This processor should never see a null newValue.");
+ final SubscriptionWrapper value = valueAndTimestamp.value();
+
+ if (value.getVersion() != SubscriptionWrapper.CURRENT_VERSION) {
+ //Guard against modifications to SubscriptionWrapper. Need to ensure that there is compatibility
+ //with previous versions to enable rolling upgrades. Must develop a strategy for upgrading
+ //from older SubscriptionWrapper versions to newer versions.
+ throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
+ }
+
+ final ValueAndTimestamp foreignValueAndTime = foreignValues.get(combinedKey.getForeignKey());
+
+ final long resultTimestamp =
+ foreignValueAndTime == null ?
+ valueAndTimestamp.timestamp() :
+ Math.max(valueAndTimestamp.timestamp(), foreignValueAndTime.timestamp());
+
+ switch (value.getInstruction()) {
+ case DELETE_KEY_AND_PROPAGATE:
+ context().forward(
+ combinedKey.getPrimaryKey(),
+ new SubscriptionResponseWrapper(value.getHash(), null),
+ To.all().withTimestamp(resultTimestamp)
+ );
+ break;
+ case PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE:
+ //This one needs to go through regardless of LEFT or INNER join, since the extracted FK was
+ //changed and there is no match for it. We must propagate the (key, null) to ensure that the
+ //downstream consumers are alerted to this fact.
+ final VO valueToSend = foreignValueAndTime == null ? null : foreignValueAndTime.value();
+
+ context().forward(
+ combinedKey.getPrimaryKey(),
+ new SubscriptionResponseWrapper<>(value.getHash(), valueToSend),
+ To.all().withTimestamp(resultTimestamp)
+ );
+ break;
+ case PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE:
+ if (foreignValueAndTime != null) {
+ context().forward(
+ combinedKey.getPrimaryKey(),
+ new SubscriptionResponseWrapper<>(value.getHash(), foreignValueAndTime.value()),
+ To.all().withTimestamp(resultTimestamp)
+ );
+ }
+ break;
+ case DELETE_KEY_NO_PROPAGATE:
+ break;
+ default:
+ throw new IllegalStateException("Unhandled instruction: " + value.getInstruction());
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
new file mode 100644
index 00000000000..a188f15062c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.Murmur3;
+
+/**
+ * Receives {@code SubscriptionResponseWrapper} events and filters out events which do not match the current hash
+ * of the primary key. This eliminates race-condition results for rapidly-changing foreign-keys for a given primary key.
+ * Applies the join and emits nulls according to LEFT/INNER rules.
+ *
+ * @param Type of primary keys
+ * @param Type of primary values
+ * @param Type of foreign values
+ * @param Type of joined result of primary and foreign values
+ */
+public class SubscriptionResolverJoinProcessorSupplier implements ProcessorSupplier> {
+ private final KTableValueGetterSupplier valueGetterSupplier;
+ private final Serializer valueSerializer;
+ private final ValueJoiner joiner;
+ private final boolean leftJoin;
+
+ public SubscriptionResolverJoinProcessorSupplier(final KTableValueGetterSupplier valueGetterSupplier,
+ final Serializer valueSerializer,
+ final ValueJoiner joiner,
+ final boolean leftJoin) {
+ this.valueGetterSupplier = valueGetterSupplier;
+ this.valueSerializer = valueSerializer;
+ this.joiner = joiner;
+ this.leftJoin = leftJoin;
+ }
+
+ @Override
+ public Processor> get() {
+ return new AbstractProcessor>() {
+
+ private KTableValueGetter valueGetter;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ super.init(context);
+ valueGetter = valueGetterSupplier.get();
+ valueGetter.init(context);
+ }
+
+ @Override
+ public void process(final K key, final SubscriptionResponseWrapper value) {
+ if (value.getVersion() != SubscriptionResponseWrapper.CURRENT_VERSION) {
+ //Guard against modifications to SubscriptionResponseWrapper. Need to ensure that there is
+ //compatibility with previous versions to enable rolling upgrades. Must develop a strategy for
+ //upgrading from older SubscriptionWrapper versions to newer versions.
+ throw new UnsupportedVersionException("SubscriptionResponseWrapper is of an incompatible version.");
+ }
+ final ValueAndTimestamp currentValueWithTimestamp = valueGetter.get(key);
+
+ //We are unable to access the actual source topic name for the valueSerializer at runtime, without
+ //tightly coupling to KTableRepartitionProcessorSupplier.
+ //While we can use the source topic from where the events came from, we shouldn't serialize against it
+ //as it causes problems with the confluent schema registry, which requires each topic have only a single
+ //registered schema.
+ final String dummySerializationTopic = context().topic() + "-join-resolver";
+ final long[] currentHash = currentValueWithTimestamp == null ?
+ null :
+ Murmur3.hash128(valueSerializer.serialize(dummySerializationTopic, currentValueWithTimestamp.value()));
+
+ final long[] messageHash = value.getOriginalValueHash();
+
+ //If this value doesn't match the current value from the original table, it is stale and should be discarded.
+ if (java.util.Arrays.equals(messageHash, currentHash)) {
+ final VR result;
+
+ if (value.getForeignValue() == null && (!leftJoin || currentValueWithTimestamp == null)) {
+ result = null; //Emit tombstone
+ } else {
+ result = joiner.apply(currentValueWithTimestamp == null ? null : currentValueWithTimestamp.value(), value.getForeignValue());
+ }
+ context().forward(key, result);
+ }
+ }
+ };
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
new file mode 100644
index 00000000000..9c79e468213
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+
+import java.util.Arrays;
+
+public class SubscriptionResponseWrapper {
+ final static byte CURRENT_VERSION = 0x00;
+ private final long[] originalValueHash;
+ private final FV foreignValue;
+ private final byte version;
+
+ public SubscriptionResponseWrapper(final long[] originalValueHash, final FV foreignValue) {
+ this(originalValueHash, foreignValue, CURRENT_VERSION);
+ }
+
+ public SubscriptionResponseWrapper(final long[] originalValueHash, final FV foreignValue, final byte version) {
+ if (version != CURRENT_VERSION) {
+ throw new UnsupportedVersionException("SubscriptionWrapper does not support version " + version);
+ }
+ this.originalValueHash = originalValueHash;
+ this.foreignValue = foreignValue;
+ this.version = version;
+ }
+
+ public long[] getOriginalValueHash() {
+ return originalValueHash;
+ }
+
+ public FV getForeignValue() {
+ return foreignValue;
+ }
+
+ public byte getVersion() {
+ return version;
+ }
+
+ @Override
+ public String toString() {
+ return "SubscriptionResponseWrapper{" +
+ "version=" + version +
+ ", foreignValue=" + foreignValue +
+ ", originalValueHash=" + Arrays.toString(originalValueHash) +
+ '}';
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
new file mode 100644
index 00000000000..6524b4fc764
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.ByteBuffer;
+
+public class SubscriptionResponseWrapperSerde implements Serde> {
+ private final SubscriptionResponseWrapperSerializer serializer;
+ private final SubscriptionResponseWrapperDeserializer deserializer;
+
+ public SubscriptionResponseWrapperSerde(final Serde foreignValueSerde) {
+ serializer = new SubscriptionResponseWrapperSerializer<>(foreignValueSerde.serializer());
+ deserializer = new SubscriptionResponseWrapperDeserializer<>(foreignValueSerde.deserializer());
+ }
+
+ @Override
+ public Serializer> serializer() {
+ return serializer;
+ }
+
+ @Override
+ public Deserializer> deserializer() {
+ return deserializer;
+ }
+
+ private static final class SubscriptionResponseWrapperSerializer implements Serializer> {
+ private final Serializer serializer;
+
+ private SubscriptionResponseWrapperSerializer(final Serializer serializer) {
+ this.serializer = serializer;
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final SubscriptionResponseWrapper data) {
+ //{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data}
+
+ //7-bit (0x7F) maximum for data version.
+ if (Byte.compare((byte) 0x7F, data.getVersion()) < 0) {
+ throw new UnsupportedVersionException("SubscriptionResponseWrapper version is larger than maximum supported 0x7F");
+ }
+
+ final byte[] serializedData = serializer.serialize(topic, data.getForeignValue());
+ final int serializedDataLength = serializedData == null ? 0 : serializedData.length;
+ final long[] originalHash = data.getOriginalValueHash();
+ final int hashLength = originalHash == null ? 0 : 2 * Long.BYTES;
+
+ final ByteBuffer buf = ByteBuffer.allocate(1 + hashLength + serializedDataLength);
+
+ if (originalHash != null) {
+ buf.put(data.getVersion());
+ buf.putLong(originalHash[0]);
+ buf.putLong(originalHash[1]);
+ } else {
+ //Don't store hash as it's null.
+ buf.put((byte) (data.getVersion() | (byte) 0x80));
+ }
+
+ if (serializedData != null)
+ buf.put(serializedData);
+ return buf.array();
+ }
+
+ }
+
+ private static final class SubscriptionResponseWrapperDeserializer implements Deserializer> {
+ private final Deserializer deserializer;
+
+ private SubscriptionResponseWrapperDeserializer(final Deserializer deserializer) {
+ this.deserializer = deserializer;
+ }
+
+ @Override
+ public SubscriptionResponseWrapper deserialize(final String topic, final byte[] data) {
+ //{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data}
+
+ final ByteBuffer buf = ByteBuffer.wrap(data);
+ final byte versionAndIsHashNull = buf.get();
+ final byte version = (byte) (0x7F & versionAndIsHashNull);
+ final boolean isHashNull = (0x80 & versionAndIsHashNull) == 0x80;
+
+ final long[] hash;
+ int lengthSum = 1; //The first byte
+ if (isHashNull) {
+ hash = null;
+ } else {
+ hash = new long[2];
+ hash[0] = buf.getLong();
+ hash[1] = buf.getLong();
+ lengthSum += 2 * Long.BYTES;
+ }
+
+ final byte[] serializedValue;
+ if (data.length - lengthSum > 0) {
+ serializedValue = new byte[data.length - lengthSum];
+ buf.get(serializedValue, 0, serializedValue.length);
+ } else
+ serializedValue = null;
+
+ final V value = deserializer.deserialize(topic, serializedValue);
+ return new SubscriptionResponseWrapper<>(hash, value, version);
+ }
+
+ }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
new file mode 100644
index 00000000000..3d5f5160197
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SubscriptionStoreReceiveProcessorSupplier
+ implements ProcessorSupplier> {
+ private static final Logger LOG = LoggerFactory.getLogger(SubscriptionStoreReceiveProcessorSupplier.class);
+
+ private final StoreBuilder>> storeBuilder;
+ private final CombinedKeySchema keySchema;
+
+ public SubscriptionStoreReceiveProcessorSupplier(
+ final StoreBuilder>> storeBuilder,
+ final CombinedKeySchema keySchema) {
+
+ this.storeBuilder = storeBuilder;
+ this.keySchema = keySchema;
+ }
+
+ @Override
+ public Processor> get() {
+
+ return new AbstractProcessor>() {
+
+ private TimestampedKeyValueStore> store;
+ private StreamsMetricsImpl metrics;
+ private Sensor skippedRecordsSensor;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ super.init(context);
+ final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
+
+ metrics = internalProcessorContext.metrics();
+ skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
+ store = internalProcessorContext.getStateStore(storeBuilder);
+ }
+
+ @Override
+ public void process(final KO key, final SubscriptionWrapper value) {
+ if (key == null) {
+ LOG.warn(
+ "Skipping record due to null foreign key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+ value, context().topic(), context().partition(), context().offset()
+ );
+ skippedRecordsSensor.record();
+ return;
+ }
+ if (value.getVersion() != SubscriptionWrapper.CURRENT_VERSION) {
+ //Guard against modifications to SubscriptionWrapper. Need to ensure that there is compatibility
+ //with previous versions to enable rolling upgrades. Must develop a strategy for upgrading
+ //from older SubscriptionWrapper versions to newer versions.
+ throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
+ }
+
+ final Bytes subscriptionKey = keySchema.toBytes(key, value.getPrimaryKey());
+
+ final ValueAndTimestamp> newValue = ValueAndTimestamp.make(value, context().timestamp());
+ final ValueAndTimestamp> oldValue = store.get(subscriptionKey);
+
+ //If the subscriptionWrapper hash indicates a null, must delete from statestore.
+ //This store is used by the prefix scanner in ForeignJoinSubscriptionProcessorSupplier
+ if (value.getHash() == null) {
+ store.delete(subscriptionKey);
+ } else {
+ store.put(subscriptionKey, newValue);
+ }
+ final Change>> change = new Change<>(newValue, oldValue);
+ // note: key is non-nullable
+ // note: newValue is non-nullable
+ context().forward(
+ new CombinedKey<>(key, value.getPrimaryKey()),
+ change,
+ To.all().withTimestamp(newValue.timestamp())
+ );
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
new file mode 100644
index 00000000000..a757895aecf
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+
+public class SubscriptionWrapper {
+ static final byte CURRENT_VERSION = 0;
+
+ private final long[] hash;
+ private final Instruction instruction;
+ private final byte version;
+ private final K primaryKey;
+
+ public enum Instruction {
+ //Send nothing. Do not propagate.
+ DELETE_KEY_NO_PROPAGATE((byte) 0x00),
+
+ //Send (k, null)
+ DELETE_KEY_AND_PROPAGATE((byte) 0x01),
+
+ //(changing foreign key, but FK+Val may not exist)
+ //Send (k, fk-val) OR
+ //Send (k, null) if fk-val does not exist
+ PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE((byte) 0x02),
+
+ //(first time ever sending key)
+ //Send (k, fk-val) only if fk-val exists.
+ PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE((byte) 0x03);
+
+ private final byte value;
+ Instruction(final byte value) {
+ this.value = value;
+ }
+
+ public byte getValue() {
+ return value;
+ }
+
+ public static Instruction fromValue(final byte value) {
+ for (final Instruction i: values()) {
+ if (i.value == value) {
+ return i;
+ }
+ }
+ throw new IllegalArgumentException("Unknown instruction byte value = " + value);
+ }
+ }
+
+ public SubscriptionWrapper(final long[] hash, final Instruction instruction, final K primaryKey) {
+ this(hash, instruction, primaryKey, CURRENT_VERSION);
+ }
+
+ public SubscriptionWrapper(final long[] hash, final Instruction instruction, final K primaryKey, final byte version) {
+ Objects.requireNonNull(instruction, "instruction cannot be null. Required by downstream processor.");
+ Objects.requireNonNull(primaryKey, "primaryKey cannot be null. Required by downstream processor.");
+ if (version != CURRENT_VERSION) {
+ throw new UnsupportedVersionException("SubscriptionWrapper does not support version " + version);
+ }
+
+ this.instruction = instruction;
+ this.hash = hash;
+ this.primaryKey = primaryKey;
+ this.version = version;
+ }
+
+ public Instruction getInstruction() {
+ return instruction;
+ }
+
+ public long[] getHash() {
+ return hash;
+ }
+
+ public K getPrimaryKey() {
+ return primaryKey;
+ }
+
+ public byte getVersion() {
+ return version;
+ }
+
+ @Override
+ public String toString() {
+ return "SubscriptionWrapper{" +
+ "version=" + version +
+ ", primaryKey=" + primaryKey +
+ ", instruction=" + instruction +
+ ", hash=" + Arrays.toString(hash) +
+ '}';
+ }
+}
+
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
new file mode 100644
index 00000000000..ae53ba8b349
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.ByteBuffer;
+
+public class SubscriptionWrapperSerde implements Serde> {
+ private final SubscriptionWrapperSerializer serializer;
+ private final SubscriptionWrapperDeserializer deserializer;
+
+ public SubscriptionWrapperSerde(final Serde primaryKeySerde) {
+ serializer = new SubscriptionWrapperSerializer<>(primaryKeySerde.serializer());
+ deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerde.deserializer());
+ }
+
+ @Override
+ public Serializer> serializer() {
+ return serializer;
+ }
+
+ @Override
+ public Deserializer> deserializer() {
+ return deserializer;
+ }
+
+ private static class SubscriptionWrapperSerializer implements Serializer> {
+ private final Serializer primaryKeySerializer;
+ SubscriptionWrapperSerializer(final Serializer primaryKeySerializer) {
+ this.primaryKeySerializer = primaryKeySerializer;
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final SubscriptionWrapper data) {
+ //{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}
+
+ //7-bit (0x7F) maximum for data version.
+ if (Byte.compare((byte) 0x7F, data.getVersion()) < 0) {
+ throw new UnsupportedVersionException("SubscriptionWrapper version is larger than maximum supported 0x7F");
+ }
+
+ final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(topic, data.getPrimaryKey());
+
+ final ByteBuffer buf;
+ if (data.getHash() != null) {
+ buf = ByteBuffer.allocate(2 + 2 * Long.BYTES + primaryKeySerializedData.length);
+ buf.put(data.getVersion());
+ } else {
+ //Don't store hash as it's null.
+ buf = ByteBuffer.allocate(2 + primaryKeySerializedData.length);
+ buf.put((byte) (data.getVersion() | (byte) 0x80));
+ }
+
+ buf.put(data.getInstruction().getValue());
+ final long[] elem = data.getHash();
+ if (data.getHash() != null) {
+ buf.putLong(elem[0]);
+ buf.putLong(elem[1]);
+ }
+ buf.put(primaryKeySerializedData);
+ return buf.array();
+ }
+
+ }
+
+ private static class SubscriptionWrapperDeserializer implements Deserializer> {
+ private final Deserializer primaryKeyDeserializer;
+ SubscriptionWrapperDeserializer(final Deserializer primaryKeyDeserializer) {
+ this.primaryKeyDeserializer = primaryKeyDeserializer;
+ }
+
+ @Override
+ public SubscriptionWrapper deserialize(final String topic, final byte[] data) {
+ //{7-bits-version}{1-bit-isHashNull}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}
+ final ByteBuffer buf = ByteBuffer.wrap(data);
+ final byte versionAndIsHashNull = buf.get();
+ final byte version = (byte) (0x7F & versionAndIsHashNull);
+ final boolean isHashNull = (0x80 & versionAndIsHashNull) == 0x80;
+ final SubscriptionWrapper.Instruction inst = SubscriptionWrapper.Instruction.fromValue(buf.get());
+
+ final long[] hash;
+ int lengthSum = 2; //The first 2 bytes
+ if (isHashNull) {
+ hash = null;
+ } else {
+ hash = new long[2];
+ hash[0] = buf.getLong();
+ hash[1] = buf.getLong();
+ lengthSum += 2 * Long.BYTES;
+ }
+
+ final byte[] primaryKeyRaw = new byte[data.length - lengthSum]; //The remaining data is the serialized pk
+ buf.get(primaryKeyRaw, 0, primaryKeyRaw.length);
+ final K primaryKey = primaryKeyDeserializer.deserialize(topic, primaryKeyRaw);
+
+ return new SubscriptionWrapper<>(hash, inst, primaryKey, version);
+ }
+
+ }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
index 460f640b5c3..2cc153975b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.StreamPartitioner;
public abstract class BaseRepartitionNode extends StreamsGraphNode {
@@ -29,7 +30,7 @@ public abstract class BaseRepartitionNode extends StreamsGraphNode {
protected final String sourceName;
protected final String repartitionTopic;
protected final ProcessorParameters processorParameters;
-
+ protected final StreamPartitioner partitioner;
BaseRepartitionNode(final String nodeName,
final String sourceName,
@@ -37,7 +38,8 @@ public abstract class BaseRepartitionNode extends StreamsGraphNode {
final Serde keySerde,
final Serde valueSerde,
final String sinkName,
- final String repartitionTopic) {
+ final String repartitionTopic,
+ final StreamPartitioner partitioner) {
super(nodeName);
@@ -47,6 +49,7 @@ public abstract class BaseRepartitionNode extends StreamsGraphNode {
this.sourceName = sourceName;
this.repartitionTopic = repartitionTopic;
this.processorParameters = processorParameters;
+ this.partitioner = partitioner;
}
abstract Serializer getValueSerializer();
@@ -61,7 +64,8 @@ public abstract class BaseRepartitionNode extends StreamsGraphNode {
", sinkName='" + sinkName + '\'' +
", sourceName='" + sourceName + '\'' +
", repartitionTopic='" + repartitionTopic + '\'' +
- ", processorParameters=" + processorParameters +
+ ", processorParameters=" + processorParameters + '\'' +
+ ", partitioner=" + partitioner +
"} " + super.toString();
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
index 4d1b67dbc33..a3f79c50ea9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
@@ -43,7 +43,8 @@ public class GroupedTableOperationRepartitionNode extends BaseRepartitionN
keySerde,
valueSerde,
sinkName,
- repartitionTopic
+ repartitionTopic,
+ null
);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableForeignKeyJoinResolutionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableForeignKeyJoinResolutionNode.java
new file mode 100644
index 00000000000..672625cfc1c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableForeignKeyJoinResolutionNode.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapper;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+/**
+ * Too much specific information to generalize so the Foreign Key KTable-KTable join requires a specific node.
+ */
+public class KTableKTableForeignKeyJoinResolutionNode extends StreamsGraphNode {
+ private final ProcessorParameters> joinOneToOneProcessorParameters;
+ private final ProcessorParameters> joinByPrefixProcessorParameters;
+ private final ProcessorParameters> resolverProcessorParameters;
+ private final String finalRepartitionTopicName;
+ private final String finalRepartitionSinkName;
+ private final String finalRepartitionSourceName;
+ private final Serde keySerde;
+ private final Serde> subResponseSerde;
+ private final KTableValueGetterSupplier originalValueGetter;
+
+ public KTableKTableForeignKeyJoinResolutionNode(final String nodeName,
+ final ProcessorParameters> joinOneToOneProcessorParameters,
+ final ProcessorParameters