diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java index 256eddd6f74..fa289ca5959 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java @@ -891,7 +891,10 @@ public class KTableKTableForeignKeyJoinIntegrationTest { rejoin, leftVersioned, rightVersioned, - value -> value.split("\\|")[1] + value -> { + final String[] tokens = value.split("\\|"); + return tokens.length == 2 ? tokens[1] : null; + } ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java index e654cd752af..3bb3c7a9396 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java @@ -82,7 +82,8 @@ public class SubscriptionReceiveProcessorSupplier @Override public void process(final Record> record) { - if (record.key() == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction())) { + final KRight foreignKey = record.key(); + if (foreignKey == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction())) { dropRecord(); return; } @@ -93,7 +94,7 @@ public class SubscriptionReceiveProcessorSupplier throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version."); } context().forward( - record.withKey(new CombinedKey<>(record.key(), record.value().primaryKey())) + record.withKey(new CombinedKey<>(foreignKey, record.value().primaryKey())) .withValue(inferChange(record)) .withTimestamp(record.timestamp()) ); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java index b161ce092c4..12692048267 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java @@ -128,47 +128,101 @@ public class SubscriptionSendProcessorSupplier } private void leftJoinInstructions(final Record> record) { - if (record.value().oldValue != null) { - final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue); - final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue); - if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) { + final VLeft oldValue = record.value().oldValue; + final VLeft newValue = record.value().newValue; + + if (oldValue == null && newValue == null) { + // no output for idempotent left hand side deletes + return; + } + + final KRight oldForeignKey = oldValue == null ? null : foreignKeyExtractor.extract(record.key(), oldValue); + final KRight newForeignKey = newValue == null ? null : foreignKeyExtractor.extract(record.key(), newValue); + + final boolean unsubscribe = oldForeignKey != null; + if (unsubscribe) { + // delete old subscription if FK changed + // + // if FK did change, we need to explicitly delete the old subscription, + // because the new subscription goes to a different partition + if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) { + // this may lead to unnecessary tombstones, if we delete an existing key, + // which did not join previously; + // however, we cannot avoid it as we have no means to know if the old FK joined or not forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE); } - forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); - } else if (record.value().newValue != null) { - final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue); - forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); } - } + + // for all cases, insert, update, and delete, we send a new subscription + // we need to get a response back for all cases to always produce a left-join result + // + // note: for delete, `newForeignKey` is null, what is a "hack" + // no actual subscription will be added for null-FK, but we still get the response back we need + // + // this may lead to unnecessary tombstones, if we delete an existing key, + // which did not join previously; + // however, we cannot avoid it as we have no means to know if the old FK joined or not + forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); +} private void defaultJoinInstructions(final Record> record) { - if (record.value().oldValue != null) { - final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue); - final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue); + final VLeft oldValue = record.value().oldValue; + final VLeft newValue = record.value().newValue; + + final KRight oldForeignKey = oldValue == null ? null : foreignKeyExtractor.extract(record.key(), oldValue); + final boolean unsubscribe = oldForeignKey != null; + + // if left row is inserted or updated, subscribe to new FK (if new FK is valid) + if (newValue != null) { + final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), newValue); - if (oldForeignKey == null && newForeignKey == null) { - logSkippedRecordDueToNullForeignKey(); - } else if (oldForeignKey == null) { - forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE); - } else if (newForeignKey == null) { - forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE); - } else if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) { - //Different Foreign Key - delete the old key value and propagate the new one. - //Delete it from the oldKey's state store - forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE); - //Add to the newKey's state store. Additionally, propagate null if no FK is found there, - //since we must "unset" any output set by the previous FK-join. This is true for both INNER - //and LEFT join. - forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); - } else { // unchanged FK - forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE); - } - } else if (record.value().newValue != null) { - final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue); if (newForeignKey == null) { logSkippedRecordDueToNullForeignKey(); + if (unsubscribe) { + // delete old subscription + // + // this may lead to unnecessary tombstones if the old FK did not join + // however, we cannot avoid it as we have no means to know if the old FK joined or not + forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE); + } } else { - forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE); + // regular insert/update + + if (unsubscribe) { + // update case + + // delete old subscription if FK changed + // if FK did change, we need to explicitly delete the old subscription, + // because the new subscription goes to a different partition + // + // we don't need any response, as we only want a response from the new subscription + if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) { + forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE); + } + + // subscribe to new key (note, could be on a different task/node than old key) + // additionally, propagate null if no FK is found so we can delete the previous result (if any) + // + // this may lead to unnecessary tombstones if the old FK did not join + // and the new FK key does not join either; + // however, we cannot avoid it as we have no means to know if the old FK joined or not + forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); + } else { + // insert + + // subscribe to new key + // don't propagate null if no FK is found; + // for inserts, we know that there is no need to delete any previous result + forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE); + } + } + } else { + // left row is deleted + if (unsubscribe) { + // this may lead to unnecessary tombstones, if we delete an existing key, + // which did not join previously; + // however, we cannot avoid it as we have no means to know if the old FK joined or not + forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java index d49df2f5cfd..552ffb188c8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java @@ -284,6 +284,23 @@ public class SubscriptionSendProcessorSupplierTest { ); } + @Test + public void innerJoinShouldNotDeleteOldAndPropagateNewFKForUnchangedFK() { + final MockInternalProcessorContext> context = new MockInternalProcessorContext<>(); + innerJoinProcessor.init(context); + context.setRecordMetadata("topic", 0, 0); + + final LeftValue leftRecordValue = new LeftValue(fk1); + + innerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, leftRecordValue), 0)); + + assertThat(context.forwarded().size(), is(1)); + assertThat( + context.forwarded().get(0).record(), + is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0)) + ); + } + @Test public void innerJoinShouldPropagateNothingWhenOldAndNewFKIsNull() { final MockInternalProcessorContext> context = new MockInternalProcessorContext<>(); @@ -332,7 +349,7 @@ public class SubscriptionSendProcessorSupplierTest { } @Test - public void innerJoinShouldPropagateUnchangedFKOnlyIfFKExistsInRightTable() { + public void innerJoinShouldPropagateNewRecordOfUnchangedFK() { final MockInternalProcessorContext> context = new MockInternalProcessorContext<>(); innerJoinProcessor.init(context); context.setRecordMetadata("topic", 0, 0); @@ -344,7 +361,7 @@ public class SubscriptionSendProcessorSupplierTest { assertThat(context.forwarded().size(), is(1)); assertThat( context.forwarded().get(0).record(), - is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0)) + is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0)) ); }