KAFKA-5876: KIP-216 Part 4, Apply InvalidStateStorePartitionException for Interactive Queries (#10657)

KIP-216, part 4 - apply InvalidStateStorePartitionException

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
Vito Jeng 2021-05-11 08:29:58 +08:00 committed by GitHub
parent 25f4ee879c
commit fae0784ce3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 13 additions and 1 deletions

View File

@ -101,6 +101,7 @@
<ul> <ul>
<li> <code>UnknownStateStoreException</code>: If the specified store name does not exist in the topology, an <code>UnknownStateStoreException</code> will be thrown instead of the former <code>InvalidStateStoreException</code>.</li> <li> <code>UnknownStateStoreException</code>: If the specified store name does not exist in the topology, an <code>UnknownStateStoreException</code> will be thrown instead of the former <code>InvalidStateStoreException</code>.</li>
<li> <code>StreamsNotStartedException</code>: If Streams state is <code>CREATED</code>, a <code>StreamsNotStartedException</code> will be thrown.</li> <li> <code>StreamsNotStartedException</code>: If Streams state is <code>CREATED</code>, a <code>StreamsNotStartedException</code> will be thrown.</li>
<li> <code>InvalidStateStorePartitionException</code>: If the specified partition does not exist, a <code>InvalidStateStorePartitionException</code> will be thrown.</li>
</ul> </ul>
<p> <p>
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors">KIP-216</a> for more information. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors">KIP-216</a> for more information.

View File

@ -46,6 +46,7 @@ import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.UnknownStateStoreException; import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics; import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateRestoreListener;
@ -1526,6 +1527,7 @@ public class KafkaStreams implements AutoCloseable {
* @throws StreamsNotStartedException If Streams state is {@link KafkaStreams.State#CREATED CREATED}. Just * @throws StreamsNotStartedException If Streams state is {@link KafkaStreams.State#CREATED CREATED}. Just
* retry and wait until to {@link KafkaStreams.State#RUNNING RUNNING}. * retry and wait until to {@link KafkaStreams.State#RUNNING RUNNING}.
* @throws UnknownStateStoreException If the specified store name does not exist in the topology. * @throws UnknownStateStoreException If the specified store name does not exist in the topology.
* @throws InvalidStateStorePartitionException If the specified partition does not exist.
* @throws InvalidStateStoreException If the Streams instance isn't in a queryable state. * @throws InvalidStateStoreException If the Streams instance isn't in a queryable state.
* If the store's type does not match the QueryableStoreType, * If the store's type does not match the QueryableStoreType,
* the Streams instance is not in a queryable state with respect * the Streams instance is not in a queryable state with respect

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.QueryableStoreType;
import java.util.ArrayList; import java.util.ArrayList;
@ -57,7 +58,7 @@ public class WrappingStoreProvider implements StateStoreProvider {
} }
if (allStores.isEmpty()) { if (allStores.isEmpty()) {
if (storeQueryParameters.partition() != null) { if (storeQueryParameters.partition() != null) {
throw new InvalidStateStoreException( throw new InvalidStateStorePartitionException(
String.format("The specified partition %d for store %s does not exist.", String.format("The specified partition %d for store %s does not exist.",
storeQueryParameters.partition(), storeQueryParameters.partition(),
storeName)); storeName));

View File

@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
import org.apache.kafka.streams.state.NoOpWindowStore; import org.apache.kafka.streams.state.NoOpWindowStore;
import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@ -82,6 +83,13 @@ public class WrappingStoreProviderTest {
assertThrows(InvalidStateStoreException.class, () -> wrappingStoreProvider.stores("doesn't exist", QueryableStoreTypes.<String, String>keyValueStore())); assertThrows(InvalidStateStoreException.class, () -> wrappingStoreProvider.stores("doesn't exist", QueryableStoreTypes.<String, String>keyValueStore()));
} }
@Test
public void shouldThrowInvalidStoreExceptionIfNoPartitionFound() {
final int invalidPartition = numStateStorePartitions + 1;
wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()).withPartition(invalidPartition));
assertThrows(InvalidStateStorePartitionException.class, () -> wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore()));
}
@Test @Test
public void shouldReturnAllStoreWhenQueryWithoutPartition() { public void shouldReturnAllStoreWhenQueryWithoutPartition() {
wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore())); wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()));