KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null value (#19303)

Fixes both KAFKA-16407 and KAFKA-16434.

Summary of existing issues:

- We are ignoring new left record when its previous FK value is null
- We do not unset foreign key join result when FK becomes null

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Ayoub Omari 2025-04-06 05:13:31 +02:00 committed by GitHub
parent 3f8e86ab67
commit b963e58000
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 75 additions and 20 deletions

View File

@ -143,28 +143,25 @@ public class SubscriptionSendProcessorSupplier<KLeft, VLeft, KRight>
private void defaultJoinInstructions(final Record<KLeft, Change<VLeft>> record) {
if (record.value().oldValue != null) {
final KRight oldForeignKey = record.value().oldValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().oldValue);
if (oldForeignKey == null) {
final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue);
final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
if (oldForeignKey == null && newForeignKey == null) {
logSkippedRecordDueToNullForeignKey();
return;
}
if (record.value().newValue != null) {
final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
if (newForeignKey == null) {
logSkippedRecordDueToNullForeignKey();
return;
}
if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
//Different Foreign Key - delete the old key value and propagate the new one.
//Delete it from the oldKey's state store
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
}
} else if (oldForeignKey == null) {
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
} else if (newForeignKey == null) {
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
} else if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
//Different Foreign Key - delete the old key value and propagate the new one.
//Delete it from the oldKey's state store
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
//since we must "unset" any output set by the previous FK-join. This is true for both INNER
//and LEFT join.
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
} else {
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
} else { // unchanged FK
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
}
} else if (record.value().newValue != null) {
final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue);

View File

@ -295,10 +295,57 @@ public class SubscriptionSendProcessorSupplierTest {
innerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, leftRecordValue), 0));
assertThat(context.forwarded(), empty());
}
// test dropped-records sensors
assertEquals(1.0, getDroppedRecordsTotalMetric(context));
assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
@Test
public void innerJoinShouldPropagateChangeFromNullFKToNonNullFK() {
final MockInternalProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
innerJoinProcessor.init(context);
context.setRecordMetadata("topic", 0, 0);
final LeftValue leftRecordValue = new LeftValue(fk1);
innerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, new LeftValue(null)), 0));
assertThat(context.forwarded().size(), is(1));
assertThat(
context.forwarded().get(0).record(),
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
);
}
@Test
public void innerJoinShouldDeleteAndPropagateChangeFromNonNullFKToNullFK() {
final MockInternalProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
innerJoinProcessor.init(context);
context.setRecordMetadata("topic", 0, 0);
final LeftValue leftRecordValue = new LeftValue(null);
innerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, new LeftValue(fk1)), 0));
assertThat(context.forwarded().size(), is(1));
assertThat(
context.forwarded().get(0).record(),
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
);
}
@Test
public void innerJoinShouldPropagateUnchangedFKOnlyIfFKExistsInRightTable() {
final MockInternalProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
innerJoinProcessor.init(context);
context.setRecordMetadata("topic", 0, 0);
final LeftValue leftRecordValue = new LeftValue(fk1);
innerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, leftRecordValue), 0));
assertThat(context.forwarded().size(), is(1));
assertThat(
context.forwarded().get(0).record(),
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
);
}
@Test
@ -316,6 +363,17 @@ public class SubscriptionSendProcessorSupplierTest {
);
}
@Test
public void innerJoinShouldNotPropagateDeletionOfPrimaryKeyWhenPreviousFKIsNull() {
final MockInternalProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
innerJoinProcessor.init(context);
context.setRecordMetadata("topic", 0, 0);
innerJoinProcessor.process(new Record<>(pk, new Change<>(null, new LeftValue(null)), 0));
assertThat(context.forwarded(), empty());
}
@Test
public void innerJoinShouldPropagateNothingWhenOldAndNewLeftValueIsNull() {
final MockInternalProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();