From cb7d0833ee02e190a194cc5bd28fd2b3ac31cccb Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Tue, 11 Apr 2023 23:40:11 -0400 Subject: [PATCH] KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter (#13496) In preparation for updating DSL processors to use versioned stores (cf KIP-914), this PR adds two new methods to KTableValueGetter: isVersioned() and get(key, asOfTimestamp) and updates all existing implementations accordingly. Reviewers: Matthias J. Sax --- .../kstream/internals/KStreamAggregate.java | 10 +++++++ .../kstream/internals/KStreamReduce.java | 10 +++++++ .../KStreamSessionWindowAggregate.java | 5 ++++ .../KStreamSlidingWindowAggregate.java | 5 ++++ .../internals/KStreamWindowAggregate.java | 5 ++++ .../kstream/internals/KTableFilter.java | 10 +++++++ .../internals/KTableKTableInnerJoin.java | 11 +++++++- .../internals/KTableKTableLeftJoin.java | 8 ++++++ .../internals/KTableKTableOuterJoin.java | 8 ++++++ .../internals/KTableKTableRightJoin.java | 8 ++++++ .../kstream/internals/KTableMapValues.java | 10 +++++++ ...KTableMaterializedValueGetterSupplier.java | 10 +++++++ .../kstream/internals/KTablePassThrough.java | 9 +++++++ .../internals/KTableRepartitionMap.java | 23 ++++++++++++---- .../KTableSourceValueGetterSupplier.java | 10 +++++++ .../internals/KTableTransformValues.java | 26 ++++++++++++++----- .../kstream/internals/KTableValueGetter.java | 16 ++++++++++++ .../KTableSuppressProcessorSupplier.java | 5 ++++ .../state/internals/KeyValueStoreWrapper.java | 12 +++++++++ ...JoinSubscriptionProcessorSupplierTest.java | 5 ++++ ...tionResolverJoinProcessorSupplierTest.java | 5 ++++ 21 files changed, 198 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index 27a3d488918..69d346c75e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -152,5 +152,15 @@ public class KStreamAggregate implements KStreamAggProcessorSupp public ValueAndTimestamp get(final KIn key) { return store.get(key); } + + @Override + public ValueAndTimestamp get(final KIn key, final long asOfTimestamp) { + return store.get(key, asOfTimestamp); + } + + @Override + public boolean isVersioned() { + return store.isVersionedStore(); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java index b801d2b60ea..be420579cae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java @@ -147,6 +147,16 @@ public class KStreamReduce implements KStreamAggProcessorSupplier get(final K key) { return store.get(key); } + + @Override + public ValueAndTimestamp get(final K key, final long asOfTimestamp) { + return store.get(key, asOfTimestamp); + } + + @Override + public boolean isVersioned() { + return store.isVersionedStore(); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index f8252358b08..2a3f27c68fc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -379,5 +379,10 @@ public class KStreamSessionWindowAggregate implements KStreamAgg store.fetchSession(key.key(), key.window().start(), key.window().end()), key.window().end()); } + + @Override + public boolean isVersioned() { + return false; + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java index e75427d6b89..98f5e812746 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java @@ -499,5 +499,10 @@ public class KStreamSlidingWindowAggregate implements KStreamAgg @Override public void close() {} + + @Override + public boolean isVersioned() { + return false; + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index 561524f87e7..4fb4f9c00ad 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -210,5 +210,10 @@ public class KStreamWindowAggregate implements final W window = (W) windowedKey.window(); return windowStore.fetch(key, window.start()); } + + @Override + public boolean isVersioned() { + return false; + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java index 6287aa40c55..f3d6edd26e0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java @@ -177,6 +177,16 @@ class KTableFilter implements KTableProcessorSupplier get(final KIn key, final long asOfTimestamp) { + return computeValue(key, parentGetter.get(key, asOfTimestamp)); + } + + @Override + public boolean isVersioned() { + return parentGetter.isVersioned(); + } + @Override public void close() { parentGetter.close(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java index d9ac0ae8852..0f264255597 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java @@ -157,7 +157,8 @@ class KTableKTableInnerJoin extends KTableKTableAbstractJoin valueAndTimestamp2 = valueGetter2.get(keyValueMapper.apply(key, value1)); + final ValueAndTimestamp valueAndTimestamp2 + = valueGetter2.get(keyValueMapper.apply(key, value1)); final V2 value2 = getValueOrNull(valueAndTimestamp2); if (value2 != null) { @@ -172,6 +173,14 @@ class KTableKTableInnerJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin extends KTableKTableAbstractJoin implements KTableProcessorSupplier get(final KIn key, final long asOfTimestamp) { + return computeValueAndTimestamp(key, parentGetter.get(key, asOfTimestamp)); + } + + @Override + public boolean isVersioned() { + return parentGetter.isVersioned(); + } + @Override public void close() { parentGetter.close(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java index ba7e65081d5..afe648ca219 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java @@ -48,5 +48,15 @@ public class KTableMaterializedValueGetterSupplier implements KTableValueG public ValueAndTimestamp get(final K key) { return store.get(key); } + + @Override + public ValueAndTimestamp get(final K key, final long asOfTimestamp) { + return store.get(key, asOfTimestamp); + } + + @Override + public boolean isVersioned() { + return store.isVersionedStore(); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java index 91fe0e4a277..87fc0f49de9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java @@ -91,5 +91,14 @@ public class KTablePassThrough implements KTableProcessorSupplier get(final KIn key, final long asOfTimestamp) { + return store.get(key, asOfTimestamp); + } + + @Override + public boolean isVersioned() { + return store.isVersionedStore(); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java index 2ff826c0110..849d5bf7a0c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java @@ -172,17 +172,30 @@ public class KTableRepartitionMap implements KTableRepartitionMapS @Override public ValueAndTimestamp> get(final K key) { - final ValueAndTimestamp valueAndTimestamp = parentGetter.get(key); - return ValueAndTimestamp.make( - mapper.apply(key, getValueOrNull(valueAndTimestamp)), - valueAndTimestamp == null ? context.timestamp() : valueAndTimestamp.timestamp() - ); + return mapValue(key, parentGetter.get(key)); + } + + @Override + public ValueAndTimestamp> get(final K key, final long asOfTimestamp) { + return mapValue(key, parentGetter.get(key, asOfTimestamp)); + } + + @Override + public boolean isVersioned() { + return parentGetter.isVersioned(); } @Override public void close() { parentGetter.close(); } + + private ValueAndTimestamp> mapValue(final K key, final ValueAndTimestamp valueAndTimestamp) { + return ValueAndTimestamp.make( + mapper.apply(key, getValueOrNull(valueAndTimestamp)), + valueAndTimestamp == null ? context.timestamp() : valueAndTimestamp.timestamp() + ); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java index 1c340943354..8599a50f60b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java @@ -47,5 +47,15 @@ public class KTableSourceValueGetterSupplier implements KTableValueGetterS public ValueAndTimestamp get(final K key) { return store.get(key); } + + @Override + public ValueAndTimestamp get(final K key, final long asOfTimestamp) { + return store.get(key, asOfTimestamp); + } + + @Override + public boolean isVersioned() { + return store.isVersionedStore(); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java index 5975571f0fa..ba1705b033b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java @@ -153,8 +153,26 @@ class KTableTransformValues implements KTableProcessorSupplier get(final K key) { - final ValueAndTimestamp valueAndTimestamp = parentGetter.get(key); + return transformValue(key, parentGetter.get(key)); + } + @Override + public ValueAndTimestamp get(final K key, final long asOfTimestamp) { + return transformValue(key, parentGetter.get(key, asOfTimestamp)); + } + + @Override + public boolean isVersioned() { + return parentGetter.isVersioned(); + } + + @Override + public void close() { + parentGetter.close(); + valueTransformer.close(); + } + + private ValueAndTimestamp transformValue(final K key, final ValueAndTimestamp valueAndTimestamp) { final ProcessorRecordContext currentContext = internalProcessorContext.recordContext(); internalProcessorContext.setRecordContext(new ProcessorRecordContext( @@ -177,11 +195,5 @@ class KTableTransformValues implements KTableProcessorSupplier { ValueAndTimestamp get(K key); + /** + * Returns the latest record version, associated with the provided key, with timestamp + * not exceeding the provided timestamp bound. This method may only be called if + * {@link #isVersioned()} is true. + */ + default ValueAndTimestamp get(K key, long asOfTimestamp) { + throw new UnsupportedOperationException("get(key, timestamp) is only supported for versioned stores"); + } + + /** + * @return whether this value getter supports multiple record versions for the same key. + * If true, then {@link #get(Object, long)} must be implemented. If not, then + * {@link #get(Object, long)} must not be called. + */ + boolean isVersioned(); + default void close() {} } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java index 96c103a39a4..ea635c3ebc2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java @@ -89,6 +89,11 @@ public class KTableSuppressProcessorSupplier implements } } + @Override + public boolean isVersioned() { + return false; + } + @Override public void close() { // the main processor is responsible for the buffer's lifecycle diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java index 9c2920570aa..9fb2dc2123f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java @@ -81,6 +81,14 @@ public class KeyValueStoreWrapper implements StateStore { throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store"); } + public ValueAndTimestamp get(final K key, final long asOfTimestamp) { + if (!isVersionedStore()) { + throw new UnsupportedOperationException("get(key, timestamp) is only supported for versioned stores"); + } + final VersionedRecord versionedRecord = versionedStore.get(key, asOfTimestamp); + return versionedRecord == null ? null : ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp()); + } + public void put(final K key, final V value, final long timestamp) { if (timestampedStore != null) { timestampedStore.put(key, ValueAndTimestamp.make(value, timestamp)); @@ -97,6 +105,10 @@ public class KeyValueStoreWrapper implements StateStore { return store; } + public boolean isVersionedStore() { + return versionedStore != null; + } + @Override public String name() { return store.name(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java index 1bf708b8ab5..6757ccca20b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java @@ -352,6 +352,11 @@ public class ForeignJoinSubscriptionProcessorSupplierTest { public void init(final ProcessorContext context) { } + + @Override + public boolean isVersioned() { + return false; + } }; return new KTableValueGetterSupplier() { @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java index dd794b0107d..afd5f490db2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java @@ -56,6 +56,11 @@ public class SubscriptionResolverJoinProcessorSupplierTest { public ValueAndTimestamp get(final K key) { return ValueAndTimestamp.make(map.get(key), -1); } + + @Override + public boolean isVersioned() { + return false; + } }; }