KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables (#13522)

This PR updates foreign-key table-table join processors to ignore out-of-order records from versioned tables, as specified in KIP-914.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Victoria Xia 2023-04-12 22:05:10 -04:00 committed by GitHub
parent 951894d2ff
commit 1d5d003ff4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 535 additions and 148 deletions

View File

@ -230,19 +230,19 @@
files=".*[/\\]streams[/\\].*test[/\\].*.java"/> files=".*[/\\]streams[/\\].*test[/\\].*.java"/>
<suppress checks="CyclomaticComplexity" <suppress checks="CyclomaticComplexity"
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/> files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|KTableKTableForeignKeyVersionedJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
<suppress checks="JavaNCSS" <suppress checks="JavaNCSS"
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/> files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>
<suppress checks="NPathComplexity" <suppress checks="NPathComplexity"
files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest).java"/> files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|KTableKTableForeignKeyVersionedJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest).java"/>
<suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)" <suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/> files="Murmur3Test.java"/>
<suppress checks="MethodLength" <suppress checks="MethodLength"
files="(KStreamSlidingWindowAggregateTest|KStreamKStreamLeftJoinTest|KStreamKStreamOuterJoinTest).java"/> files="(KStreamSlidingWindowAggregateTest|KStreamKStreamLeftJoinTest|KStreamKStreamOuterJoinTest|KTableKTableForeignKeyVersionedJoinIntegrationTest).java"/>
<suppress checks="ClassFanOutComplexity" <suppress checks="ClassFanOutComplexity"
files="StreamTaskTest.java"/> files="StreamTaskTest.java"/>

View File

@ -1116,7 +1116,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
keySerde keySerde
); );
final ProcessorGraphNode<K, Change<V>> subscriptionNode = new ProcessorGraphNode<>( final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
final StatefulProcessorNode<K, Change<V>> subscriptionNode = new StatefulProcessorNode<>(
new ProcessorParameters<>( new ProcessorParameters<>(
new ForeignJoinSubscriptionSendProcessorSupplier<>( new ForeignJoinSubscriptionSendProcessorSupplier<>(
foreignKeyExtractor, foreignKeyExtractor,
@ -1124,10 +1125,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
valueHashSerdePseudoTopic, valueHashSerdePseudoTopic,
foreignKeySerde, foreignKeySerde,
valueSerde == null ? null : valueSerde.serializer(), valueSerde == null ? null : valueSerde.serializer(),
leftJoin leftJoin,
primaryKeyValueGetter
), ),
renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION) renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION)
) ),
Collections.emptySet(),
Collections.singleton(primaryKeyValueGetter)
); );
builder.addGraphNode(graphNode, subscriptionNode); builder.addGraphNode(graphNode, subscriptionNode);
@ -1179,26 +1183,27 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
); );
builder.addGraphNode(subscriptionSource, subscriptionReceiveNode); builder.addGraphNode(subscriptionSource, subscriptionReceiveNode);
final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetter = ((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier();
final StatefulProcessorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinForeignNode = final StatefulProcessorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinForeignNode =
new StatefulProcessorNode<>( new StatefulProcessorNode<>(
new ProcessorParameters<>( new ProcessorParameters<>(
new SubscriptionJoinForeignProcessorSupplier<>( new SubscriptionJoinForeignProcessorSupplier<>(
((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier() foreignKeyValueGetter
), ),
renamed.suffixWithOrElseGet("-subscription-join-foreign", builder, SUBSCRIPTION_PROCESSOR) renamed.suffixWithOrElseGet("-subscription-join-foreign", builder, SUBSCRIPTION_PROCESSOR)
), ),
Collections.emptySet(), Collections.emptySet(),
Collections.singleton(((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier()) Collections.singleton(foreignKeyValueGetter)
); );
builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinForeignNode); builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinForeignNode);
final StatefulProcessorNode<KO, Change<Object>> foreignJoinSubscriptionNode = new StatefulProcessorNode<>( final StatefulProcessorNode<KO, Change<VO>> foreignJoinSubscriptionNode = new StatefulProcessorNode<>(
new ProcessorParameters<>( new ProcessorParameters<>(
new ForeignJoinSubscriptionProcessorSupplier<>(subscriptionStore, combinedKeySchema), new ForeignJoinSubscriptionProcessorSupplier<>(subscriptionStore, combinedKeySchema, foreignKeyValueGetter),
renamed.suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR) renamed.suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR)
), ),
Collections.singleton(subscriptionStore), Collections.singleton(subscriptionStore),
Collections.emptySet() Collections.singleton(foreignKeyValueGetter)
); );
builder.addGraphNode(((KTableImpl<KO, VO, ?>) foreignKeyTable).graphNode, foreignJoinSubscriptionNode); builder.addGraphNode(((KTableImpl<KO, VO, ?>) foreignKeyTable).graphNode, foreignJoinSubscriptionNode);
@ -1232,7 +1237,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
resultSourceNodes.add(foreignResponseSource.nodeName()); resultSourceNodes.add(foreignResponseSource.nodeName());
builder.internalTopologyBuilder.copartitionSources(resultSourceNodes); builder.internalTopologyBuilder.copartitionSources(resultSourceNodes);
final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
final SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> resolverProcessorSupplier = new SubscriptionResolverJoinProcessorSupplier<>( final SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> resolverProcessorSupplier = new SubscriptionResolverJoinProcessorSupplier<>(
primaryKeyValueGetter, primaryKeyValueGetter,
valueSerde == null ? null : valueSerde.serializer(), valueSerde == null ? null : valueSerde.serializer(),

View File

@ -21,6 +21,8 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorContext;
@ -43,24 +45,32 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements
private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionProcessorSupplier.class); private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionProcessorSupplier.class);
private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder; private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
private final CombinedKeySchema<KO, K> keySchema; private final CombinedKeySchema<KO, K> keySchema;
private final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetterSupplier;
public ForeignJoinSubscriptionProcessorSupplier( public ForeignJoinSubscriptionProcessorSupplier(
final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder, final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder,
final CombinedKeySchema<KO, K> keySchema) { final CombinedKeySchema<KO, K> keySchema,
final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetterSupplier) {
this.storeBuilder = storeBuilder; this.storeBuilder = storeBuilder;
this.keySchema = keySchema; this.keySchema = keySchema;
this.foreignKeyValueGetterSupplier = foreignKeyValueGetterSupplier;
} }
@Override @Override
public Processor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> get() { public Processor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> get() {
return new KTableKTableJoinProcessor(); return new KTableKTableJoinProcessor(foreignKeyValueGetterSupplier.get());
} }
private final class KTableKTableJoinProcessor extends ContextualProcessor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> { private final class KTableKTableJoinProcessor extends ContextualProcessor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
private Sensor droppedRecordsSensor; private Sensor droppedRecordsSensor;
private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store; private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> subscriptionStore;
private final KTableValueGetter<KO, VO> foreignKeyValueGetter;
private KTableKTableJoinProcessor(final KTableValueGetter<KO, VO> foreignKeyValueGetter) {
this.foreignKeyValueGetter = foreignKeyValueGetter;
}
@Override @Override
public void init(final ProcessorContext<K, SubscriptionResponseWrapper<VO>> context) { public void init(final ProcessorContext<K, SubscriptionResponseWrapper<VO>> context) {
@ -71,7 +81,8 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements
internalProcessorContext.taskId().toString(), internalProcessorContext.taskId().toString(),
internalProcessorContext.metrics() internalProcessorContext.metrics()
); );
store = internalProcessorContext.getStateStore(storeBuilder); subscriptionStore = internalProcessorContext.getStateStore(storeBuilder);
foreignKeyValueGetter.init(context);
} }
@Override @Override
@ -95,11 +106,21 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements
return; return;
} }
// drop out-of-order records from versioned tables (cf. KIP-914)
if (foreignKeyValueGetter.isVersioned()) {
final ValueAndTimestamp<VO> latestValueAndTimestamp = foreignKeyValueGetter.get(record.key());
if (latestValueAndTimestamp != null && latestValueAndTimestamp.timestamp() > record.timestamp()) {
LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
droppedRecordsSensor.record();
return;
}
}
final Bytes prefixBytes = keySchema.prefixBytes(record.key()); final Bytes prefixBytes = keySchema.prefixBytes(record.key());
//Perform the prefixScan and propagate the results //Perform the prefixScan and propagate the results
try (final KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> prefixScanResults = try (final KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> prefixScanResults =
store.range(prefixBytes, Bytes.increment(prefixBytes))) { subscriptionStore.range(prefixBytes, Bytes.increment(prefixBytes))) {
while (prefixScanResults.hasNext()) { while (prefixScanResults.hasNext()) {
final KeyValue<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> next = prefixScanResults.next(); final KeyValue<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> next = prefixScanResults.next();
@ -118,6 +139,11 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements
} }
} }
@Override
public void close() {
foreignKeyValueGetter.close();
}
private boolean prefixEquals(final byte[] x, final byte[] y) { private boolean prefixEquals(final byte[] x, final byte[] y) {
final int min = Math.min(x.length, y.length); final int min = Math.min(x.length, y.length);
final ByteBuffer xSlice = ByteBuffer.wrap(x, 0, min); final ByteBuffer xSlice = ByteBuffer.wrap(x, 0, min);

View File

@ -21,6 +21,8 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorContext;
@ -29,6 +31,7 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Murmur3; import org.apache.kafka.streams.state.internals.Murmur3;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -49,6 +52,7 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
private final Supplier<String> foreignKeySerdeTopicSupplier; private final Supplier<String> foreignKeySerdeTopicSupplier;
private final Supplier<String> valueSerdeTopicSupplier; private final Supplier<String> valueSerdeTopicSupplier;
private final boolean leftJoin; private final boolean leftJoin;
private final KTableValueGetterSupplier<K, V> primaryKeyValueGetterSupplier;
private Serializer<KO> foreignKeySerializer; private Serializer<KO> foreignKeySerializer;
private Serializer<V> valueSerializer; private Serializer<V> valueSerializer;
@ -57,18 +61,20 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
final Supplier<String> valueSerdeTopicSupplier, final Supplier<String> valueSerdeTopicSupplier,
final Serde<KO> foreignKeySerde, final Serde<KO> foreignKeySerde,
final Serializer<V> valueSerializer, final Serializer<V> valueSerializer,
final boolean leftJoin) { final boolean leftJoin,
final KTableValueGetterSupplier<K, V> primaryKeyValueGetterSupplier) {
this.foreignKeyExtractor = foreignKeyExtractor; this.foreignKeyExtractor = foreignKeyExtractor;
this.foreignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier; this.foreignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier;
this.valueSerdeTopicSupplier = valueSerdeTopicSupplier; this.valueSerdeTopicSupplier = valueSerdeTopicSupplier;
this.valueSerializer = valueSerializer; this.valueSerializer = valueSerializer;
this.leftJoin = leftJoin; this.leftJoin = leftJoin;
this.primaryKeyValueGetterSupplier = primaryKeyValueGetterSupplier;
foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer(); foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer();
} }
@Override @Override
public Processor<K, Change<V>, KO, SubscriptionWrapper<K>> get() { public Processor<K, Change<V>, KO, SubscriptionWrapper<K>> get() {
return new UnbindChangeProcessor(); return new UnbindChangeProcessor(primaryKeyValueGetterSupplier.get());
} }
private class UnbindChangeProcessor extends ContextualProcessor<K, Change<V>, KO, SubscriptionWrapper<K>> { private class UnbindChangeProcessor extends ContextualProcessor<K, Change<V>, KO, SubscriptionWrapper<K>> {
@ -76,6 +82,11 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
private Sensor droppedRecordsSensor; private Sensor droppedRecordsSensor;
private String foreignKeySerdeTopic; private String foreignKeySerdeTopic;
private String valueSerdeTopic; private String valueSerdeTopic;
private final KTableValueGetter<K, V> primaryKeyValueGetter;
private UnbindChangeProcessor(final KTableValueGetter<K, V> primaryKeyValueGetter) {
this.primaryKeyValueGetter = primaryKeyValueGetter;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
@ -95,10 +106,25 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
context.taskId().toString(), context.taskId().toString(),
(StreamsMetricsImpl) context.metrics() (StreamsMetricsImpl) context.metrics()
); );
primaryKeyValueGetter.init(context);
} }
@Override @Override
public void process(final Record<K, Change<V>> record) { public void process(final Record<K, Change<V>> record) {
// drop out-of-order records from versioned tables (cf. KIP-914)
if (primaryKeyValueGetter.isVersioned()) {
// key-value stores do not contain data for null keys, so skip the check
// if the key is null
if (record.key() != null) {
final ValueAndTimestamp<V> latestValueAndTimestamp = primaryKeyValueGetter.get(record.key());
if (latestValueAndTimestamp != null && latestValueAndTimestamp.timestamp() > record.timestamp()) {
LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
droppedRecordsSensor.record();
return;
}
}
}
final long[] currentHash = record.value().newValue == null ? final long[] currentHash = record.value().newValue == null ?
null : null :
Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue)); Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
@ -107,37 +133,13 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
if (record.value().oldValue != null) { if (record.value().oldValue != null) {
final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue); final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
if (oldForeignKey == null) { if (oldForeignKey == null) {
if (context().recordMetadata().isPresent()) { logSkippedRecordDueToNullForeignKey();
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null foreign key. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null foreign key. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
return; return;
} }
if (record.value().newValue != null) { if (record.value().newValue != null) {
final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue); final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
if (newForeignKey == null) { if (newForeignKey == null) {
if (context().recordMetadata().isPresent()) { logSkippedRecordDueToNullForeignKey();
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null foreign key. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null foreign key. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
return; return;
} }
@ -193,19 +195,7 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
} }
final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue); final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
if (newForeignKey == null) { if (newForeignKey == null) {
if (context().recordMetadata().isPresent()) { logSkippedRecordDueToNullForeignKey();
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null foreign key. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null foreign key. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
} else { } else {
context().forward( context().forward(
record.withKey(newForeignKey) record.withKey(newForeignKey)
@ -217,5 +207,26 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
} }
} }
} }
@Override
public void close() {
primaryKeyValueGetter.close();
}
private void logSkippedRecordDueToNullForeignKey() {
if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata = context().recordMetadata().get();
LOG.warn(
"Skipping record due to null foreign key. "
+ "topic=[{}] partition=[{}] offset=[{}]",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
);
} else {
LOG.warn(
"Skipping record due to null foreign key. Topic, partition, and offset not known."
);
}
droppedRecordsSensor.record();
}
} }
} }

View File

@ -16,10 +16,12 @@
*/ */
package org.apache.kafka.streams.integration; package org.apache.kafka.streams.integration;
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestInputTopic;
@ -66,25 +68,45 @@ import static org.hamcrest.MatcherAssert.assertThat;
public class KTableKTableForeignKeyJoinIntegrationTest { public class KTableKTableForeignKeyJoinIntegrationTest {
@Rule @Rule
public Timeout globalTimeout = Timeout.seconds(600); public Timeout globalTimeout = Timeout.seconds(600);
private static final String LEFT_TABLE = "left_table"; protected static final String LEFT_TABLE = "left_table";
private static final String RIGHT_TABLE = "right_table"; protected static final String RIGHT_TABLE = "right_table";
private static final String OUTPUT = "output-topic"; protected static final String OUTPUT = "output-topic";
private static final String REJOIN_OUTPUT = "rejoin-output-topic"; private static final String REJOIN_OUTPUT = "rejoin-output-topic";
private final boolean leftJoin;
private final boolean materialized;
private final String optimization;
private final boolean rejoin;
private Properties streamsConfig; private final MockTime time = new MockTime();
protected final boolean leftJoin;
protected final boolean materialized;
private final String optimization;
protected final boolean rejoin;
protected final boolean leftVersioned;
protected final boolean rightVersioned;
protected Properties streamsConfig;
protected long baseTimestamp;
public KTableKTableForeignKeyJoinIntegrationTest(final boolean leftJoin, public KTableKTableForeignKeyJoinIntegrationTest(final boolean leftJoin,
final String optimization, final String optimization,
final boolean materialized, final boolean materialized,
final boolean rejoin) { final boolean rejoin) {
// versioning is disabled for these tests, even though the code supports building a
// topology with versioned tables, since KTableKTableForeignKeyVersionedJoinIntegrationTest
// extends this test class.
this(leftJoin, optimization, materialized, rejoin, false, false);
}
protected KTableKTableForeignKeyJoinIntegrationTest(final boolean leftJoin,
final String optimization,
final boolean materialized,
final boolean rejoin,
final boolean leftVersioned,
final boolean rightVersioned) {
this.rejoin = rejoin; this.rejoin = rejoin;
this.leftJoin = leftJoin; this.leftJoin = leftJoin;
this.materialized = materialized; this.materialized = materialized;
this.optimization = optimization; this.optimization = optimization;
this.leftVersioned = leftVersioned;
this.rightVersioned = rightVersioned;
} }
@Rule @Rule
@ -96,6 +118,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()), mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimization) mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimization)
)); ));
baseTimestamp = time.milliseconds();
} }
@Parameterized.Parameters(name = "leftJoin={0}, optimization={1}, materialized={2}, rejoin={3}") @Parameterized.Parameters(name = "leftJoin={0}, optimization={1}, materialized={2}, rejoin={3}")
@ -105,7 +128,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
return buildParameters(booleans, optimizations, booleans, booleans); return buildParameters(booleans, optimizations, booleans, booleans);
} }
private static Collection<Object[]> buildParameters(final List<?>... argOptions) { protected static Collection<Object[]> buildParameters(final List<?>... argOptions) {
List<Object[]> result = new LinkedList<>(); List<Object[]> result = new LinkedList<>();
result.add(new Object[0]); result.add(new Object[0]);
@ -131,7 +154,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
@Test @Test
public void doJoinFromLeftThenDeleteLeftEntity() { public void doJoinFromLeftThenDeleteLeftEntity() {
final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin); final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
@ -140,9 +163,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
// Pre-populate the RHS records. This test is all about what happens when we add/remove LHS records // Pre-populate the RHS records. This test is all about what happens when we add/remove LHS records
right.pipeInput("rhs1", "rhsValue1"); right.pipeInput("rhs1", "rhsValue1", baseTimestamp);
right.pipeInput("rhs2", "rhsValue2"); right.pipeInput("rhs2", "rhsValue2", baseTimestamp + 1);
right.pipeInput("rhs3", "rhsValue3"); // this unreferenced FK won't show up in any results right.pipeInput("rhs3", "rhsValue3", baseTimestamp + 2); // this unreferenced FK won't show up in any results
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
@ -161,8 +184,8 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
); );
} }
left.pipeInput("lhs1", "lhsValue1|rhs1"); left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 3);
left.pipeInput("lhs2", "lhsValue2|rhs2"); left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 4);
{ {
final Map<String, String> expected = mkMap( final Map<String, String> expected = mkMap(
@ -191,7 +214,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
} }
// Add another reference to an existing FK // Add another reference to an existing FK
left.pipeInput("lhs3", "lhsValue3|rhs1"); left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 5);
{ {
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
@ -220,7 +243,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
} }
// Now delete one LHS entity such that one delete is propagated down to the output. // Now delete one LHS entity such that one delete is propagated down to the output.
left.pipeInput("lhs1", (String) null); left.pipeInput("lhs1", (String) null, baseTimestamp + 6);
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
is(mkMap( is(mkMap(
@ -249,7 +272,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
@Test @Test
public void doJoinFromRightThenDeleteRightEntity() { public void doJoinFromRightThenDeleteRightEntity() {
final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin); final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
@ -257,54 +280,54 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
// Pre-populate the LHS records. This test is all about what happens when we add/remove RHS records // Pre-populate the LHS records. This test is all about what happens when we add/remove RHS records
left.pipeInput("lhs1", "lhsValue1|rhs1"); left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
left.pipeInput("lhs2", "lhsValue2|rhs2"); left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 1);
left.pipeInput("lhs3", "lhsValue3|rhs1"); left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 2);
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
is(leftJoin is(leftJoin
? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"), ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
mkEntry("lhs2", "(lhsValue2|rhs2,null)"), mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
mkEntry("lhs3", "(lhsValue3|rhs1,null)")) mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
: emptyMap() : emptyMap()
) )
); );
if (materialized) { if (materialized) {
assertThat( assertThat(
asMap(store), asMap(store),
is(leftJoin is(leftJoin
? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"), ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
mkEntry("lhs2", "(lhsValue2|rhs2,null)"), mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
mkEntry("lhs3", "(lhsValue3|rhs1,null)")) mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
: emptyMap() : emptyMap()
) )
); );
} }
right.pipeInput("rhs1", "rhsValue1"); right.pipeInput("rhs1", "rhsValue1", baseTimestamp + 3);
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")) mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
) )
); );
if (materialized) { if (materialized) {
assertThat( assertThat(
asMap(store), asMap(store),
is(leftJoin is(leftJoin
? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
mkEntry("lhs2", "(lhsValue2|rhs2,null)"), mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")) mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
: mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), : mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")) mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
) )
); );
} }
right.pipeInput("rhs2", "rhsValue2"); right.pipeInput("rhs2", "rhsValue2", baseTimestamp + 4);
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
@ -314,13 +337,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
assertThat( assertThat(
asMap(store), asMap(store),
is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")) mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
) )
); );
} }
right.pipeInput("rhs3", "rhsValue3"); // this unreferenced FK won't show up in any results right.pipeInput("rhs3", "rhsValue3", baseTimestamp + 5); // this unreferenced FK won't show up in any results
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
@ -330,30 +353,30 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
assertThat( assertThat(
asMap(store), asMap(store),
is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), is(mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")) mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)"))
) )
); );
} }
// Now delete the RHS entity such that all matching keys have deletes propagated. // Now delete the RHS entity such that all matching keys have deletes propagated.
right.pipeInput("rhs1", (String) null); right.pipeInput("rhs1", (String) null, baseTimestamp + 6);
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs1,null)" : null), is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs1,null)" : null),
mkEntry("lhs3", leftJoin ? "(lhsValue3|rhs1,null)" : null)) mkEntry("lhs3", leftJoin ? "(lhsValue3|rhs1,null)" : null))
) )
); );
if (materialized) { if (materialized) {
assertThat( assertThat(
asMap(store), asMap(store),
is(leftJoin is(leftJoin
? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"), ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
mkEntry("lhs3", "(lhsValue3|rhs1,null)")) mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
: mkMap(mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")) : mkMap(mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"))
) )
); );
} }
@ -362,13 +385,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
@Test @Test
public void shouldEmitTombstoneWhenDeletingNonJoiningRecords() { public void shouldEmitTombstoneWhenDeletingNonJoiningRecords() {
final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin); final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
left.pipeInput("lhs1", "lhsValue1|rhs1"); left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
{ {
final Map<String, String> expected = final Map<String, String> expected =
@ -388,7 +411,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
// Deleting a non-joining record produces an unnecessary tombstone for inner joins, because // Deleting a non-joining record produces an unnecessary tombstone for inner joins, because
// it's not possible to know whether a result was previously emitted. // it's not possible to know whether a result was previously emitted.
// For the left join, the tombstone is necessary. // For the left join, the tombstone is necessary.
left.pipeInput("lhs1", (String) null); left.pipeInput("lhs1", (String) null, baseTimestamp + 1);
{ {
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
@ -403,7 +426,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
} }
// Deleting a non-existing record is idempotent // Deleting a non-existing record is idempotent
left.pipeInput("lhs1", (String) null); left.pipeInput("lhs1", (String) null, baseTimestamp + 2);
{ {
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
@ -421,14 +444,14 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
@Test @Test
public void shouldNotEmitTombstonesWhenDeletingNonExistingRecords() { public void shouldNotEmitTombstonesWhenDeletingNonExistingRecords() {
final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin); final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
// Deleting a record that never existed doesn't need to emit tombstones. // Deleting a record that never existed doesn't need to emit tombstones.
left.pipeInput("lhs1", (String) null); left.pipeInput("lhs1", (String) null, baseTimestamp);
{ {
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
@ -446,14 +469,14 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
@Test @Test
public void joinShouldProduceNullsWhenValueHasNonMatchingForeignKey() { public void joinShouldProduceNullsWhenValueHasNonMatchingForeignKey() {
final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin); final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
left.pipeInput("lhs1", "lhsValue1|rhs1"); left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
// no output for a new inner join on a non-existent FK // no output for a new inner join on a non-existent FK
// the left join of course emits the half-joined output // the left join of course emits the half-joined output
assertThat( assertThat(
@ -470,7 +493,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
// since it impossible to know whether the prior FK existed or not (and thus whether any results have // since it impossible to know whether the prior FK existed or not (and thus whether any results have
// previously been emitted) // previously been emitted)
// The left join emits a _necessary_ update (since the lhs record has actually changed) // The left join emits a _necessary_ update (since the lhs record has actually changed)
left.pipeInput("lhs1", "lhsValue1|rhs2"); left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 1);
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null))) is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)))
@ -482,7 +505,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
); );
} }
// of course, moving it again to yet another non-existent FK has the same effect // of course, moving it again to yet another non-existent FK has the same effect
left.pipeInput("lhs1", "lhsValue1|rhs3"); left.pipeInput("lhs1", "lhsValue1|rhs3", baseTimestamp + 2);
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null))) is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null)))
@ -497,7 +520,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
// Adding an RHS record now, so that we can demonstrate "moving" from a non-existent FK to an existent one // Adding an RHS record now, so that we can demonstrate "moving" from a non-existent FK to an existent one
// This RHS key was previously referenced, but it's not referenced now, so adding this record should // This RHS key was previously referenced, but it's not referenced now, so adding this record should
// result in no changes whatsoever. // result in no changes whatsoever.
right.pipeInput("rhs1", "rhsValue1"); right.pipeInput("rhs1", "rhsValue1", baseTimestamp + 3);
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
is(emptyMap()) is(emptyMap())
@ -510,7 +533,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
} }
// now, we change to a FK that exists, and see the join completes // now, we change to a FK that exists, and see the join completes
left.pipeInput("lhs1", "lhsValue1|rhs1"); left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 4);
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
is(mkMap( is(mkMap(
@ -528,7 +551,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
// but if we update it again to a non-existent one, we'll get a tombstone for the inner join, and the // but if we update it again to a non-existent one, we'll get a tombstone for the inner join, and the
// left join updates appropriately. // left join updates appropriately.
left.pipeInput("lhs1", "lhsValue1|rhs2"); left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 5);
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
is(mkMap( is(mkMap(
@ -546,7 +569,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
@Test @Test
public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated() { public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated() {
final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin); final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
@ -555,8 +578,8 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
// Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference // Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference
// then populate update on RHS // then populate update on RHS
right.pipeInput("rhs1", "rhsValue1"); right.pipeInput("rhs1", "rhsValue1", baseTimestamp);
right.pipeInput("rhs2", "rhsValue2"); right.pipeInput("rhs2", "rhsValue2", baseTimestamp + 1);
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
@ -569,7 +592,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
); );
} }
left.pipeInput("lhs1", "lhsValue1|rhs1"); left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 2);
{ {
final Map<String, String> expected = mkMap( final Map<String, String> expected = mkMap(
mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)") mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
@ -587,7 +610,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
} }
// Change LHS foreign key reference // Change LHS foreign key reference
left.pipeInput("lhs1", "lhsValue1|rhs2"); left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 3);
{ {
final Map<String, String> expected = mkMap( final Map<String, String> expected = mkMap(
mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)") mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
@ -605,7 +628,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
} }
// Populate RHS update on old LHS foreign key ref // Populate RHS update on old LHS foreign key ref
right.pipeInput("rhs1", "rhsValue1Delta"); right.pipeInput("rhs1", "rhsValue1Delta", baseTimestamp + 4);
{ {
assertThat( assertThat(
outputTopic.readKeyValuesToMap(), outputTopic.readKeyValuesToMap(),
@ -623,29 +646,52 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
} }
} }
private static Map<String, String> asMap(final KeyValueStore<String, String> store) { protected static Map<String, String> asMap(final KeyValueStore<String, String> store) {
final HashMap<String, String> result = new HashMap<>(); final HashMap<String, String> result = new HashMap<>();
store.all().forEachRemaining(kv -> result.put(kv.key, kv.value)); store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));
return result; return result;
} }
private static Topology getTopology(final Properties streamsConfig, protected static Topology getTopology(final Properties streamsConfig,
final String queryableStoreName, final String queryableStoreName,
final boolean leftJoin, final boolean leftJoin,
final boolean rejoin) { final boolean rejoin,
final boolean leftVersioned,
final boolean rightVersioned) {
final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope(); final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, String> left = builder.table( final KTable<String, String> left;
LEFT_TABLE, if (leftVersioned) {
Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), left = builder.table(
serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) LEFT_TABLE,
); Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
final KTable<String, String> right = builder.table( serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)),
RIGHT_TABLE, Materialized.as(Stores.persistentVersionedKeyValueStore("left", Duration.ofMinutes(5)))
Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), );
serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) } else {
); left = builder.table(
LEFT_TABLE,
Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
);
}
final KTable<String, String> right;
if (rightVersioned) {
right = builder.table(
RIGHT_TABLE,
Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)),
Materialized.as(Stores.persistentVersionedKeyValueStore("right", Duration.ofMinutes(5)))
);
} else {
right = builder.table(
RIGHT_TABLE,
Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
);
}
final Function<String, String> extractor = value -> value.split("\\|")[1]; final Function<String, String> extractor = value -> value.split("\\|")[1];
final ValueJoiner<String, String, String> joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")"; final ValueJoiner<String, String, String> joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")";
@ -678,13 +724,13 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
left.leftJoin(right, extractor, joiner, mainMaterialized); left.leftJoin(right, extractor, joiner, mainMaterialized);
fkJoin.toStream() fkJoin.toStream()
.to(OUTPUT); .to(OUTPUT);
// also make sure the FK join is set up right for downstream operations that require materialization // also make sure the FK join is set up right for downstream operations that require materialization
if (rejoin) { if (rejoin) {
fkJoin.leftJoin(left, rejoiner, rejoinMaterialized) fkJoin.leftJoin(left, rejoiner, rejoinMaterialized)
.toStream() .toStream()
.to(REJOIN_OUTPUT); .to(REJOIN_OUTPUT);
} }
} else { } else {
final KTable<String, String> fkJoin = left.join(right, extractor, joiner, mainMaterialized); final KTable<String, String> fkJoin = left.join(right, extractor, joiner, mainMaterialized);
@ -696,12 +742,11 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
// also make sure the FK join is set up right for downstream operations that require materialization // also make sure the FK join is set up right for downstream operations that require materialization
if (rejoin) { if (rejoin) {
fkJoin.join(left, rejoiner, rejoinMaterialized) fkJoin.join(left, rejoiner, rejoinMaterialized)
.toStream() .toStream()
.to(REJOIN_OUTPUT); .to(REJOIN_OUTPUT);
} }
} }
return builder.build(streamsConfig); return builder.build(streamsConfig);
} }
} }

View File

@ -0,0 +1,301 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
@Category(IntegrationTest.class)
public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKTableForeignKeyJoinIntegrationTest {
public KTableKTableForeignKeyVersionedJoinIntegrationTest(final boolean leftJoin,
final boolean materialized,
final boolean leftVersioned,
final boolean rightVersioned) {
// optimizations and rejoin are disabled for these tests, as these tests focus on versioning.
// see KTableKTableForeignKeyJoinIntegrationTest for test coverage for optimizations and rejoin
super(leftJoin, StreamsConfig.NO_OPTIMIZATION, materialized, false, leftVersioned, rightVersioned);
}
@Parameterized.Parameters(name = "leftJoin={0}, materialized={1}, leftVersioned={2}, rightVersioned={3}")
public static Collection<Object[]> data() {
final List<Boolean> booleans = Arrays.asList(true, false);
return buildParameters(booleans, booleans, booleans, booleans);
}
@Test
public void shouldIgnoreOutOfOrderRecordsIffVersioned() {
final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
// RHS record
right.pipeInput("rhs1", "rhsValue1", baseTimestamp + 4);
assertThat(
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
if (materialized) {
assertThat(
asMap(store),
is(emptyMap())
);
}
// LHS records with match to existing RHS record
left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 3);
left.pipeInput("lhs2", "lhsValue2|rhs1", baseTimestamp + 5);
{
final Map<String, String> expected = mkMap(
mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
);
assertThat(
outputTopic.readKeyValuesToMap(),
is(expected)
);
if (materialized) {
assertThat(
asMap(store),
is(expected)
);
}
}
// out-of-order LHS record (for existing key) does not produce a new result iff LHS is versioned
left.pipeInput("lhs1", "lhsValue1_ooo|rhs1", baseTimestamp + 2);
if (leftVersioned) {
assertThat(
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
if (materialized) {
assertThat(
asMap(store),
is(mkMap(
mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
))
);
}
} else {
assertThat(
outputTopic.readKeyValuesToMap(),
is(mkMap(
mkEntry("lhs1", "(lhsValue1_ooo|rhs1,rhsValue1)")
))
);
if (materialized) {
assertThat(
asMap(store),
is(mkMap(
mkEntry("lhs1", "(lhsValue1_ooo|rhs1,rhsValue1)"),
mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
))
);
}
}
// out-of-order LHS tombstone (for existing key) is similarly ignored (iff LHS is versioned)
left.pipeInput("lhs1", null, baseTimestamp + 2);
if (leftVersioned) {
assertThat(
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
if (materialized) {
assertThat(
asMap(store),
is(mkMap(
mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
))
);
}
} else {
assertThat(
outputTopic.readKeyValuesToMap(),
is(mkMap(
mkEntry("lhs1", null)
))
);
if (materialized) {
assertThat(
asMap(store),
is(mkMap(
mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
))
);
}
}
// LHS record with larger timestamp always produces a new result
left.pipeInput("lhs1", "lhsValue1_new|rhs1", baseTimestamp + 8);
{
assertThat(
outputTopic.readKeyValuesToMap(),
is(mkMap(
mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)")
))
);
if (materialized) {
assertThat(
asMap(store),
is(mkMap(
mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)"),
mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
))
);
}
}
// out-of-order RHS record (for existing key) does not produce a new result iff RHS is versioned
right.pipeInput("rhs1", "rhsValue1_ooo", baseTimestamp + 1);
if (rightVersioned) {
assertThat(
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
if (materialized) {
assertThat(
asMap(store),
is(mkMap(
mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)"),
mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
))
);
}
} else {
assertThat(
outputTopic.readKeyValuesToMap(),
is(mkMap(
mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_ooo)"),
mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1_ooo)")
))
);
if (materialized) {
assertThat(
asMap(store),
is(mkMap(
mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_ooo)"),
mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1_ooo)")
))
);
}
}
// out-of-order RHS tombstone (for existing key) is similarly ignored (iff RHS is versioned)
right.pipeInput("rhs1", null, baseTimestamp + 1);
if (rightVersioned) {
assertThat(
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
if (materialized) {
assertThat(
asMap(store),
is(mkMap(
mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)"),
mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")
))
);
}
} else {
if (leftJoin) {
assertThat(
outputTopic.readKeyValuesToMap(),
is(mkMap(
mkEntry("lhs1", "(lhsValue1_new|rhs1,null)"),
mkEntry("lhs2", "(lhsValue2|rhs1,null)")
))
);
if (materialized) {
assertThat(
asMap(store),
is(mkMap(
mkEntry("lhs1", "(lhsValue1_new|rhs1,null)"),
mkEntry("lhs2", "(lhsValue2|rhs1,null)")
))
);
}
} else {
assertThat(
outputTopic.readKeyValuesToMap(),
is(mkMap(
mkEntry("lhs1", null),
mkEntry("lhs2", null)
))
);
if (materialized) {
assertThat(
asMap(store),
is(emptyMap())
);
}
}
}
// RHS record with larger timestamps always produces new results
right.pipeInput("rhs1", "rhsValue1_new", baseTimestamp + 6);
{
assertThat(
outputTopic.readKeyValuesToMap(),
is(mkMap(
mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_new)"),
mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1_new)")
))
);
if (materialized) {
assertThat(
asMap(store),
is(mkMap(
mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_new)"),
mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1_new)")
))
);
}
}
}
}
}

View File

@ -3149,7 +3149,7 @@ public class KStreamImplTest {
" Processor: KTABLE-FK-JOIN-OUTPUT-0000000018 (stores: [])\n" + " Processor: KTABLE-FK-JOIN-OUTPUT-0000000018 (stores: [])\n" +
" --> KTABLE-TOSTREAM-0000000020\n" + " --> KTABLE-TOSTREAM-0000000020\n" +
" <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000017\n" + " <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000017\n" +
" Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000007 (stores: [])\n" + " Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000007 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000002])\n" +
" --> KTABLE-SINK-0000000008\n" + " --> KTABLE-SINK-0000000008\n" +
" <-- KSTREAM-TOTABLE-0000000001\n" + " <-- KSTREAM-TOTABLE-0000000001\n" +
" Processor: KTABLE-TOSTREAM-0000000020 (stores: [])\n" + " Processor: KTABLE-TOSTREAM-0000000020 (stores: [])\n" +
@ -3174,7 +3174,7 @@ public class KStreamImplTest {
" Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000012 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005])\n" + " Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000012 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005])\n" +
" --> KTABLE-SINK-0000000015\n" + " --> KTABLE-SINK-0000000015\n" +
" <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000011\n" + " <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000011\n" +
" Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000013 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000010])\n" + " Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000013 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000005, KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000010])\n" +
" --> KTABLE-SINK-0000000015\n" + " --> KTABLE-SINK-0000000015\n" +
" <-- KSTREAM-TOTABLE-0000000004\n" + " <-- KSTREAM-TOTABLE-0000000004\n" +
" Sink: KTABLE-SINK-0000000015 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic)\n" + " Sink: KTABLE-SINK-0000000015 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic)\n" +