KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter (#13496)

In preparation for updating DSL processors to use versioned stores (cf KIP-914), this PR adds two new methods to KTableValueGetter: isVersioned() and get(key, asOfTimestamp) and updates all existing implementations accordingly.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Victoria Xia 2023-04-11 23:40:11 -04:00 committed by GitHub
parent f1f35ef1a8
commit cb7d0833ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 198 additions and 13 deletions

View File

@ -152,5 +152,15 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements KStreamAggProcessorSupp
public ValueAndTimestamp<VAgg> get(final KIn key) {
return store.get(key);
}
@Override
public ValueAndTimestamp<VAgg> get(final KIn key, final long asOfTimestamp) {
return store.get(key, asOfTimestamp);
}
@Override
public boolean isVersioned() {
return store.isVersionedStore();
}
}
}

View File

@ -147,6 +147,16 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, V, K,
public ValueAndTimestamp<V> get(final K key) {
return store.get(key);
}
@Override
public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
return store.get(key, asOfTimestamp);
}
@Override
public boolean isVersioned() {
return store.isVersionedStore();
}
}
}

View File

@ -379,5 +379,10 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
store.fetchSession(key.key(), key.window().start(), key.window().end()),
key.window().end());
}
@Override
public boolean isVersioned() {
return false;
}
}
}

View File

@ -499,5 +499,10 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
@Override
public void close() {}
@Override
public boolean isVersioned() {
return false;
}
}
}

View File

@ -210,5 +210,10 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements
final W window = (W) windowedKey.window();
return windowStore.fetch(key, window.start());
}
@Override
public boolean isVersioned() {
return false;
}
}
}

View File

@ -177,6 +177,16 @@ class KTableFilter<KIn, VIn> implements KTableProcessorSupplier<KIn, VIn, KIn, V
return computeValue(key, parentGetter.get(key));
}
@Override
public ValueAndTimestamp<VIn> get(final KIn key, final long asOfTimestamp) {
return computeValue(key, parentGetter.get(key, asOfTimestamp));
}
@Override
public boolean isVersioned() {
return parentGetter.isVersioned();
}
@Override
public void close() {
parentGetter.close();

View File

@ -157,7 +157,8 @@ class KTableKTableInnerJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
final V1 value1 = getValueOrNull(valueAndTimestamp1);
if (value1 != null) {
final ValueAndTimestamp<V2> valueAndTimestamp2 = valueGetter2.get(keyValueMapper.apply(key, value1));
final ValueAndTimestamp<V2> valueAndTimestamp2
= valueGetter2.get(keyValueMapper.apply(key, value1));
final V2 value2 = getValueOrNull(valueAndTimestamp2);
if (value2 != null) {
@ -172,6 +173,14 @@ class KTableKTableInnerJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
}
}
@Override
public boolean isVersioned() {
// even though we can derive a proper versioned result (assuming both parent value
// getters are versioned), we choose not to since the output of a join of two
// versioned tables today is not considered versioned (cf KIP-914)
return false;
}
@Override
public void close() {
valueGetter1.close();

View File

@ -177,6 +177,14 @@ class KTableKTableLeftJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
}
}
@Override
public boolean isVersioned() {
// even though we can derive a proper versioned result (assuming both parent value
// getters are versioned), we choose not to since the output of a join of two
// versioned tables today is not considered versioned (cf KIP-914)
return false;
}
@Override
public void close() {
valueGetter1.close();

View File

@ -185,6 +185,14 @@ class KTableKTableOuterJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
return ValueAndTimestamp.make(newValue, Math.max(timestamp1, timestamp2));
}
@Override
public boolean isVersioned() {
// even though we can derive a proper versioned result (assuming both parent value
// getters are versioned), we choose not to since the output of a join of two
// versioned tables today is not considered versioned (cf KIP-914)
return false;
}
@Override
public void close() {
valueGetter1.close();

View File

@ -168,6 +168,14 @@ class KTableKTableRightJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,
}
}
@Override
public boolean isVersioned() {
// even though we can derive a proper versioned result (assuming both parent value
// getters are versioned), we choose not to since the output of a join of two
// versioned tables today is not considered versioned (cf KIP-914)
return false;
}
@Override
public void close() {
valueGetter1.close();

View File

@ -164,6 +164,16 @@ class KTableMapValues<KIn, VIn, VOut> implements KTableProcessorSupplier<KIn, VI
return computeValueAndTimestamp(key, parentGetter.get(key));
}
@Override
public ValueAndTimestamp<VOut> get(final KIn key, final long asOfTimestamp) {
return computeValueAndTimestamp(key, parentGetter.get(key, asOfTimestamp));
}
@Override
public boolean isVersioned() {
return parentGetter.isVersioned();
}
@Override
public void close() {
parentGetter.close();

View File

@ -48,5 +48,15 @@ public class KTableMaterializedValueGetterSupplier<K, V> implements KTableValueG
public ValueAndTimestamp<V> get(final K key) {
return store.get(key);
}
@Override
public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
return store.get(key, asOfTimestamp);
}
@Override
public boolean isVersioned() {
return store.isVersionedStore();
}
}
}

View File

@ -91,5 +91,14 @@ public class KTablePassThrough<KIn, VIn> implements KTableProcessorSupplier<KIn,
return store.get(key);
}
@Override
public ValueAndTimestamp<VIn> get(final KIn key, final long asOfTimestamp) {
return store.get(key, asOfTimestamp);
}
@Override
public boolean isVersioned() {
return store.isVersionedStore();
}
}
}

View File

@ -172,17 +172,30 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableRepartitionMapS
@Override
public ValueAndTimestamp<KeyValue<K1, V1>> get(final K key) {
final ValueAndTimestamp<V> valueAndTimestamp = parentGetter.get(key);
return ValueAndTimestamp.make(
mapper.apply(key, getValueOrNull(valueAndTimestamp)),
valueAndTimestamp == null ? context.timestamp() : valueAndTimestamp.timestamp()
);
return mapValue(key, parentGetter.get(key));
}
@Override
public ValueAndTimestamp<KeyValue<K1, V1>> get(final K key, final long asOfTimestamp) {
return mapValue(key, parentGetter.get(key, asOfTimestamp));
}
@Override
public boolean isVersioned() {
return parentGetter.isVersioned();
}
@Override
public void close() {
parentGetter.close();
}
private ValueAndTimestamp<KeyValue<K1, V1>> mapValue(final K key, final ValueAndTimestamp<V> valueAndTimestamp) {
return ValueAndTimestamp.make(
mapper.apply(key, getValueOrNull(valueAndTimestamp)),
valueAndTimestamp == null ? context.timestamp() : valueAndTimestamp.timestamp()
);
}
}
}

View File

@ -47,5 +47,15 @@ public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterS
public ValueAndTimestamp<V> get(final K key) {
return store.get(key);
}
@Override
public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
return store.get(key, asOfTimestamp);
}
@Override
public boolean isVersioned() {
return store.isVersionedStore();
}
}
}

View File

@ -153,8 +153,26 @@ class KTableTransformValues<K, V, VOut> implements KTableProcessorSupplier<K, V,
@Override
public ValueAndTimestamp<VOut> get(final K key) {
final ValueAndTimestamp<V> valueAndTimestamp = parentGetter.get(key);
return transformValue(key, parentGetter.get(key));
}
@Override
public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {
return transformValue(key, parentGetter.get(key, asOfTimestamp));
}
@Override
public boolean isVersioned() {
return parentGetter.isVersioned();
}
@Override
public void close() {
parentGetter.close();
valueTransformer.close();
}
private ValueAndTimestamp<VOut> transformValue(final K key, final ValueAndTimestamp<V> valueAndTimestamp) {
final ProcessorRecordContext currentContext = internalProcessorContext.recordContext();
internalProcessorContext.setRecordContext(new ProcessorRecordContext(
@ -177,11 +195,5 @@ class KTableTransformValues<K, V, VOut> implements KTableProcessorSupplier<K, V,
return result;
}
@Override
public void close() {
parentGetter.close();
valueTransformer.close();
}
}
}

View File

@ -25,5 +25,21 @@ public interface KTableValueGetter<K, V> {
ValueAndTimestamp<V> get(K key);
/**
* Returns the latest record version, associated with the provided key, with timestamp
* not exceeding the provided timestamp bound. This method may only be called if
* {@link #isVersioned()} is true.
*/
default ValueAndTimestamp<V> get(K key, long asOfTimestamp) {
throw new UnsupportedOperationException("get(key, timestamp) is only supported for versioned stores");
}
/**
* @return whether this value getter supports multiple record versions for the same key.
* If true, then {@link #get(Object, long)} must be implemented. If not, then
* {@link #get(Object, long)} must not be called.
*/
boolean isVersioned();
default void close() {}
}

View File

@ -89,6 +89,11 @@ public class KTableSuppressProcessorSupplier<K, V> implements
}
}
@Override
public boolean isVersioned() {
return false;
}
@Override
public void close() {
// the main processor is responsible for the buffer's lifecycle

View File

@ -81,6 +81,14 @@ public class KeyValueStoreWrapper<K, V> implements StateStore {
throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store");
}
public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
if (!isVersionedStore()) {
throw new UnsupportedOperationException("get(key, timestamp) is only supported for versioned stores");
}
final VersionedRecord<V> versionedRecord = versionedStore.get(key, asOfTimestamp);
return versionedRecord == null ? null : ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp());
}
public void put(final K key, final V value, final long timestamp) {
if (timestampedStore != null) {
timestampedStore.put(key, ValueAndTimestamp.make(value, timestamp));
@ -97,6 +105,10 @@ public class KeyValueStoreWrapper<K, V> implements StateStore {
return store;
}
public boolean isVersionedStore() {
return versionedStore != null;
}
@Override
public String name() {
return store.name();

View File

@ -352,6 +352,11 @@ public class ForeignJoinSubscriptionProcessorSupplierTest {
public void init(final ProcessorContext context) {
}
@Override
public boolean isVersioned() {
return false;
}
};
return new KTableValueGetterSupplier<String, String>() {
@Override

View File

@ -56,6 +56,11 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
public ValueAndTimestamp<V> get(final K key) {
return ValueAndTimestamp.make(map.get(key), -1);
}
@Override
public boolean isVersioned() {
return false;
}
};
}