KAFKA-5876: KIP-216 Part 3, Apply StreamsNotStartedException for Interactive Queries (#10597)

KIP-216 Part 3: Throw StreamsNotStartedException if KafkaStreams state is CREATED

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
Vito Jeng 2021-05-04 04:53:35 +08:00 committed by GitHub
parent 3ddc377b05
commit 816f5c3b86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 74 additions and 18 deletions

View File

@ -158,7 +158,7 @@
files="StreamThread.java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(KStreamImpl|KTableImpl).java"/>
files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>
<suppress checks="CyclomaticComplexity"
files="(StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup).java"/>

View File

@ -96,7 +96,14 @@
<h3><a id="streams_api_changes_300" href="#streams_api_changes_300">Streams API changes in 3.0.0</a></h3>
<p>
A new exception may be thrown from <code>KafkaStreams#store()</code>. If the specified store name does not exist in the topology, an UnknownStateStoreException will be thrown instead of the former InvalidStateStoreException. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors">KIP-216</a> for more information.
Interactive Queries may throw new exceptions for different errors:
</p>
<ul>
<li> <code>UnknownStateStoreException</code>: If the specified store name does not exist in the topology, an <code>UnknownStateStoreException</code> will be thrown instead of the former <code>InvalidStateStoreException</code>.</li>
<li> <code>StreamsNotStartedException</code>: If Streams state is <code>CREATED</code>, a <code>StreamsNotStartedException</code> will be thrown.</li>
</ul>
<p>
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors">KIP-216</a> for more information.
</p>
<p>
We deprecated the StreamsConfig <code>processing.guarantee</code> configuration value <code>"exactly_once"</code> (for EOS version 1) in favor of the improved EOS version 2, formerly configured via

View File

@ -42,6 +42,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
@ -343,10 +344,16 @@ public class KafkaStreams implements AutoCloseable {
}
private void validateIsRunningOrRebalancing() {
if (!isRunningOrRebalancing()) {
throw new IllegalStateException("KafkaStreams is not running. State is " + state + ".");
synchronized (stateLock) {
if (state == State.CREATED) {
throw new StreamsNotStartedException("KafkaStreams has not been started, you can retry after calling start()");
}
if (!isRunningOrRebalancing()) {
throw new IllegalStateException("KafkaStreams is not running. State is " + state + ".");
}
}
}
/**
* Listen to {@link State} change events.
*/
@ -1516,6 +1523,8 @@ public class KafkaStreams implements AutoCloseable {
*
* @param storeQueryParameters the parameters used to fetch a queryable store
* @return A facade wrapping the local {@link StateStore} instances
* @throws StreamsNotStartedException If Streams state is {@link KafkaStreams.State#CREATED CREATED}. Just
* retry and wait until to {@link KafkaStreams.State#RUNNING RUNNING}.
* @throws UnknownStateStoreException If the specified store name does not exist in the topology.
* @throws InvalidStateStoreException If the Streams instance isn't in a queryable state.
* If the store's type does not match the QueryableStoreType,
@ -1524,6 +1533,7 @@ public class KafkaStreams implements AutoCloseable {
* an InvalidStateStoreException is thrown upon store access.
*/
public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
validateIsRunningOrRebalancing();
final String storeName = storeQueryParameters.storeName();
if ((taskTopology == null || !taskTopology.hasStore(storeName))
&& (globalTaskTopology == null || !globalTaskTopology.hasStore(storeName))) {
@ -1531,7 +1541,6 @@ public class KafkaStreams implements AutoCloseable {
"Cannot get state store " + storeName + " because no such store is registered in the topology."
);
}
validateIsRunningOrRebalancing();
return queryableStoreProvider.getStore(storeQueryParameters);
}

View File

@ -34,6 +34,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
@ -94,6 +95,7 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
@ -116,6 +118,7 @@ public class KafkaStreamsTest {
private static final int NUM_THREADS = 2;
private final static String APPLICATION_ID = "appId";
private final static String CLIENT_ID = "test-client";
private final static Duration DEFAULT_DURATION = Duration.ofSeconds(30);
@Rule
public TestName testName = new TestName();
@ -717,21 +720,39 @@ public class KafkaStreamsTest {
}
@Test
public void shouldNotGetAllTasksWhenNotRunning() {
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
assertThrows(IllegalStateException.class, streams::allMetadata);
public void shouldNotGetAllTasksWhenNotRunning() throws InterruptedException {
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
assertThrows(StreamsNotStartedException.class, streams::allMetadata);
streams.start();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
streams.close();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
assertThrows(IllegalStateException.class, streams::allMetadata);
}
}
@Test
public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
assertThrows(IllegalStateException.class, () -> streams.allMetadataForStore("store"));
public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws InterruptedException {
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
assertThrows(StreamsNotStartedException.class, () -> streams.allMetadataForStore("store"));
streams.start();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
streams.close();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
assertThrows(IllegalStateException.class, () -> streams.allMetadataForStore("store"));
}
}
@Test
public void shouldNotGetQueryMetadataWithSerializerWhenNotRunningOrRebalancing() {
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", Serdes.String().serializer()));
public void shouldNotGetQueryMetadataWithSerializerWhenNotRunningOrRebalancing() throws InterruptedException {
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
assertThrows(StreamsNotStartedException.class, () -> streams.queryMetadataForKey("store", "key", Serdes.String().serializer()));
streams.start();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
streams.close();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", Serdes.String().serializer()));
}
}
@Test
@ -742,19 +763,38 @@ public class KafkaStreamsTest {
}
@Test
public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing() {
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0));
public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing() throws InterruptedException {
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
assertThrows(StreamsNotStartedException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0));
streams.start();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
streams.close();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0));
}
}
@Test
public void shouldThrowUnknownStateStoreExceptionWhenStoreNotExist() {
public void shouldThrowUnknownStateStoreExceptionWhenStoreNotExist() throws InterruptedException {
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.start();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
assertThrows(UnknownStateStoreException.class, () -> streams.store(StoreQueryParameters.fromNameAndType("unknown-store", keyValueStore())));
}
}
@Test
public void shouldNotGetStoreWhenWhenNotRunningOrRebalancing() throws InterruptedException {
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
assertThrows(StreamsNotStartedException.class, () -> streams.store(StoreQueryParameters.fromNameAndType("store", keyValueStore())));
streams.start();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
streams.close();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
assertThrows(IllegalStateException.class, () -> streams.store(StoreQueryParameters.fromNameAndType("store", keyValueStore())));
}
}
@Test
public void shouldReturnEmptyLocalStorePartitionLags() {
// Mock all calls made to compute the offset lags,