KAFKA-13525: Implement KeyQuery in Streams IQv2 (#11582)

Implement the KeyQuery as proposed in KIP-796

Reviewers: Vicky Papavasileiou <vpapavasileiou@confluent.io>, Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <guozhang@apache.org>
John Roesler 2021-12-20 12:22:05 -06:00, committed by GitHub
parent e82df83968
commit 5747788659
9 changed files with 276 additions and 177 deletions

streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java

@@ -19,6 +19,13 @@ package org.apache.kafka.streams.query;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
/**
* This enumeration type captures the various top-level reasons that a particular
* partition of a store would fail to execute a query. Stores should generally
* respond with a failure message instead of throwing an exception.
* <p>
* Intended to be used in {@link QueryResult#forFailure(FailureReason, String)}.
*/
@Evolving
public enum FailureReason {
/**

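The convention the FailureReason javadoc describes — reporting failures through the result object instead of throwing — looks roughly like this in a store-side handler. A minimal sketch, assuming a byte-oriented key-value store; the handler method itself is illustrative, not part of this commit:

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical handler: converts a store exception into a failed
// QueryResult rather than letting it propagate to the caller.
static QueryResult<byte[]> handleKeyQuery(final KeyValueStore<Bytes, byte[]> store,
                                          final KeyQuery<Bytes, byte[]> query) {
    try {
        return QueryResult.forResult(store.get(query.getKey()));
    } catch (final Exception e) {
        return QueryResult.forFailure(
            FailureReason.STORE_EXCEPTION,
            store.getClass() + " failed to handle query " + query + ": " + e
        );
    }
}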
streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.query;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import java.util.Objects;
/**
* Interactive query for retrieving a single record based on its key.
*/
@Evolving
public final class KeyQuery<K, V> implements Query<V> {
private final K key;
private KeyQuery(final K key) {
this.key = Objects.requireNonNull(key);
}
/**
* Creates a query that will retrieve the record identified by {@code key} if it exists
* (or {@code null} otherwise).
* @param key The key to retrieve
* @param <K> The type of the key
* @param <V> The type of the value that will be retrieved
*/
public static <K, V> KeyQuery<K, V> withKey(final K key) {
return new KeyQuery<>(key);
}
/**
* The key that was specified for this query.
*/
public K getKey() {
return key;
}
}

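For context, this is how a caller exercises the new query type through the IQv2 entry point. A sketch, assuming a RUNNING KafkaStreams instance with a key-value store named "my-store" materialized with Integer keys and String values:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;

import static org.apache.kafka.streams.query.StateQueryRequest.inStore;

// Inside some method, with kafkaStreams already started:
final KeyQuery<Integer, String> query = KeyQuery.withKey(1);
final StateQueryRequest<String> request = inStore("my-store").withQuery(query);
final StateQueryResult<String> result = kafkaStreams.query(request);
final QueryResult<String> partitionResult = result.getOnlyPartitionResult();
if (partitionResult != null && partitionResult.isSuccess()) {
    final String value = partitionResult.getResult(); // null if the key is absent
}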
streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java

@@ -197,6 +197,19 @@ public final class QueryResult<R> {
return result;
}
public <V> QueryResult<V> swapResult(final V value) {
if (isFailure()) {
throw new IllegalArgumentException(
"Callers must avoid calling this method on a failed result."
);
} else {
final QueryResult<V> result = new QueryResult<>(value);
result.executionInfo = executionInfo;
result.position = position;
return result;
}
}
@Override
public String toString() {
return "QueryResult{" +

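The motivation for swapResult shows up in the store changes below: unlike building a fresh result with QueryResult.forResult, it carries the execution info and position of the raw (serialized) result over to the typed result. A sketch of the intended use inside a wrapping store; deserializeValue is a hypothetical stand-in for the store's value deserializer:

// Run the raw query against the inner byte store, then swap in the
// deserialized value while keeping position and execution info intact.
final QueryResult<byte[]> rawResult =
    wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
final QueryResult<String> typedResult =
    rawResult.swapResult(deserializeValue(rawResult.getResult()));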
streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java

@@ -34,7 +34,7 @@ import java.util.Optional;
* <p>
*/
@Evolving
public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
private final Optional<K> lower;
@@ -52,7 +52,6 @@ public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
* @param upper The key that specifies the upper bound of the range
* @param <K> The key type
* @param <V> The value type
* @return An iterator of records
*/
public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
return new RangeQuery<>(Optional.of(lower), Optional.of(upper));
@@ -63,7 +62,6 @@ public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
* @param upper The key that specifies the upper bound of the range
* @param <K> The key type
* @param <V> The value type
* @return An iterator of records
*/
public static <K, V> RangeQuery<K, V> withUpperBound(final K upper) {
return new RangeQuery<>(Optional.empty(), Optional.of(upper));
@@ -74,7 +72,6 @@ public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
* @param lower The key that specifies the lower bound of the range
* @param <K> The key type
* @param <V> The value type
* @return An iterator of records
*/
public static <K, V> RangeQuery<K, V> withLowerBound(final K lower) {
return new RangeQuery<>(Optional.of(lower), Optional.empty());
@@ -84,7 +81,6 @@ public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
* Interactive scan query that returns all records in the store.
* @param <K> The key type
* @param <V> The value type
* @return An iterator of records
*/
public static <K, V> RangeQuery<K, V> withNoBounds() {
return new RangeQuery<>(Optional.empty(), Optional.empty());
@@ -92,7 +88,6 @@ public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
/**
* The lower bound of the query, if specified.
* @return The lower bound
*/
public Optional<K> getLowerBound() {
return lower;
@@ -100,7 +95,6 @@ public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
/**
* The upper bound of the query, if specified
* @return The upper bound
*/
public Optional<K> getUpperBound() {
return upper;

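For reference, the four factory methods together cover every combination of bounds; the key and value types here are illustrative:

// Bounds are inclusive, matching KeyValueStore#range semantics.
final RangeQuery<Integer, String> both = RangeQuery.withRange(1, 10);   // 1 <= k <= 10
final RangeQuery<Integer, String> upTo = RangeQuery.withUpperBound(10); // k <= 10
final RangeQuery<Integer, String> from = RangeQuery.withLowerBound(1);  // k >= 1
final RangeQuery<Integer, String> scan = RangeQuery.withNoBounds();     // full scan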
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java

@@ -35,6 +35,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;
@@ -88,13 +89,18 @@ public class MeteredKeyValueStore<K, V>
private StreamsMetricsImpl streamsMetrics;
private TaskId taskId;
private Map<Class, QueryHandler> queryHandlers =
mkMap(
mkEntry(
RangeQuery.class,
(query, positionBound, collectExecutionInfo, store) -> runRangeQuery(query, positionBound, collectExecutionInfo)
)
);
@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
mkMap(
mkEntry(
RangeQuery.class,
(query, positionBound, collectExecutionInfo, store) -> runRangeQuery(query, positionBound, collectExecutionInfo)
),
mkEntry(
KeyQuery.class,
(query, positionBound, collectExecutionInfo, store) -> runKeyQuery(query, positionBound, collectExecutionInfo)
)
);
MeteredKeyValueStore(final KeyValueStore<Bytes, byte[]> inner,
final String metricsScope,
@@ -209,7 +215,7 @@ public class MeteredKeyValueStore<K, V>
final PositionBound positionBound,
final boolean collectExecutionInfo) {
final long start = System.nanoTime();
final long start = time.nanoseconds();
final QueryResult<R> result;
final QueryHandler handler = queryHandlers.get(query.getClass());
@@ -217,34 +223,37 @@ public class MeteredKeyValueStore<K, V>
result = wrapped().query(query, positionBound, collectExecutionInfo);
if (collectExecutionInfo) {
result.addExecutionInfo(
"Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
"Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
}
} else {
result = (QueryResult<R>) handler.apply(
query,
positionBound,
collectExecutionInfo,
this
query,
positionBound,
collectExecutionInfo,
this
);
if (collectExecutionInfo) {
result.addExecutionInfo(
"Handled in " + getClass() + " with serdes "
+ serdes + " in " + (System.nanoTime() - start) + "ns");
"Handled in " + getClass() + " with serdes "
+ serdes + " in " + (time.nanoseconds() - start) + "ns");
}
}
return result;
}
@SuppressWarnings("unchecked")
private <R> QueryResult<R> runRangeQuery(
final Query<R> query, final PositionBound positionBound, final boolean collectExecutionInfo) {
private <R> QueryResult<R> runRangeQuery(final Query<R> query,
final PositionBound positionBound,
final boolean collectExecutionInfo) {
final QueryResult<R> result;
final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
final RangeQuery<Bytes, byte[]> rawRangeQuery;
if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) {
rawRangeQuery = RangeQuery.withRange(keyBytes(typedQuery.getLowerBound().get()),
keyBytes(typedQuery.getUpperBound().get()));
rawRangeQuery = RangeQuery.withRange(
keyBytes(typedQuery.getLowerBound().get()),
keyBytes(typedQuery.getUpperBound().get())
);
} else if (typedQuery.getLowerBound().isPresent()) {
rawRangeQuery = RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get()));
} else if (typedQuery.getUpperBound().isPresent()) {
@@ -253,12 +262,17 @@ public class MeteredKeyValueStore<K, V>
rawRangeQuery = RangeQuery.withNoBounds();
}
final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo);
wrapped().query(rawRangeQuery, positionBound, collectExecutionInfo);
if (rawResult.isSuccess()) {
final KeyValueIterator<Bytes, byte[]> iterator = rawResult.getResult();
final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator(
iterator, getSensor, getValueDeserializer());
final QueryResult<KeyValueIterator<K, V>> typedQueryResult = QueryResult.forResult(resultIterator);
iterator,
getSensor,
getValueDeserializer()
);
final QueryResult<KeyValueIterator<K, V>> typedQueryResult = rawResult.swapResult(
resultIterator
);
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no result set.
@@ -267,17 +281,41 @@ public class MeteredKeyValueStore<K, V>
return result;
}
@SuppressWarnings("unchecked")
private <R> QueryResult<R> runKeyQuery(final Query<R> query,
final PositionBound positionBound,
final boolean collectExecutionInfo) {
final QueryResult<R> result;
final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
final KeyQuery<Bytes, byte[]> rawKeyQuery =
KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
final QueryResult<byte[]> rawResult =
wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
if (rawResult.isSuccess()) {
final Deserializer<V> deserializer = getValueDeserializer();
final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
final QueryResult<V> typedQueryResult =
rawResult.swapResult(value);
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no result set.
result = (QueryResult<R>) rawResult;
}
return result;
}
@SuppressWarnings({"unchecked", "rawtypes"})
private Deserializer<V> getValueDeserializer() {
final Serde<V> vSerde = serdes.valueSerde();
final Serde<V> valueSerde = serdes.valueSerde();
final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
final Deserializer<V> deserializer;
if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {
final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
(ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer();
(ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) valueSerde).deserializer();
deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
} else {
deserializer = vSerde.deserializer();
deserializer = valueSerde.deserializer();
}
return deserializer;
}

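In short, the metered layer serializes the typed key to Bytes, delegates a raw KeyQuery to the wrapped byte store, and deserializes the payload on the way back. A condensed sketch of that translation; keySerializer, topic, and valueDeserializer stand in for the store's serdes wiring:

// Typed key -> raw key for the inner store (mirrors keyBytes() above).
final KeyQuery<Integer, String> typed = KeyQuery.withKey(42);
final Bytes rawKey = Bytes.wrap(keySerializer.serialize(topic, typed.getKey()));
final KeyQuery<Bytes, byte[]> raw = KeyQuery.withKey(rawKey);
// After wrapped().query(raw, ...) succeeds:
// final String value = valueDeserializer.deserialize(topic, rawResult.getResult());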
streams/src/main/java/org/apache/kafka/streams/state/internals/PingQuery.java (deleted)

@@ -1,29 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.query.Query;
/**
* A very simple query that all stores can handle to verify that the store is participating in the
* IQv2 framework properly.
* <p>
* This is not a public API and may change without notice.
*/
public class PingQuery implements Query<Boolean> {
}

streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java

@@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
@@ -54,22 +55,23 @@ public final class StoreQueryUtils {
);
}
private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
@SuppressWarnings("rawtypes")
private static final Map<Class, QueryHandler> QUERY_HANDLER_MAP =
mkMap(
mkEntry(
PingQuery.class,
(query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true)
),
mkEntry(
RangeQuery.class,
StoreQueryUtils::runRangeQuery
),
mkEntry(
KeyQuery.class,
StoreQueryUtils::runKeyQuery
)
);
// make this class uninstantiable
private StoreQueryUtils() {
}
@SuppressWarnings("unchecked")
public static <R> QueryResult<R> handleBasicQueries(
final Query<R> query,
@@ -153,7 +155,7 @@ public final class StoreQueryUtils {
final RangeQuery<Bytes, byte[]> rangeQuery = (RangeQuery<Bytes, byte[]>) query;
final Optional<Bytes> lowerRange = rangeQuery.getLowerBound();
final Optional<Bytes> upperRange = rangeQuery.getUpperBound();
KeyValueIterator<Bytes, byte[]> iterator = null;
final KeyValueIterator<Bytes, byte[]> iterator;
try {
if (!lowerRange.isPresent() && !upperRange.isPresent()) {
iterator = kvStore.all();
@@ -162,8 +164,8 @@ public final class StoreQueryUtils {
}
final R result = (R) iterator;
return QueryResult.forResult(result);
} catch (final Throwable t) {
final String message = parseStoreException(t, store, query);
} catch (final Exception e) {
final String message = parseStoreException(e, store, query);
return QueryResult.forFailure(
FailureReason.STORE_EXCEPTION,
message
@@ -171,12 +173,36 @@ public final class StoreQueryUtils {
}
}
private static <R> String parseStoreException(final Throwable t, final StateStore store, final Query<R> query) {
@SuppressWarnings("unchecked")
private static <R> QueryResult<R> runKeyQuery(final Query<R> query,
final PositionBound positionBound,
final boolean collectExecutionInfo,
final StateStore store) {
if (store instanceof KeyValueStore) {
final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, byte[]>) query;
final KeyValueStore<Bytes, byte[]> keyValueStore =
(KeyValueStore<Bytes, byte[]>) store;
try {
final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
return (QueryResult<R>) QueryResult.forResult(bytes);
} catch (final Exception e) {
final String message = parseStoreException(e, store, query);
return QueryResult.forFailure(
FailureReason.STORE_EXCEPTION,
message
);
}
} else {
return QueryResult.forUnknownQueryType(query, store);
}
}
private static <R> String parseStoreException(final Exception e, final StateStore store, final Query<R> query) {
final StringWriter stringWriter = new StringWriter();
final PrintWriter printWriter = new PrintWriter(stringWriter);
printWriter.println(
store.getClass() + " failed to handle query " + query + ":");
t.printStackTrace(printWriter);
e.printStackTrace(printWriter);
printWriter.flush();
return stringWriter.toString();
}

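Because runKeyQuery falls back to forUnknownQueryType when the store is not a KeyValueStore, a caller can tell "this store cannot serve the query" apart from a genuine store error. A sketch of that check:

final QueryResult<byte[]> result =
    store.query(rawKeyQuery, PositionBound.unbounded(), true);
if (result.isFailure()) {
    if (result.getFailureReason() == FailureReason.UNKNOWN_QUERY_TYPE) {
        // This store type does not support KeyQuery; try another access path.
    } else if (result.getFailureReason() == FailureReason.STORE_EXCEPTION) {
        // The store itself failed; getFailureMessage() carries the stack trace.
    }
}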
streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java

@@ -43,6 +43,7 @@ import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamThread.State;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
@@ -50,7 +51,7 @@ import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.PingQuery;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -178,8 +179,8 @@ public class IQv2IntegrationTest {
@Test
public void shouldFailUnknownStore() {
final PingQuery query = new PingQuery();
final StateQueryRequest<Boolean> request =
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore("unknown-store").withQuery(query);
assertThrows(UnknownStateStoreException.class, () -> kafkaStreams.query(request));
@@ -187,8 +188,8 @@ public class IQv2IntegrationTest {
@Test
public void shouldFailNotStarted() {
final PingQuery query = new PingQuery();
final StateQueryRequest<Boolean> request =
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore(STORE_NAME).withQuery(query);
assertThrows(StreamsNotStartedException.class, () -> kafkaStreams.query(request));
@@ -196,8 +197,8 @@ public class IQv2IntegrationTest {
@Test
public void shouldFailStopped() {
final PingQuery query = new PingQuery();
final StateQueryRequest<Boolean> request =
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore(STORE_NAME).withQuery(query);
kafkaStreams.start();
@@ -208,8 +209,8 @@ public class IQv2IntegrationTest {
@Test
public void shouldRejectNonRunningActive()
throws NoSuchFieldException, IllegalAccessException {
final PingQuery query = new PingQuery();
final StateQueryRequest<Boolean> request =
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore(STORE_NAME).withQuery(query).requireActive();
final Set<Integer> partitions = mkSet(0, 1);
@@ -238,7 +239,7 @@ public class IQv2IntegrationTest {
stateField.setAccessible(true);
stateField.set(streamThread, State.PARTITIONS_ASSIGNED);
final StateQueryResult<Boolean> result =
final StateQueryResult<ValueAndTimestamp<Integer>> result =
IntegrationTestUtils.iqv2WaitForPartitions(
kafkaStreams,
request,
@@ -263,14 +264,14 @@ public class IQv2IntegrationTest {
@Test
public void shouldFetchFromPartition() {
final PingQuery query = new PingQuery();
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final int partition = 1;
final Set<Integer> partitions = singleton(partition);
final StateQueryRequest<Boolean> request =
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore(STORE_NAME).withQuery(query).withPartitions(partitions);
kafkaStreams.start();
final StateQueryResult<Boolean> result =
final StateQueryResult<ValueAndTimestamp<Integer>> result =
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
assertThat(result.getPartitionResults().keySet(), equalTo(partitions));
@@ -278,13 +279,13 @@ public class IQv2IntegrationTest {
@Test
public void shouldFetchExplicitlyFromAllPartitions() {
final PingQuery query = new PingQuery();
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final Set<Integer> partitions = mkSet(0, 1);
final StateQueryRequest<Boolean> request =
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore(STORE_NAME).withQuery(query).withAllPartitions();
kafkaStreams.start();
final StateQueryResult<Boolean> result =
final StateQueryResult<ValueAndTimestamp<Integer>> result =
IntegrationTestUtils.iqv2WaitForPartitions(kafkaStreams, request, partitions);
assertThat(result.getPartitionResults().keySet(), equalTo(partitions));
@@ -292,10 +293,10 @@ public class IQv2IntegrationTest {
@Test
public void shouldNotRequireQueryHandler() {
final PingQuery query = new PingQuery();
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final int partition = 1;
final Set<Integer> partitions = singleton(partition);
final StateQueryRequest<Boolean> request =
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore(STORE_NAME).withQuery(query).withPartitions(partitions);
final StreamsBuilder builder = new StreamsBuilder();
@@ -411,10 +412,11 @@ public class IQv2IntegrationTest {
kafkaStreams.cleanUp();
kafkaStreams.start();
final StateQueryResult<Boolean> result =
final StateQueryResult<ValueAndTimestamp<Integer>> result =
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
final QueryResult<Boolean> queryResult = result.getPartitionResults().get(partition);
final QueryResult<ValueAndTimestamp<Integer>> queryResult =
result.getPartitionResults().get(partition);
assertThat(queryResult.isFailure(), is(true));
assertThat(queryResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
assertThat(queryResult.getFailureMessage(), matchesPattern(

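Note that these tests type the query result as ValueAndTimestamp<Integer>: the table is materialized as a timestamped key-value store, so each stored value carries its record timestamp. A sketch of unwrapping a successful result:

final ValueAndTimestamp<Integer> valueAndTimestamp = queryResult.getResult();
final int value = valueAndTimestamp.value();            // the stored value
final long timestamp = valueAndTimestamp.timestamp();   // the record timestamp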
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java

@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
@@ -53,7 +54,6 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.PingQuery;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -93,6 +93,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.matchesPattern;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.assertThrows;
@Category({IntegrationTest.class})
@@ -303,7 +304,6 @@ public class IQv2StoreIntegrationTest {
final boolean cache,
final boolean log,
final String storeToTest) {
this.cache = cache;
this.log = log;
this.storeToTest = StoresToTest.valueOf(storeToTest);
@@ -386,17 +386,17 @@
}
if (storeToTest.global()) {
builder
.globalTable(
INPUT_TOPIC_NAME,
Consumed.with(Serdes.Integer(), Serdes.Integer()),
materialized);
builder.globalTable(
INPUT_TOPIC_NAME,
Consumed.with(Serdes.Integer(), Serdes.Integer()),
materialized
);
} else {
builder
.table(
INPUT_TOPIC_NAME,
Consumed.with(Serdes.Integer(), Serdes.Integer()),
materialized);
builder.table(
INPUT_TOPIC_NAME,
Consumed.with(Serdes.Integer(), Serdes.Integer()),
materialized
);
}
} else if (supplier instanceof WindowBytesStoreSupplier) {
final Materialized<Integer, Integer, WindowStore<Bytes, byte[]>> materialized =
@@ -414,13 +414,15 @@
materialized.withCachingDisabled();
}
builder.stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
builder
.stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
.aggregate(
() -> 0,
(key, value, aggregate) -> aggregate + value,
materialized);
materialized
);
} else if (supplier instanceof SessionBytesStoreSupplier) {
final Materialized<Integer, Integer, SessionStore<Bytes, byte[]>> materialized =
Materialized.as((SessionBytesStoreSupplier) supplier);
@@ -437,14 +439,16 @@
materialized.withCachingDisabled();
}
builder.stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
builder
.stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(WINDOW_SIZE))
.aggregate(
() -> 0,
(key, value, aggregate) -> aggregate + value,
(aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
materialized);
materialized
);
} else {
throw new AssertionError("Store supplier is an unrecognized type.");
}
@@ -478,15 +482,19 @@
globalShouldRejectAllQueries();
} else {
shouldRejectUnknownQuery();
shouldHandlePingQuery();
shouldCollectExecutionInfo();
shouldCollectExecutionInfoUnderFailure();
if (storeToTest.keyValue()) {
if (storeToTest.timestamped()) {
shouldHandleRangeQueries((Function<ValueAndTimestamp<Integer>, Integer>) ValueAndTimestamp::value);
final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor =
ValueAndTimestamp::value;
shouldHandleKeyQuery(2, valueExtractor, 2);
shouldHandleRangeQueries(valueExtractor);
} else {
shouldHandleRangeQueries(Function.identity());
final Function<Integer, Integer> valueExtractor = Function.identity();
shouldHandleKeyQuery(2, valueExtractor, 2);
shouldHandleRangeQueries(valueExtractor);
}
}
}
@@ -527,10 +535,11 @@
private void globalShouldRejectAllQueries() {
// See KAFKA-13523
final PingQuery query = new PingQuery();
final StateQueryRequest<Boolean> request = inStore(STORE_NAME).withQuery(query);
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore(STORE_NAME).withQuery(query);
final StateQueryResult<Boolean> result = kafkaStreams.query(request);
final StateQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(request);
assertThat(result.getGlobalResult().isFailure(), is(true));
assertThat(result.getGlobalResult().getFailureReason(),
@@ -572,42 +581,6 @@
);
}
public void shouldHandlePingQuery() {
final PingQuery query = new PingQuery();
final Set<Integer> partitions = mkSet(0, 1);
final StateQueryRequest<Boolean> request =
inStore(STORE_NAME)
.withQuery(query)
.withPartitions(partitions)
.withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<Boolean> result =
IntegrationTestUtils.iqv2WaitForResult(
kafkaStreams,
request
);
makeAssertions(
partitions,
result,
queryResult -> {
final boolean failure = queryResult.isFailure();
if (failure) {
assertThat(queryResult.toString(), failure, is(false));
}
assertThat(queryResult.isSuccess(), is(true));
assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage);
assertThat(queryResult.getResult(), is(true));
assertThat(queryResult.getExecutionInfo(), is(empty()));
});
assertThat(result.getPosition(), is(INPUT_POSITION));
}
public <V> void shouldHandleRangeQuery(
final Optional<Integer> lower,
final Optional<Integer> upper,
@@ -630,29 +603,11 @@
.withQuery(query)
.withPartitions(mkSet(0, 1))
.withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<KeyValueIterator<Integer, V>> result =
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
if (result.getGlobalResult() != null) {
final QueryResult<KeyValueIterator<Integer, V>> queryResult = result.getGlobalResult();
final boolean failure = queryResult.isFailure();
if (failure) {
throw new AssertionError(queryResult.toString());
}
assertThat(queryResult.isSuccess(), is(true));
assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
assertThrows(IllegalArgumentException.class,
queryResult::getFailureMessage);
final KeyValueIterator<Integer, V> iterator = queryResult.getResult();
final Set<Integer> actualValue = new HashSet<>();
while (iterator.hasNext()) {
actualValue.add(valueExtractor.apply(iterator.next().value));
}
assertThat(actualValue, is(expectedValue));
assertThat(queryResult.getExecutionInfo(), is(empty()));
fail("global tables aren't implemented");
} else {
final Set<Integer> actualValue = new HashSet<>();
final Map<Integer, QueryResult<KeyValueIterator<Integer, V>>> queryResult = result.getPartitionResults();
@@ -663,11 +618,17 @@
}
assertThat(queryResult.get(partition).isSuccess(), is(true));
assertThrows(IllegalArgumentException.class, queryResult.get(partition)::getFailureReason);
assertThrows(IllegalArgumentException.class,
queryResult.get(partition)::getFailureMessage);
assertThrows(
IllegalArgumentException.class,
queryResult.get(partition)::getFailureReason
);
assertThrows(
IllegalArgumentException.class,
queryResult.get(partition)::getFailureMessage
);
final KeyValueIterator<Integer, V> iterator = queryResult.get(partition).getResult();
final KeyValueIterator<Integer, V> iterator = queryResult.get(partition)
.getResult();
while (iterator.hasNext()) {
actualValue.add(valueExtractor.apply(iterator.next().value));
}
@@ -677,18 +638,53 @@
}
}
public <V> void shouldHandleKeyQuery(
final Integer key,
final Function<V, Integer> valueExtractor,
final Integer expectedValue) {
final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
final StateQueryRequest<V> request =
inStore(STORE_NAME)
.withQuery(query)
.withPartitions(mkSet(0, 1))
.withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<V> result =
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
final QueryResult<V> queryResult = result.getOnlyPartitionResult();
final boolean failure = queryResult.isFailure();
if (failure) {
throw new AssertionError(queryResult.toString());
}
assertThat(queryResult.isSuccess(), is(true));
assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
assertThrows(
IllegalArgumentException.class,
queryResult::getFailureMessage
);
final V result1 = queryResult.getResult();
final Integer integer = valueExtractor.apply(result1);
assertThat(integer, is(expectedValue));
assertThat(queryResult.getExecutionInfo(), is(empty()));
}
public void shouldCollectExecutionInfo() {
final PingQuery query = new PingQuery();
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final Set<Integer> partitions = mkSet(0, 1);
final StateQueryRequest<Boolean> request =
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore(STORE_NAME)
.withQuery(query)
.enableExecutionInfo()
.withPartitions(partitions)
.withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<Boolean> result =
final StateQueryResult<ValueAndTimestamp<Integer>> result =
IntegrationTestUtils.iqv2WaitForResult(
kafkaStreams,
request