From 57477886593a60cd92cc50b5722788f9ac1c3d7a Mon Sep 17 00:00:00 2001 From: John Roesler Date: Mon, 20 Dec 2021 12:22:05 -0600 Subject: [PATCH] KAFKA-13525: Implement KeyQuery in Streams IQv2 (#11582) Implement the KeyQuery as proposed in KIP-796 Reviewers: Vicky Papavasileiou , Matthias J. Sax , Guozhang Wang --- .../kafka/streams/query/FailureReason.java | 7 + .../apache/kafka/streams/query/KeyQuery.java | 52 ++++++ .../kafka/streams/query/QueryResult.java | 13 ++ .../kafka/streams/query/RangeQuery.java | 8 +- .../state/internals/MeteredKeyValueStore.java | 90 +++++++--- .../streams/state/internals/PingQuery.java | 29 ---- .../state/internals/StoreQueryUtils.java | 48 +++-- .../integration/IQv2IntegrationTest.java | 42 ++--- .../integration/IQv2StoreIntegrationTest.java | 164 +++++++++--------- 9 files changed, 276 insertions(+), 177 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java delete mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java diff --git a/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java index c250f1c7bdc..6f4dc67c85b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java +++ b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java @@ -19,6 +19,13 @@ package org.apache.kafka.streams.query; import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +/** + * This enumeration type captures the various top-level reasons that a particular + * partition of a store would fail to execute a query. Stores should generally + * respond with a failure message instead of throwing an exception. + *

+ * Intended to be used in {@link QueryResult#forFailure(FailureReason, String)}. + */ @Evolving public enum FailureReason { /** diff --git a/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java new file mode 100644 index 00000000000..66e1c02990a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.query; + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; + +import java.util.Objects; + +/** + * Interactive query for retrieving a single record based on its key. + */ +@Evolving +public final class KeyQuery implements Query { + + private final K key; + + private KeyQuery(final K key) { + this.key = Objects.requireNonNull(key); + } + + /** + * Creates a query that will retrieve the record identified by {@code key} if it exists + * (or {@code null} otherwise). + * @param key The key to retrieve + * @param The type of the key + * @param The type of the value that will be retrieved + */ + public static KeyQuery withKey(final K key) { + return new KeyQuery<>(key); + } + + /** + * The key that was specified for this query. + */ + public K getKey() { + return key; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java index caf44050fdf..1e04e5b6b49 100644 --- a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java +++ b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java @@ -197,6 +197,19 @@ public final class QueryResult { return result; } + public QueryResult swapResult(final V value) { + if (isFailure()) { + throw new IllegalArgumentException( + "Callers must avoid calling this method on a failed result." + ); + } else { + final QueryResult result = new QueryResult<>(value); + result.executionInfo = executionInfo; + result.position = position; + return result; + } + } + @Override public String toString() { return "QueryResult{" + diff --git a/streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java index 30c43b30d0c..45ec9dea6c0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java +++ b/streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java @@ -34,7 +34,7 @@ import java.util.Optional; *

*/ @Evolving -public class RangeQuery implements Query> { +public final class RangeQuery implements Query> { private final Optional lower; @@ -52,7 +52,6 @@ public class RangeQuery implements Query> { * @param upper The key that specifies the upper bound of the range * @param The key type * @param The value type - * @return An iterator of records */ public static RangeQuery withRange(final K lower, final K upper) { return new RangeQuery<>(Optional.of(lower), Optional.of(upper)); @@ -63,7 +62,6 @@ public class RangeQuery implements Query> { * @param upper The key that specifies the upper bound of the range * @param The key type * @param The value type - * @return An iterator of records */ public static RangeQuery withUpperBound(final K upper) { return new RangeQuery<>(Optional.empty(), Optional.of(upper)); @@ -74,7 +72,6 @@ public class RangeQuery implements Query> { * @param lower The key that specifies the lower bound of the range * @param The key type * @param The value type - * @return An iterator of records */ public static RangeQuery withLowerBound(final K lower) { return new RangeQuery<>(Optional.of(lower), Optional.empty()); @@ -84,7 +81,6 @@ public class RangeQuery implements Query> { * Interactive scan query that returns all records in the store. * @param The key type * @param The value type - * @return An iterator of records */ public static RangeQuery withNoBounds() { return new RangeQuery<>(Optional.empty(), Optional.empty()); @@ -92,7 +88,6 @@ public class RangeQuery implements Query> { /** * The lower bound of the query, if specified. - * @return The lower bound */ public Optional getLowerBound() { return lower; @@ -100,7 +95,6 @@ public class RangeQuery implements Query> { /** * The upper bound of the query, if specified - * @return The upper bound */ public Optional getUpperBound() { return upper; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 15a14aa0507..b8bacfac49a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -35,6 +35,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryResult; @@ -88,13 +89,18 @@ public class MeteredKeyValueStore private StreamsMetricsImpl streamsMetrics; private TaskId taskId; - private Map queryHandlers = - mkMap( - mkEntry( - RangeQuery.class, - (query, positionBound, collectExecutionInfo, store) -> runRangeQuery(query, positionBound, collectExecutionInfo) - ) - ); + @SuppressWarnings("rawtypes") + private final Map queryHandlers = + mkMap( + mkEntry( + RangeQuery.class, + (query, positionBound, collectExecutionInfo, store) -> runRangeQuery(query, positionBound, collectExecutionInfo) + ), + mkEntry( + KeyQuery.class, + (query, positionBound, collectExecutionInfo, store) -> runKeyQuery(query, positionBound, collectExecutionInfo) + ) + ); MeteredKeyValueStore(final KeyValueStore inner, final String metricsScope, @@ -209,7 +215,7 @@ public class MeteredKeyValueStore final PositionBound positionBound, final boolean collectExecutionInfo) { - final long start = System.nanoTime(); + final long start = time.nanoseconds(); final QueryResult result; final QueryHandler handler = queryHandlers.get(query.getClass()); @@ -217,34 +223,37 @@ public class MeteredKeyValueStore result = wrapped().query(query, positionBound, collectExecutionInfo); if (collectExecutionInfo) { result.addExecutionInfo( - "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns"); + "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns"); } } else { result = (QueryResult) handler.apply( - query, - positionBound, - collectExecutionInfo, - this + query, + positionBound, + collectExecutionInfo, + this ); if (collectExecutionInfo) { result.addExecutionInfo( - "Handled in " + getClass() + " with serdes " - + serdes + " in " + (System.nanoTime() - start) + "ns"); + "Handled in " + getClass() + " with serdes " + + serdes + " in " + (time.nanoseconds() - start) + "ns"); } } return result; } @SuppressWarnings("unchecked") - private QueryResult runRangeQuery( - final Query query, final PositionBound positionBound, final boolean collectExecutionInfo) { + private QueryResult runRangeQuery(final Query query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { final QueryResult result; final RangeQuery typedQuery = (RangeQuery) query; final RangeQuery rawRangeQuery; if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) { - rawRangeQuery = RangeQuery.withRange(keyBytes(typedQuery.getLowerBound().get()), - keyBytes(typedQuery.getUpperBound().get())); + rawRangeQuery = RangeQuery.withRange( + keyBytes(typedQuery.getLowerBound().get()), + keyBytes(typedQuery.getUpperBound().get()) + ); } else if (typedQuery.getLowerBound().isPresent()) { rawRangeQuery = RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get())); } else if (typedQuery.getUpperBound().isPresent()) { @@ -253,12 +262,17 @@ public class MeteredKeyValueStore rawRangeQuery = RangeQuery.withNoBounds(); } final QueryResult> rawResult = - wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo); + wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo); if (rawResult.isSuccess()) { final KeyValueIterator iterator = rawResult.getResult(); final KeyValueIterator resultIterator = new MeteredKeyValueTimestampedIterator( - iterator, getSensor, getValueDeserializer()); - final QueryResult> typedQueryResult = QueryResult.forResult(resultIterator); + iterator, + getSensor, + getValueDeserializer() + ); + final QueryResult> typedQueryResult = rawResult.swapResult( + resultIterator + ); result = (QueryResult) typedQueryResult; } else { // the generic type doesn't matter, since failed queries have no result set. @@ -267,17 +281,41 @@ public class MeteredKeyValueStore return result; } + @SuppressWarnings("unchecked") + private QueryResult runKeyQuery(final Query query, + final PositionBound positionBound, + final boolean collectExecutionInfo) { + final QueryResult result; + final KeyQuery typedKeyQuery = (KeyQuery) query; + final KeyQuery rawKeyQuery = + KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); + final QueryResult rawResult = + wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo); + if (rawResult.isSuccess()) { + final Deserializer deserializer = getValueDeserializer(); + final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult()); + final QueryResult typedQueryResult = + rawResult.swapResult(value); + result = (QueryResult) typedQueryResult; + } else { + // the generic type doesn't matter, since failed queries have no result set. + result = (QueryResult) rawResult; + } + return result; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) private Deserializer getValueDeserializer() { - final Serde vSerde = serdes.valueSerde(); + final Serde valueSerde = serdes.valueSerde(); final boolean timestamped = WrappedStateStore.isTimestamped(wrapped()); final Deserializer deserializer; - if (!timestamped && vSerde instanceof ValueAndTimestampSerde) { + if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) { final ValueAndTimestampDeserializer valueAndTimestampDeserializer = - (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer(); + (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) valueSerde).deserializer(); deserializer = (Deserializer) valueAndTimestampDeserializer.valueDeserializer; } else { - deserializer = vSerde.deserializer(); + deserializer = valueSerde.deserializer(); } return deserializer; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java deleted file mode 100644 index 1eaf12847af..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.streams.query.Query; - -/** - * A very simple query that all stores can handle to verify that the store is participating in the - * IQv2 framework properly. - *

- * This is not a public API and may change without notice. - */ -public class PingQuery implements Query { - -} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java index aeb25393da8..e1d20c9e6d3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java @@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; @@ -54,22 +55,23 @@ public final class StoreQueryUtils { ); } - private static final Map, QueryHandler> QUERY_HANDLER_MAP = + @SuppressWarnings("rawtypes") + private static final Map QUERY_HANDLER_MAP = mkMap( - mkEntry( - PingQuery.class, - (query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true) - ), mkEntry( RangeQuery.class, StoreQueryUtils::runRangeQuery + ), + mkEntry( + KeyQuery.class, + StoreQueryUtils::runKeyQuery ) ); // make this class uninstantiable + private StoreQueryUtils() { } - @SuppressWarnings("unchecked") public static QueryResult handleBasicQueries( final Query query, @@ -153,7 +155,7 @@ public final class StoreQueryUtils { final RangeQuery rangeQuery = (RangeQuery) query; final Optional lowerRange = rangeQuery.getLowerBound(); final Optional upperRange = rangeQuery.getUpperBound(); - KeyValueIterator iterator = null; + final KeyValueIterator iterator; try { if (!lowerRange.isPresent() && !upperRange.isPresent()) { iterator = kvStore.all(); @@ -162,8 +164,8 @@ public final class StoreQueryUtils { } final R result = (R) iterator; return QueryResult.forResult(result); - } catch (final Throwable t) { - final String message = parseStoreException(t, store, query); + } catch (final Exception e) { + final String message = parseStoreException(e, store, query); return QueryResult.forFailure( FailureReason.STORE_EXCEPTION, message @@ -171,12 +173,36 @@ public final class StoreQueryUtils { } } - private static String parseStoreException(final Throwable t, final StateStore store, final Query query) { + @SuppressWarnings("unchecked") + private static QueryResult runKeyQuery(final Query query, + final PositionBound positionBound, + final boolean collectExecutionInfo, + final StateStore store) { + if (store instanceof KeyValueStore) { + final KeyQuery rawKeyQuery = (KeyQuery) query; + final KeyValueStore keyValueStore = + (KeyValueStore) store; + try { + final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey()); + return (QueryResult) QueryResult.forResult(bytes); + } catch (final Exception e) { + final String message = parseStoreException(e, store, query); + return QueryResult.forFailure( + FailureReason.STORE_EXCEPTION, + message + ); + } + } else { + return QueryResult.forUnknownQueryType(query, store); + } + } + + private static String parseStoreException(final Exception e, final StateStore store, final Query query) { final StringWriter stringWriter = new StringWriter(); final PrintWriter printWriter = new PrintWriter(stringWriter); printWriter.println( store.getClass() + " failed to handle query " + query + ":"); - t.printStackTrace(printWriter); + e.printStackTrace(printWriter); printWriter.flush(); return stringWriter.toString(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java index 4b40a55a881..7c87062b102 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java @@ -43,6 +43,7 @@ import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamThread.State; import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.query.StateQueryRequest; @@ -50,7 +51,7 @@ import org.apache.kafka.streams.query.StateQueryResult; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.internals.PingQuery; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -178,8 +179,8 @@ public class IQv2IntegrationTest { @Test public void shouldFailUnknownStore() { - final PingQuery query = new PingQuery(); - final StateQueryRequest request = + final KeyQuery> query = KeyQuery.withKey(1); + final StateQueryRequest> request = inStore("unknown-store").withQuery(query); assertThrows(UnknownStateStoreException.class, () -> kafkaStreams.query(request)); @@ -187,8 +188,8 @@ public class IQv2IntegrationTest { @Test public void shouldFailNotStarted() { - final PingQuery query = new PingQuery(); - final StateQueryRequest request = + final KeyQuery> query = KeyQuery.withKey(1); + final StateQueryRequest> request = inStore(STORE_NAME).withQuery(query); assertThrows(StreamsNotStartedException.class, () -> kafkaStreams.query(request)); @@ -196,8 +197,8 @@ public class IQv2IntegrationTest { @Test public void shouldFailStopped() { - final PingQuery query = new PingQuery(); - final StateQueryRequest request = + final KeyQuery> query = KeyQuery.withKey(1); + final StateQueryRequest> request = inStore(STORE_NAME).withQuery(query); kafkaStreams.start(); @@ -208,8 +209,8 @@ public class IQv2IntegrationTest { @Test public void shouldRejectNonRunningActive() throws NoSuchFieldException, IllegalAccessException { - final PingQuery query = new PingQuery(); - final StateQueryRequest request = + final KeyQuery> query = KeyQuery.withKey(1); + final StateQueryRequest> request = inStore(STORE_NAME).withQuery(query).requireActive(); final Set partitions = mkSet(0, 1); @@ -238,7 +239,7 @@ public class IQv2IntegrationTest { stateField.setAccessible(true); stateField.set(streamThread, State.PARTITIONS_ASSIGNED); - final StateQueryResult result = + final StateQueryResult> result = IntegrationTestUtils.iqv2WaitForPartitions( kafkaStreams, request, @@ -263,14 +264,14 @@ public class IQv2IntegrationTest { @Test public void shouldFetchFromPartition() { - final PingQuery query = new PingQuery(); + final KeyQuery> query = KeyQuery.withKey(1); final int partition = 1; final Set partitions = singleton(partition); - final StateQueryRequest request = + final StateQueryRequest> request = inStore(STORE_NAME).withQuery(query).withPartitions(partitions); kafkaStreams.start(); - final StateQueryResult result = + final StateQueryResult> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); assertThat(result.getPartitionResults().keySet(), equalTo(partitions)); @@ -278,13 +279,13 @@ public class IQv2IntegrationTest { @Test public void shouldFetchExplicitlyFromAllPartitions() { - final PingQuery query = new PingQuery(); + final KeyQuery> query = KeyQuery.withKey(1); final Set partitions = mkSet(0, 1); - final StateQueryRequest request = + final StateQueryRequest> request = inStore(STORE_NAME).withQuery(query).withAllPartitions(); kafkaStreams.start(); - final StateQueryResult result = + final StateQueryResult> result = IntegrationTestUtils.iqv2WaitForPartitions(kafkaStreams, request, partitions); assertThat(result.getPartitionResults().keySet(), equalTo(partitions)); @@ -292,10 +293,10 @@ public class IQv2IntegrationTest { @Test public void shouldNotRequireQueryHandler() { - final PingQuery query = new PingQuery(); + final KeyQuery> query = KeyQuery.withKey(1); final int partition = 1; final Set partitions = singleton(partition); - final StateQueryRequest request = + final StateQueryRequest> request = inStore(STORE_NAME).withQuery(query).withPartitions(partitions); final StreamsBuilder builder = new StreamsBuilder(); @@ -411,10 +412,11 @@ public class IQv2IntegrationTest { kafkaStreams.cleanUp(); kafkaStreams.start(); - final StateQueryResult result = + final StateQueryResult> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); - final QueryResult queryResult = result.getPartitionResults().get(partition); + final QueryResult> queryResult = + result.getPartitionResults().get(partition); assertThat(queryResult.isFailure(), is(true)); assertThat(queryResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE)); assertThat(queryResult.getFailureMessage(), matchesPattern( diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java index 06d8a4f5800..cfb3860fc61 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.query.FailureReason; +import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; @@ -53,7 +54,6 @@ import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.streams.state.internals.PingQuery; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -93,6 +93,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.matchesPattern; import static org.hamcrest.Matchers.not; +import static org.junit.Assert.fail; import static org.junit.jupiter.api.Assertions.assertThrows; @Category({IntegrationTest.class}) @@ -303,7 +304,6 @@ public class IQv2StoreIntegrationTest { final boolean cache, final boolean log, final String storeToTest) { - this.cache = cache; this.log = log; this.storeToTest = StoresToTest.valueOf(storeToTest); @@ -386,17 +386,17 @@ public class IQv2StoreIntegrationTest { } if (storeToTest.global()) { - builder - .globalTable( - INPUT_TOPIC_NAME, - Consumed.with(Serdes.Integer(), Serdes.Integer()), - materialized); + builder.globalTable( + INPUT_TOPIC_NAME, + Consumed.with(Serdes.Integer(), Serdes.Integer()), + materialized + ); } else { - builder - .table( - INPUT_TOPIC_NAME, - Consumed.with(Serdes.Integer(), Serdes.Integer()), - materialized); + builder.table( + INPUT_TOPIC_NAME, + Consumed.with(Serdes.Integer(), Serdes.Integer()), + materialized + ); } } else if (supplier instanceof WindowBytesStoreSupplier) { final Materialized> materialized = @@ -414,13 +414,15 @@ public class IQv2StoreIntegrationTest { materialized.withCachingDisabled(); } - builder.stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer())) + builder + .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer())) .groupByKey() .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE)) .aggregate( () -> 0, (key, value, aggregate) -> aggregate + value, - materialized); + materialized + ); } else if (supplier instanceof SessionBytesStoreSupplier) { final Materialized> materialized = Materialized.as((SessionBytesStoreSupplier) supplier); @@ -437,14 +439,16 @@ public class IQv2StoreIntegrationTest { materialized.withCachingDisabled(); } - builder.stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer())) + builder + .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer())) .groupByKey() .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(WINDOW_SIZE)) .aggregate( () -> 0, (key, value, aggregate) -> aggregate + value, (aggKey, aggOne, aggTwo) -> aggOne + aggTwo, - materialized); + materialized + ); } else { throw new AssertionError("Store supplier is an unrecognized type."); } @@ -478,15 +482,19 @@ public class IQv2StoreIntegrationTest { globalShouldRejectAllQueries(); } else { shouldRejectUnknownQuery(); - shouldHandlePingQuery(); shouldCollectExecutionInfo(); shouldCollectExecutionInfoUnderFailure(); if (storeToTest.keyValue()) { if (storeToTest.timestamped()) { - shouldHandleRangeQueries((Function, Integer>) ValueAndTimestamp::value); + final Function, Integer> valueExtractor = + ValueAndTimestamp::value; + shouldHandleKeyQuery(2, valueExtractor, 2); + shouldHandleRangeQueries(valueExtractor); } else { - shouldHandleRangeQueries(Function.identity()); + final Function valueExtractor = Function.identity(); + shouldHandleKeyQuery(2, valueExtractor, 2); + shouldHandleRangeQueries(valueExtractor); } } } @@ -527,10 +535,11 @@ public class IQv2StoreIntegrationTest { private void globalShouldRejectAllQueries() { // See KAFKA-13523 - final PingQuery query = new PingQuery(); - final StateQueryRequest request = inStore(STORE_NAME).withQuery(query); + final KeyQuery> query = KeyQuery.withKey(1); + final StateQueryRequest> request = + inStore(STORE_NAME).withQuery(query); - final StateQueryResult result = kafkaStreams.query(request); + final StateQueryResult> result = kafkaStreams.query(request); assertThat(result.getGlobalResult().isFailure(), is(true)); assertThat(result.getGlobalResult().getFailureReason(), @@ -572,42 +581,6 @@ public class IQv2StoreIntegrationTest { ); } - public void shouldHandlePingQuery() { - - final PingQuery query = new PingQuery(); - final Set partitions = mkSet(0, 1); - final StateQueryRequest request = - inStore(STORE_NAME) - .withQuery(query) - .withPartitions(partitions) - .withPositionBound(PositionBound.at(INPUT_POSITION)); - - final StateQueryResult result = - IntegrationTestUtils.iqv2WaitForResult( - kafkaStreams, - request - ); - - makeAssertions( - partitions, - result, - queryResult -> { - final boolean failure = queryResult.isFailure(); - if (failure) { - assertThat(queryResult.toString(), failure, is(false)); - } - assertThat(queryResult.isSuccess(), is(true)); - - assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); - assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage); - - assertThat(queryResult.getResult(), is(true)); - - assertThat(queryResult.getExecutionInfo(), is(empty())); - }); - assertThat(result.getPosition(), is(INPUT_POSITION)); - } - public void shouldHandleRangeQuery( final Optional lower, final Optional upper, @@ -630,29 +603,11 @@ public class IQv2StoreIntegrationTest { .withQuery(query) .withPartitions(mkSet(0, 1)) .withPositionBound(PositionBound.at(INPUT_POSITION)); - final StateQueryResult> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); if (result.getGlobalResult() != null) { - final QueryResult> queryResult = result.getGlobalResult(); - final boolean failure = queryResult.isFailure(); - if (failure) { - throw new AssertionError(queryResult.toString()); - } - assertThat(queryResult.isSuccess(), is(true)); - - assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); - assertThrows(IllegalArgumentException.class, - queryResult::getFailureMessage); - - final KeyValueIterator iterator = queryResult.getResult(); - final Set actualValue = new HashSet<>(); - while (iterator.hasNext()) { - actualValue.add(valueExtactor.apply(iterator.next().value)); - } - assertThat(actualValue, is(expectedValue)); - assertThat(queryResult.getExecutionInfo(), is(empty())); + fail("global tables aren't implemented"); } else { final Set actualValue = new HashSet<>(); final Map>> queryResult = result.getPartitionResults(); @@ -663,11 +618,17 @@ public class IQv2StoreIntegrationTest { } assertThat(queryResult.get(partition).isSuccess(), is(true)); - assertThrows(IllegalArgumentException.class, queryResult.get(partition)::getFailureReason); - assertThrows(IllegalArgumentException.class, - queryResult.get(partition)::getFailureMessage); + assertThrows( + IllegalArgumentException.class, + queryResult.get(partition)::getFailureReason + ); + assertThrows( + IllegalArgumentException.class, + queryResult.get(partition)::getFailureMessage + ); - final KeyValueIterator iterator = queryResult.get(partition).getResult(); + final KeyValueIterator iterator = queryResult.get(partition) + .getResult(); while (iterator.hasNext()) { actualValue.add(valueExtactor.apply(iterator.next().value)); } @@ -677,18 +638,53 @@ public class IQv2StoreIntegrationTest { } } + public void shouldHandleKeyQuery( + final Integer key, + final Function valueExtactor, + final Integer expectedValue) { + + final KeyQuery query = KeyQuery.withKey(key); + final StateQueryRequest request = + inStore(STORE_NAME) + .withQuery(query) + .withPartitions(mkSet(0, 1)) + .withPositionBound(PositionBound.at(INPUT_POSITION)); + + final StateQueryResult result = + IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + + final QueryResult queryResult = result.getOnlyPartitionResult(); + final boolean failure = queryResult.isFailure(); + if (failure) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.isSuccess(), is(true)); + + assertThrows(IllegalArgumentException.class, queryResult::getFailureReason); + assertThrows( + IllegalArgumentException.class, + queryResult::getFailureMessage + ); + + final V result1 = queryResult.getResult(); + final Integer integer = valueExtactor.apply(result1); + assertThat(integer, is(expectedValue)); + + assertThat(queryResult.getExecutionInfo(), is(empty())); + } + public void shouldCollectExecutionInfo() { - final PingQuery query = new PingQuery(); + final KeyQuery> query = KeyQuery.withKey(1); final Set partitions = mkSet(0, 1); - final StateQueryRequest request = + final StateQueryRequest> request = inStore(STORE_NAME) .withQuery(query) .enableExecutionInfo() .withPartitions(partitions) .withPositionBound(PositionBound.at(INPUT_POSITION)); - final StateQueryResult result = + final StateQueryResult> result = IntegrationTestUtils.iqv2WaitForResult( kafkaStreams, request