KAFKA-18479: RocksDBTimeOrderedKeyValueBuffer not initialized correctly (#18490)

RocksDBTimeOrderedKeyValueBuffer is not initialize with serdes provides
via Joined, but always uses serdes from StreamsConfig.

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-01-15 10:04:13 -08:00 committed by GitHub
parent 65daa1db2f
commit 30f94b5320
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 62 additions and 27 deletions

View File

@ -125,10 +125,16 @@ public abstract class AbstractJoinIntegrationTest {
final ValueJoiner<String, String, String> valueJoiner = (value1, value2) -> value1 + "-" + value2;
Properties setupConfigsAndUtils(final boolean cacheEnabled) {
return setupConfigsAndUtils(cacheEnabled, true);
}
Properties setupConfigsAndUtils(final boolean cacheEnabled, final boolean setSerdes) {
final Properties streamsConfig = new Properties();
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
if (setSerdes) {
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class);
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
}
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
if (!cacheEnabled) {
streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
@ -260,16 +266,13 @@ public abstract class AbstractJoinIntegrationTest {
private void checkQueryableStore(final String queryableName, final TestRecord<Long, String> expectedFinalResult, final TopologyTestDriver driver) {
final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore(queryableName);
final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all();
try (final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all()) {
final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next();
try {
assertThat(onlyEntry.key, is(expectedFinalResult.key()));
assertThat(onlyEntry.value.value(), is(expectedFinalResult.value()));
assertThat(onlyEntry.value.timestamp(), is(expectedFinalResult.timestamp()));
assertThat(all.hasNext(), is(false));
} finally {
all.close();
}
}

View File

@ -19,10 +19,12 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.TestRecord;
@ -52,13 +54,13 @@ public class StreamTableJoinWithGraceIntegrationTest extends AbstractJoinIntegra
@ValueSource(booleans = {true, false})
public void testInnerWithVersionedStore(final boolean cacheEnabled) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> leftStream = builder.stream(INPUT_TOPIC_LEFT);
final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
final KStream<Long, String> leftStream = builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.Long(), Serdes.String()));
final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.as(
Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5))));
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled, false);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner");
leftStream.join(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC);
leftStream.join(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.String()));
final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
null,
@ -96,7 +98,7 @@ public class StreamTableJoinWithGraceIntegrationTest extends AbstractJoinIntegra
final KStream<Long, String> leftStream = builder.stream(INPUT_TOPIC_LEFT);
final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5))));
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled);
final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled, true);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left");
leftStream.leftJoin(rightTable, valueJoiner, JOINED).to(OUTPUT_TOPIC);

View File

@ -1162,7 +1162,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
}
final String bufferName = name + "-Buffer";
bufferStoreBuilder = Optional.of(new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferName, joinedInternal.gracePeriod(), name));
bufferStoreBuilder = Optional.of(new RocksDBTimeOrderedKeyValueBuffer.Builder<>(
bufferName,
joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde,
joinedInternal.leftValueSerde() != null ? joinedInternal.leftValueSerde() : valueSerde,
joinedInternal.gracePeriod(),
name)
);
}
final ProcessorSupplier<K, V, K, ? extends VR> processorSupplier = new KStreamKTableJoin<>(

View File

@ -64,13 +64,23 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal
public static class Builder<K, V> implements StoreBuilder<TimeOrderedKeyValueBuffer<K, V, V>> {
private final String storeName;
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private boolean loggingEnabled = true;
private Map<String, String> logConfig = new HashMap<>();
private final Duration grace;
private final String topic;
public Builder(final String storeName, final Duration grace, final String topic) {
public Builder(
final String storeName,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Duration grace,
final String topic
) {
this.storeName = storeName;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.grace = grace;
this.topic = topic;
}
@ -115,6 +125,8 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal
public TimeOrderedKeyValueBuffer<K, V, V> build() {
return new RocksDBTimeOrderedKeyValueBuffer<>(
new RocksDBTimeOrderedKeyValueBytesStoreSupplier(storeName).get(),
keySerde,
valueSerde,
grace,
topic,
loggingEnabled);
@ -138,10 +150,14 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal
public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Duration gracePeriod,
final String topic,
final boolean loggingEnabled) {
this.store = store;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.gracePeriod = gracePeriod.toMillis();
minTimestamp = store.minTimestamp();
minValid = false;

View File

@ -62,18 +62,16 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
@SuppressWarnings({"rawtypes", "unchecked"})
@BeforeEach
public void setUp() {
when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
final Metrics metrics = new Metrics();
offset = 0;
streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime());
context = new MockInternalProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory());
}
private void createBuffer(final Duration grace) {
private void createBuffer(final Duration grace, final Serde<String> serde) {
final RocksDBTimeOrderedKeyValueBytesStore store = new RocksDBTimeOrderedKeyValueBytesStoreSupplier("testing").get();
buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, grace, "testing", false);
buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, serde, serde, grace, "testing", false);
buffer.setSerdesIfNull(serdeGetter);
buffer.init(context, store);
}
@ -86,14 +84,16 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
@Test
public void shouldReturnIfRecordWasAdded() {
createBuffer(Duration.ofMillis(1));
when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
createBuffer(Duration.ofMillis(1), null);
assertThat(pipeRecord("K", "V", 2L), equalTo(true));
assertThat(pipeRecord("K", "V", 0L), equalTo(false));
}
@Test
public void shouldPutInBufferAndUpdateFields() {
createBuffer(Duration.ofMinutes(1));
createBuffer(Duration.ofMinutes(1), Serdes.String());
assertNumSizeAndTimestamp(buffer, 0, Long.MAX_VALUE, 0);
pipeRecord("1", "0", 0L);
assertNumSizeAndTimestamp(buffer, 1, 0, 42);
@ -103,7 +103,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
@Test
public void shouldAddAndEvictRecord() {
createBuffer(Duration.ZERO);
createBuffer(Duration.ZERO, Serdes.String());
final AtomicInteger count = new AtomicInteger(0);
pipeRecord("1", "0", 0L);
assertNumSizeAndTimestamp(buffer, 1, 0, 42);
@ -114,7 +114,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
@Test
public void shouldAddAndEvictRecordTwice() {
createBuffer(Duration.ZERO);
when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
createBuffer(Duration.ZERO, null);
final AtomicInteger count = new AtomicInteger(0);
pipeRecord("1", "0", 0L);
assertNumSizeAndTimestamp(buffer, 1, 0, 42);
@ -130,7 +132,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
@Test
public void shouldAddAndEvictRecordTwiceWithNonZeroGrace() {
createBuffer(Duration.ofMillis(1));
createBuffer(Duration.ofMillis(1), Serdes.String());
final AtomicInteger count = new AtomicInteger(0);
pipeRecord("1", "0", 0L);
buffer.evictWhile(() -> buffer.numRecords() > 0, r -> count.getAndIncrement());
@ -144,7 +146,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
@Test
public void shouldAddRecordsTwiceAndEvictRecordsOnce() {
createBuffer(Duration.ZERO);
when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
createBuffer(Duration.ZERO, null);
final AtomicInteger count = new AtomicInteger(0);
pipeRecord("1", "0", 0L);
buffer.evictWhile(() -> buffer.numRecords() > 1, r -> count.getAndIncrement());
@ -156,7 +160,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
@Test
public void shouldDropLateRecords() {
createBuffer(Duration.ZERO);
when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
createBuffer(Duration.ZERO, null);
pipeRecord("1", "0", 1L);
assertNumSizeAndTimestamp(buffer, 1, 1, 42);
pipeRecord("2", "0", 0L);
@ -165,7 +171,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
@Test
public void shouldDropLateRecordsWithNonZeroGrace() {
createBuffer(Duration.ofMillis(1));
createBuffer(Duration.ofMillis(1), Serdes.String());
pipeRecord("1", "0", 2L);
assertNumSizeAndTimestamp(buffer, 1, 2, 42);
pipeRecord("2", "0", 1L);
@ -176,7 +182,9 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
@Test
public void shouldHandleCollidingKeys() {
createBuffer(Duration.ofMillis(1));
when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
createBuffer(Duration.ofMillis(1), null);
final AtomicInteger count = new AtomicInteger(0);
pipeRecord("2", "0", 0L);
buffer.evictWhile(() -> buffer.numRecords() > 0, r -> count.getAndIncrement());