From fae0784ce32a448a0d8c69e0b55f957290df1b5a Mon Sep 17 00:00:00 2001 From: Vito Jeng Date: Tue, 11 May 2021 08:29:58 +0800 Subject: [PATCH] KAFKA-5876: KIP-216 Part 4, Apply InvalidStateStorePartitionException for Interactive Queries (#10657) KIP-216, part 4 - apply InvalidStateStorePartitionException Reviewers: Anna Sophie Blee-Goldman --- docs/streams/upgrade-guide.html | 1 + .../main/java/org/apache/kafka/streams/KafkaStreams.java | 2 ++ .../streams/state/internals/WrappingStoreProvider.java | 3 ++- .../state/internals/WrappingStoreProviderTest.java | 8 ++++++++ 4 files changed, 13 insertions(+), 1 deletion(-) diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 6f13cc5d36c..de747bb862f 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -101,6 +101,7 @@

See KIP-216 for more information. diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index e8a7fa2c1a2..f3f8e8ea645 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -46,6 +46,7 @@ import org.apache.kafka.streams.errors.StreamsNotStartedException; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.errors.UnknownStateStoreException; +import org.apache.kafka.streams.errors.InvalidStateStorePartitionException; import org.apache.kafka.streams.internals.metrics.ClientMetrics; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.StateRestoreListener; @@ -1526,6 +1527,7 @@ public class KafkaStreams implements AutoCloseable { * @throws StreamsNotStartedException If Streams state is {@link KafkaStreams.State#CREATED CREATED}. Just * retry and wait until to {@link KafkaStreams.State#RUNNING RUNNING}. * @throws UnknownStateStoreException If the specified store name does not exist in the topology. + * @throws InvalidStateStorePartitionException If the specified partition does not exist. * @throws InvalidStateStoreException If the Streams instance isn't in a queryable state. * If the store's type does not match the QueryableStoreType, * the Streams instance is not in a queryable state with respect diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java index 26c5db0e192..03ac0ae2c91 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.errors.InvalidStateStorePartitionException; import org.apache.kafka.streams.state.QueryableStoreType; import java.util.ArrayList; @@ -57,7 +58,7 @@ public class WrappingStoreProvider implements StateStoreProvider { } if (allStores.isEmpty()) { if (storeQueryParameters.partition() != null) { - throw new InvalidStateStoreException( + throw new InvalidStateStorePartitionException( String.format("The specified partition %d for store %s does not exist.", storeQueryParameters.partition(), storeName)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java index 0d364730dc3..2a5551a7c75 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.errors.InvalidStateStorePartitionException; import org.apache.kafka.streams.state.NoOpWindowStore; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -82,6 +83,13 @@ public class WrappingStoreProviderTest { assertThrows(InvalidStateStoreException.class, () -> wrappingStoreProvider.stores("doesn't exist", QueryableStoreTypes.keyValueStore())); } + @Test + public void shouldThrowInvalidStoreExceptionIfNoPartitionFound() { + final int invalidPartition = numStateStorePartitions + 1; + wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore()).withPartition(invalidPartition)); + assertThrows(InvalidStateStorePartitionException.class, () -> wrappingStoreProvider.stores("kv", QueryableStoreTypes.keyValueStore())); + } + @Test public void shouldReturnAllStoreWhenQueryWithoutPartition() { wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore()));