KAFKA-18713: Fix FK Left-Join result race condition (#19005)

When a row in a FK-join left table is updated, we should send a "delete
subscription with no response" for the old FK to the right hand side, to
avoid getting two responses from the right hand side. Only the "new
subscription" for the new FK should request a response. If two responses
are requested, there is a race condition in which the two responses could
be processed in the wrong order, leading to an incorrect join result.

This PR fixes the "delete subscription" case accordingly, so that it does
not request a response.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
nilmadhab mondal 2025-04-04 01:22:47 +02:00 committed by GitHub
parent 5eb4e116bc
commit d35ab4d27a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 114 additions and 31 deletions

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
@ -60,10 +61,13 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@ -183,13 +187,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
right.pipeInput("rhs3", "rhsValue3", baseTimestamp + 2); // this unreferenced FK won't show up in any results
assertThat(
outputTopic.readKeyValuesToMap(),
is(emptyMap())
outputTopic.readKeyValuesToList(),
is(emptyList())
);
if (rejoin) {
assertThat(
rejoinOutputTopic.readKeyValuesToMap(),
is(emptyMap())
rejoinOutputTopic.readKeyValuesToList(),
is(emptyList())
);
}
if (materialized) {
@ -203,27 +207,27 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 4);
{
final Map<String, String> expected = mkMap(
mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")
final List<KeyValue<String, String>> expected = Arrays.asList(
KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
KeyValue.pair("lhs2", "(lhsValue2|rhs2,rhsValue2)")
);
assertThat(
outputTopic.readKeyValuesToMap(),
outputTopic.readKeyValuesToList(),
is(expected)
);
if (rejoin) {
assertThat(
rejoinOutputTopic.readKeyValuesToMap(),
is(mkMap(
mkEntry("lhs1", "rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
mkEntry("lhs2", "rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
rejoinOutputTopic.readKeyValuesToList(),
is(asList(
KeyValue.pair("lhs1", "rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
KeyValue.pair("lhs2", "rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
))
);
}
if (materialized) {
assertThat(
asMap(store),
is(expected)
is(expected.stream().collect(Collectors.toMap(kv -> kv.key, kv -> kv.value)))
);
}
}
@ -232,16 +236,16 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 5);
{
assertThat(
outputTopic.readKeyValuesToMap(),
is(mkMap(
mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
outputTopic.readKeyValuesToList(),
is(List.of(
new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)")
))
);
if (rejoin) {
assertThat(
rejoinOutputTopic.readKeyValuesToMap(),
is(mkMap(
mkEntry("lhs3", "rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
rejoinOutputTopic.readKeyValuesToList(),
is(List.of(
new KeyValue<>("lhs3", "rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
))
);
}
@ -256,21 +260,21 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
);
}
}
// Now delete one LHS entity such that one delete is propagated down to the output.
left.pipeInput("lhs1", null, baseTimestamp + 6);
assertThat(
outputTopic.readKeyValuesToMap(),
is(mkMap(
mkEntry("lhs1", null)
outputTopic.readKeyValuesToList(),
is(List.of(
new KeyValue<>("lhs1", null)
))
);
if (rejoin) {
assertThat(
rejoinOutputTopic.readKeyValuesToMap(),
is(mkMap(
mkEntry("lhs1", null)
))
rejoinOutputTopic.readKeyValuesToList(),
hasItem(
KeyValue.pair("lhs1", null))
);
}
if (materialized) {
@ -285,6 +289,79 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
}
}
/**
 * Checks that re-pointing a left-table row to a different foreign key, and then back to the
 * original one, produces exactly one join result on the output topic per left-side update.
 */
@ParameterizedTest
@MethodSource("testCases")
public void doJoinFromLeftThenUpdateFkThenRevertBack(final boolean leftJoin,
                                                     final String optimization,
                                                     final boolean materialized,
                                                     final boolean rejoin,
                                                     final boolean leftVersioned,
                                                     final boolean rightVersioned) {
    final Properties config = getStreamsProperties(optimization);
    // The join result is only queryable when the test case asks for a materialized store.
    final String queryableStoreName = materialized ? "store" : null;
    final Topology joinTopology = getTopology(config, queryableStoreName, leftJoin, rejoin, leftVersioned, rightVersioned);

    try (final TopologyTestDriver testDriver = new TopologyTestDriver(joinTopology, config)) {
        final TestInputTopic<String, String> rightInput = testDriver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
        final TestInputTopic<String, String> leftInput = testDriver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
        final TestOutputTopic<String, String> joinOutput = testDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
        final TestOutputTopic<String, String> rejoinOutput;
        if (rejoin) {
            rejoinOutput = testDriver.createOutputTopic(REJOIN_OUTPUT, new StringDeserializer(), new StringDeserializer());
        } else {
            rejoinOutput = null;
        }
        final KeyValueStore<String, ValueAndTimestamp<String>> joinStore = testDriver.getTimestampedKeyValueStore("store");

        // Seed the right-hand table first. With no left-hand rows yet, nothing may be emitted.
        rightInput.pipeInput("rhs1", "rhsValue1", baseTimestamp);
        rightInput.pipeInput("rhs2", "rhsValue2", baseTimestamp + 1);

        assertThat(joinOutput.readKeyValuesToList(), is(emptyList()));
        if (rejoin) {
            assertThat(rejoinOutput.readKeyValuesToList(), is(emptyList()));
        }
        if (materialized) {
            assertThat(asMap(joinStore), is(emptyMap()));
        }

        // Insert lhs1 referencing rhs1: a single joined record is expected.
        leftInput.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 3);
        final List<KeyValue<String, String>> joinedWithRhs1 = asList(
            KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)")
        );
        assertThat(joinOutput.readKeyValuesToList(), is(joinedWithRhs1));

        // Re-point lhs1 from rhs1 to rhs2: only the result for the new foreign key may appear.
        leftInput.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 5);
        final List<KeyValue<String, String>> joinedWithRhs2 = List.of(
            new KeyValue<>("lhs1", "(lhsValue1|rhs2,rhsValue2)")
        );
        assertThat(joinOutput.readKeyValuesToList(), is(joinedWithRhs2));

        // Revert lhs1 to its earlier foreign key rhs1: again exactly one result is expected.
        leftInput.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 6);
        final List<KeyValue<String, String>> joinedWithRhs1Again = List.of(
            new KeyValue<>("lhs1", "(lhsValue1|rhs1,rhsValue1)")
        );
        assertThat(joinOutput.readKeyValuesToList(), is(joinedWithRhs1Again));
    }
}
@ParameterizedTest
@MethodSource("testCases")
public void doJoinFromRightThenDeleteRightEntity(final boolean leftJoin,
@ -795,6 +872,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
return result;
}
/**
 * Copies every entry of {@code store} into a list, in the store's iteration order, unwrapping
 * each {@link ValueAndTimestamp} to its plain value via {@code value()}.
 *
 * @param store the timestamped key-value store to read
 * @return a new list holding one {@link KeyValue} per store entry
 */
protected static List<KeyValue<String, String>> makeList(final KeyValueStore<String, ValueAndTimestamp<String>> store) {
    final List<KeyValue<String, String>> result = new LinkedList<>();
    // The iterator returned by all() is closeable; close it so the store can release
    // whatever resources back the iteration.
    try (final var iterator = store.all()) {
        iterator.forEachRemaining(entry -> result.add(new KeyValue<>(entry.key, entry.value.value())));
    }
    return result;
}
protected static Topology getTopology(final Properties streamsConfig,
final String queryableStoreName,
final boolean leftJoin,

View File

@ -132,7 +132,7 @@ public class SubscriptionSendProcessorSupplier<KLeft, VLeft, KRight>
final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue);
final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
}
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
} else if (record.value().newValue != null) {

View File

@ -149,7 +149,7 @@ public class SubscriptionSendProcessorSupplierTest {
assertThat(context.forwarded().size(), greaterThan(0));
assertThat(
context.forwarded().get(0).record(),
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
);
}
@ -198,7 +198,7 @@ public class SubscriptionSendProcessorSupplierTest {
assertThat(context.forwarded().size(), greaterThan(0));
assertThat(
context.forwarded().get(0).record(),
is(new Record<>(fk1, new SubscriptionWrapper<>(null, DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
is(new Record<>(fk1, new SubscriptionWrapper<>(null, DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
);
}
@ -438,7 +438,7 @@ public class SubscriptionSendProcessorSupplierTest {
assertThat(context.forwarded().size(), greaterThan(0));
assertThat(
context.forwarded().get(0).record(),
is(new Record<>(compositeKey, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
is(new Record<>(compositeKey, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
);
}
@ -491,7 +491,7 @@ public class SubscriptionSendProcessorSupplierTest {
assertThat(context.forwarded().size(), greaterThan(0));
assertThat(
context.forwarded().get(0).record(),
is(new Record<>(compositeKey, new SubscriptionWrapper<>(null, DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
is(new Record<>(compositeKey, new SubscriptionWrapper<>(null, DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
);
}