This commit is contained in:
Matthias J. Sax 2025-09-28 19:15:12 -07:00
parent 0feaa1771c
commit 4b7c30219b
2 changed files with 27 additions and 21 deletions

View File

@ -163,7 +163,7 @@ public class SubscriptionSendProcessorSupplier<KLeft, VLeft, KRight>
// which did not join previously;
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
}
}
private void defaultJoinInstructions(final Record<KLeft, Change<VLeft>> record) {
final VLeft oldValue = record.value().oldValue;
@ -176,7 +176,7 @@ public class SubscriptionSendProcessorSupplier<KLeft, VLeft, KRight>
if (newValue != null) {
final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), newValue);
if (newForeignKey == null) {
if (newForeignKey == null) { // invalid FK
logSkippedRecordDueToNullForeignKey();
if (unsubscribe) {
// delete old subscription
@ -185,30 +185,36 @@ public class SubscriptionSendProcessorSupplier<KLeft, VLeft, KRight>
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
}
} else {
} else { // valid FK
// regular insert/update
if (unsubscribe) {
// update case
// delete old subscription if FK changed
// if FK did change, we need to explicitly delete the old subscription,
// because the new subscription goes to a different partition
//
// we don't need any response, as we only want a response from the new subscription
if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
}
final boolean foreignKeyChanged = !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey));
// subscribe to new key (note, could be on a different task/node than old key)
// additionally, propagate null if no FK is found so we can delete the previous result (if any)
//
// this may lead to unnecessary tombstones if the old FK did not join
// and the new FK key does not join either;
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
if (foreignKeyChanged) {
// if FK did change, we need to explicitly delete the old subscription,
// because the new subscription goes to a different partition
//
// we don't need any response, as we only want a response from the new subscription
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
// subscribe to new key (note, could be on a different task/node than the old FK)
// additionally, propagate null if no FK is found so we can delete the previous result (if any)
//
// this may lead to unnecessary tombstones if the old FK did not join
// and the new FK key does not join either;
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
} else {
// if FK did not change, we only need a response from the new FK subscription, if there is a join
// if there is no join, we know that the old row did not join either (as it used the same FK)
// and thus we don't need to propagate an idempotent null result
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
}
} else {
// insert
// insert case
// subscribe to new key
// don't propagate null if no FK is found;

View File

@ -297,7 +297,7 @@ public class SubscriptionSendProcessorSupplierTest {
assertThat(context.forwarded().size(), is(1));
assertThat(
context.forwarded().get(0).record(),
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
);
}
@ -361,7 +361,7 @@ public class SubscriptionSendProcessorSupplierTest {
assertThat(context.forwarded().size(), is(1));
assertThat(
context.forwarded().get(0).record(),
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
);
}