KAFKA-16141: Fix StreamsStandbyTask system test (#15217)

KAFKA-15629 added `TimestampedByteStore` interface to
`KeyValueToTimestampedKeyValueByteStoreAdapter` which break the restore
code path and thus some system tests.

This PR reverts this change for now.

Reviewers: Almog Gavra <almog.gavra@gmail.com>, Walker Carlson <wcarlson@confluent.io>
This commit is contained in:
Matthias J. Sax 2024-01-19 09:23:42 -08:00 committed by GitHub
parent 7090a5231a
commit aaccf542d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 13 additions and 4 deletions

View File

@ -187,7 +187,7 @@ public class CachingKeyValueStore
final LRUCacheEntry lruCacheEntry = context.cache().get(cacheName, key);
if (lruCacheEntry != null) {
final byte[] rawValue;
if (timestampedSchema && !WrappedStateStore.isTimestamped(wrapped())) {
if (timestampedSchema && !WrappedStateStore.isTimestamped(wrapped()) && !StoreQueryUtils.isAdapter(wrapped())) {
rawValue = ValueAndTimestampDeserializer.rawValue(lruCacheEntry.value());
} else {
rawValue = lruCacheEntry.value();

View File

@ -35,7 +35,6 @@ import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import java.util.List;
@ -54,7 +53,7 @@ import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserial
*/
@SuppressWarnings("unchecked")
public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueStore<Bytes, byte[]>, TimestampedBytesStore {
public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueStore<Bytes, byte[]> {
final KeyValueStore<Bytes, byte[]> store;
KeyValueToTimestampedKeyValueByteStoreAdapter(final KeyValueStore<Bytes, byte[]> store) {

View File

@ -410,7 +410,7 @@ public final class StoreQueryUtils {
@SuppressWarnings({"unchecked", "rawtypes"})
public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V> serdes, final StateStore wrapped) {
final Serde<V> valueSerde = serdes.valueSerde();
final boolean timestamped = WrappedStateStore.isTimestamped(wrapped);
final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) || isAdapter(wrapped);
final Deserializer<V> deserializer;
if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {
final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
@ -422,6 +422,16 @@ public final class StoreQueryUtils {
return byteArray -> deserializer.deserialize(serdes.topic(), byteArray);
}
public static boolean isAdapter(final StateStore stateStore) {
if (stateStore instanceof KeyValueToTimestampedKeyValueByteStoreAdapter) {
return true;
} else if (stateStore instanceof WrappedStateStore) {
return isAdapter(((WrappedStateStore) stateStore).wrapped());
} else {
return false;
}
}
@SuppressWarnings({"unchecked", "rawtypes"})
public static <V> Function<VersionedRecord<byte[]>, VersionedRecord<V>> getDeserializeValue(final StateSerdes<?, V> serdes) {
final Serde<V> valueSerde = serdes.valueSerde();