MINOR: simplify FK-join logic

The existing FK join logic is very convoluted due to incremtal changes and bug-fixes,
and thus very hard to understand.

This PR rewrite the logic from scratch to make it easier to understanding,
and fixes a minor bug on the side.
This commit is contained in:
Matthias J. Sax 2025-09-28 16:26:41 -07:00
parent c2aeec46a2
commit 0feaa1771c
4 changed files with 112 additions and 37 deletions

View File

@ -891,7 +891,10 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
rejoin,
leftVersioned,
rightVersioned,
value -> value.split("\\|")[1]
value -> {
final String[] tokens = value.split("\\|");
return tokens.length == 2 ? tokens[1] : null;
}
);
}

View File

@ -82,7 +82,8 @@ public class SubscriptionReceiveProcessorSupplier<KLeft, KRight>
@Override
public void process(final Record<KRight, SubscriptionWrapper<KLeft>> record) {
if (record.key() == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction())) {
final KRight foreignKey = record.key();
if (foreignKey == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction())) {
dropRecord();
return;
}
@ -93,7 +94,7 @@ public class SubscriptionReceiveProcessorSupplier<KLeft, KRight>
throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
}
context().forward(
record.withKey(new CombinedKey<>(record.key(), record.value().primaryKey()))
record.withKey(new CombinedKey<>(foreignKey, record.value().primaryKey()))
.withValue(inferChange(record))
.withTimestamp(record.timestamp())
);

View File

@ -128,49 +128,103 @@ public class SubscriptionSendProcessorSupplier<KLeft, VLeft, KRight>
}
private void leftJoinInstructions(final Record<KLeft, Change<VLeft>> record) {
if (record.value().oldValue != null) {
final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue);
final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
final VLeft oldValue = record.value().oldValue;
final VLeft newValue = record.value().newValue;
if (oldValue == null && newValue == null) {
// no output for idempotent left hand side deletes
return;
}
final KRight oldForeignKey = oldValue == null ? null : foreignKeyExtractor.extract(record.key(), oldValue);
final KRight newForeignKey = newValue == null ? null : foreignKeyExtractor.extract(record.key(), newValue);
final boolean unsubscribe = oldForeignKey != null;
if (unsubscribe) {
// delete old subscription if FK changed
//
// if FK did change, we need to explicitly delete the old subscription,
// because the new subscription goes to a different partition
if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
// this may lead to unnecessary tombstones, if we delete an existing key,
// which did not join previously;
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
}
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
} else if (record.value().newValue != null) {
final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue);
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
}
}
// for all cases, insert, update, and delete, we send a new subscription
// we need to get a response back for all cases to always produce a left-join result
//
// note: for delete, `newForeignKey` is null, what is a "hack"
// no actual subscription will be added for null-FK, but we still get the response back we need
//
// this may lead to unnecessary tombstones, if we delete an existing key,
// which did not join previously;
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
}
private void defaultJoinInstructions(final Record<KLeft, Change<VLeft>> record) {
if (record.value().oldValue != null) {
final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue);
final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
final VLeft oldValue = record.value().oldValue;
final VLeft newValue = record.value().newValue;
final KRight oldForeignKey = oldValue == null ? null : foreignKeyExtractor.extract(record.key(), oldValue);
final boolean unsubscribe = oldForeignKey != null;
// if left row is inserted or updated, subscribe to new FK (if new FK is valid)
if (newValue != null) {
final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), newValue);
if (oldForeignKey == null && newForeignKey == null) {
logSkippedRecordDueToNullForeignKey();
} else if (oldForeignKey == null) {
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
} else if (newForeignKey == null) {
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
} else if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
//Different Foreign Key - delete the old key value and propagate the new one.
//Delete it from the oldKey's state store
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
//since we must "unset" any output set by the previous FK-join. This is true for both INNER
//and LEFT join.
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
} else { // unchanged FK
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
}
} else if (record.value().newValue != null) {
final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue);
if (newForeignKey == null) {
logSkippedRecordDueToNullForeignKey();
if (unsubscribe) {
// delete old subscription
//
// this may lead to unnecessary tombstones if the old FK did not join
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
}
} else {
// regular insert/update
if (unsubscribe) {
// update case
// delete old subscription if FK changed
// if FK did change, we need to explicitly delete the old subscription,
// because the new subscription goes to a different partition
//
// we don't need any response, as we only want a response from the new subscription
if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
}
// subscribe to new key (note, could be on a different task/node than old key)
// additionally, propagate null if no FK is found so we can delete the previous result (if any)
//
// this may lead to unnecessary tombstones if the old FK did not join
// and the new FK key does not join either;
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
} else {
// insert
// subscribe to new key
// don't propagate null if no FK is found;
// for inserts, we know that there is no need to delete any previous result
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
}
}
} else {
// left row is deleted
if (unsubscribe) {
// this may lead to unnecessary tombstones, if we delete an existing key,
// which did not join previously;
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
}
}
}
private byte[] serialize(final KRight key) {

View File

@ -284,6 +284,23 @@ public class SubscriptionSendProcessorSupplierTest {
);
}
@Test
public void innerJoinShouldNotDeleteOldAndPropagateNewFKForUnchangedFK() {
final MockInternalProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
innerJoinProcessor.init(context);
context.setRecordMetadata("topic", 0, 0);
final LeftValue leftRecordValue = new LeftValue(fk1);
innerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, leftRecordValue), 0));
assertThat(context.forwarded().size(), is(1));
assertThat(
context.forwarded().get(0).record(),
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
}
@Test
public void innerJoinShouldPropagateNothingWhenOldAndNewFKIsNull() {
final MockInternalProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
@ -332,7 +349,7 @@ public class SubscriptionSendProcessorSupplierTest {
}
@Test
public void innerJoinShouldPropagateUnchangedFKOnlyIfFKExistsInRightTable() {
public void innerJoinShouldPropagateNewRecordOfUnchangedFK() {
final MockInternalProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
innerJoinProcessor.init(context);
context.setRecordMetadata("topic", 0, 0);
@ -344,7 +361,7 @@ public class SubscriptionSendProcessorSupplierTest {
assertThat(context.forwarded().size(), is(1));
assertThat(
context.forwarded().get(0).record(),
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
}