KAFKA-16046: also fix stores for outer join (#15073)

This is the corollary to #15061 for outer joins, which don't use timestamped KV stores either (compared to just window stores).

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Almog Gavra 2024-01-02 15:07:46 -08:00 committed by GitHub
parent 86a387c3c8
commit e6875f378c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 77 additions and 13 deletions

View File

@ -45,7 +45,7 @@ public class KeyValueStoreMaterializer<K, V> extends MaterializedStoreFactory<K,
@Override
public StateStore build() {
final KeyValueBytesStoreSupplier supplier = materialized.storeSupplier() == null
? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName()))
? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName(), true))
: (KeyValueBytesStoreSupplier) materialized.storeSupplier();
final StoreBuilder<?> builder;

View File

@ -95,11 +95,12 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends AbstractConfigurable
final TimestampedKeyAndJoinSideSerde<K> timestampedKeyAndJoinSideSerde = new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde());
final LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new LeftOrRightValueSerde<>(streamJoined.valueSerde(), streamJoined.otherValueSerde());
final DslKeyValueParams dslKeyValueParams = new DslKeyValueParams(name, false);
final KeyValueBytesStoreSupplier supplier;
if (passedInDslStoreSuppliers != null) {
// case 1: dslStoreSuppliers was explicitly passed in
supplier = passedInDslStoreSuppliers.keyValueStore(new DslKeyValueParams(name));
supplier = passedInDslStoreSuppliers.keyValueStore(dslKeyValueParams);
} else if (streamJoined.thisStoreSupplier() != null) {
// case 2: thisStoreSupplier was explicitly passed in, we match
// the type for that one
@ -110,12 +111,12 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends AbstractConfigurable
} else {
// couldn't determine the type of bytes store for thisStoreSupplier,
// fallback to the default
supplier = dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name));
supplier = dslStoreSuppliers().keyValueStore(dslKeyValueParams);
}
} else {
// case 3: nothing was explicitly passed in, fallback to default which
// was configured via either the TopologyConfig or StreamsConfig globally
supplier = dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name));
supplier = dslStoreSuppliers().keyValueStore(dslKeyValueParams);
}
final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>

View File

@ -36,7 +36,9 @@ public class BuiltInDslStoreSuppliers {
@Override
public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) {
return Stores.persistentTimestampedKeyValueStore(params.name());
return params.isTimestamped()
? Stores.persistentTimestampedKeyValueStore(params.name())
: Stores.persistentKeyValueStore(params.name());
}
@Override

View File

@ -25,19 +25,26 @@ import java.util.Objects;
public class DslKeyValueParams {
private final String name;
private final boolean isTimestamped;
/**
* @param name the name of the store (cannot be {@code null})
* @param isTimestamped whether the returned stores should be timestamped, see ({@link TimestampedKeyValueStore}
*/
public DslKeyValueParams(final String name) {
public DslKeyValueParams(final String name, final boolean isTimestamped) {
Objects.requireNonNull(name);
this.name = name;
this.isTimestamped = isTimestamped;
}
public String name() {
return name;
}
public boolean isTimestamped() {
return isTimestamped;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
@ -47,18 +54,20 @@ public class DslKeyValueParams {
return false;
}
final DslKeyValueParams that = (DslKeyValueParams) o;
return Objects.equals(name, that.name);
return isTimestamped == that.isTimestamped
&& Objects.equals(name, that.name);
}
@Override
public int hashCode() {
return Objects.hash(name);
return Objects.hash(name, isTimestamped);
}
@Override
public String toString() {
return "DslKeyValueParams{" +
"name='" + name + '\'' +
"isTimestamped=" + isTimestamped +
'}';
}
}

View File

@ -108,7 +108,8 @@ public class DslWindowParams {
&& Objects.equals(retentionPeriod, that.retentionPeriod)
&& Objects.equals(windowSize, that.windowSize)
&& Objects.equals(emitStrategy, that.emitStrategy)
&& Objects.equals(isSlidingWindow, that.isSlidingWindow);
&& Objects.equals(isSlidingWindow, that.isSlidingWindow)
&& Objects.equals(isTimestamped, that.isTimestamped);
}
@Override
@ -119,7 +120,8 @@ public class DslWindowParams {
windowSize,
retainDuplicates,
emitStrategy,
isSlidingWindow
isSlidingWindow,
isTimestamped
);
}
@ -132,6 +134,7 @@ public class DslWindowParams {
", retainDuplicates=" + retainDuplicates +
", emitStrategy=" + emitStrategy +
", isSlidingWindow=" + isSlidingWindow +
", isTimestamped=" + isTimestamped +
'}';
}
}

View File

@ -52,7 +52,7 @@ import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.InMemorySessionStore;
import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
import org.apache.kafka.streams.state.internals.RocksDBTimestampedStore;
import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.test.MockApiProcessorSupplier;
@ -1299,7 +1299,7 @@ public class StreamsBuilderTest {
assertTypesForStateStore(topology.stateStores(),
InMemoryWindowStore.class,
RocksDBWindowStore.class,
RocksDBTimestampedStore.class);
RocksDBStore.class);
}
@Test

View File

@ -33,8 +33,12 @@ import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.DslKeyValueParams;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
@ -50,9 +54,11 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import static java.time.Duration.ZERO;
import static java.time.Duration.ofMillis;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
public class KStreamKStreamOuterJoinTest {
@ -1308,4 +1314,47 @@ public class KStreamKStreamOuterJoinTest {
new KeyValueTimestamp<>(0, "dummy+null", 1103L)
);
}
public static class CapturingStoreSuppliers extends BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers {
final AtomicReference<KeyValueBytesStoreSupplier> capture = new AtomicReference<>();
@Override
public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) {
final KeyValueBytesStoreSupplier result = super.keyValueStore(params);
capture.set(result);
return result;
}
}
@Test
public void shouldJoinWithNonTimestampedStore() {
final CapturingStoreSuppliers suppliers = new CapturingStoreSuppliers();
final StreamJoined<Integer, String, String> streamJoined =
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
.withDslStoreSuppliers(suppliers);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
joined = stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
streamJoined
);
joined.process(supplier);
// create a TTD so that the topology gets built
try (final TopologyTestDriver ignored = new TopologyTestDriver(builder.build(PROPS), PROPS)) {
assertThat("Expected stream joined to supply builders that create non-timestamped stores",
!WrappedStateStore.isTimestamped(suppliers.capture.get().get()));
}
}
}