diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 1c64a6f7b29..1093ac895aa 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -158,7 +158,7 @@
files="StreamThread.java"/>
+ files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 02f258c1d17..1990d721c81 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -96,7 +96,14 @@
- A new exception may be thrown from KafkaStreams#store()
. If the specified store name does not exist in the topology, an UnknownStateStoreException will be thrown instead of the former InvalidStateStoreException. See KIP-216 for more information.
+ Interactive Queries may throw new exceptions for different errors:
+
+
+ -
UnknownStateStoreException
: If the specified store name does not exist in the topology, an UnknownStateStoreException
will be thrown instead of the former InvalidStateStoreException
.
+ -
StreamsNotStartedException
: If Streams state is CREATED
, a StreamsNotStartedException
will be thrown.
+
+
+ See KIP-216 for more information.
We deprecated the StreamsConfig processing.guarantee
configuration value "exactly_once"
(for EOS version 1) in favor of the improved EOS version 2, formerly configured via
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b7eda15dc8e..e8a7fa2c1a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
@@ -343,10 +344,16 @@ public class KafkaStreams implements AutoCloseable {
}
private void validateIsRunningOrRebalancing() {
- if (!isRunningOrRebalancing()) {
- throw new IllegalStateException("KafkaStreams is not running. State is " + state + ".");
+ synchronized (stateLock) {
+ if (state == State.CREATED) {
+ throw new StreamsNotStartedException("KafkaStreams has not been started, you can retry after calling start()");
+ }
+ if (!isRunningOrRebalancing()) {
+ throw new IllegalStateException("KafkaStreams is not running. State is " + state + ".");
+ }
}
}
+
/**
* Listen to {@link State} change events.
*/
@@ -1516,6 +1523,8 @@ public class KafkaStreams implements AutoCloseable {
*
* @param storeQueryParameters the parameters used to fetch a queryable store
* @return A facade wrapping the local {@link StateStore} instances
+ * @throws StreamsNotStartedException If Streams state is {@link KafkaStreams.State#CREATED CREATED}. Just
+ * retry and wait until to {@link KafkaStreams.State#RUNNING RUNNING}.
* @throws UnknownStateStoreException If the specified store name does not exist in the topology.
* @throws InvalidStateStoreException If the Streams instance isn't in a queryable state.
* If the store's type does not match the QueryableStoreType,
@@ -1524,6 +1533,7 @@ public class KafkaStreams implements AutoCloseable {
* an InvalidStateStoreException is thrown upon store access.
*/
public T store(final StoreQueryParameters storeQueryParameters) {
+ validateIsRunningOrRebalancing();
final String storeName = storeQueryParameters.storeName();
if ((taskTopology == null || !taskTopology.hasStore(storeName))
&& (globalTaskTopology == null || !globalTaskTopology.hasStore(storeName))) {
@@ -1531,7 +1541,6 @@ public class KafkaStreams implements AutoCloseable {
"Cannot get state store " + storeName + " because no such store is registered in the topology."
);
}
- validateIsRunningOrRebalancing();
return queryableStoreProvider.getStore(storeQueryParameters);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 82e8d6b2425..63fa5e7e0c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
@@ -94,6 +95,7 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
@@ -116,6 +118,7 @@ public class KafkaStreamsTest {
private static final int NUM_THREADS = 2;
private final static String APPLICATION_ID = "appId";
private final static String CLIENT_ID = "test-client";
+ private final static Duration DEFAULT_DURATION = Duration.ofSeconds(30);
@Rule
public TestName testName = new TestName();
@@ -717,21 +720,39 @@ public class KafkaStreamsTest {
}
@Test
- public void shouldNotGetAllTasksWhenNotRunning() {
- final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
- assertThrows(IllegalStateException.class, streams::allMetadata);
+ public void shouldNotGetAllTasksWhenNotRunning() throws InterruptedException {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ assertThrows(StreamsNotStartedException.class, streams::allMetadata);
+ streams.start();
+ waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
+ streams.close();
+ waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+ assertThrows(IllegalStateException.class, streams::allMetadata);
+ }
}
@Test
- public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
- final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
- assertThrows(IllegalStateException.class, () -> streams.allMetadataForStore("store"));
+ public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws InterruptedException {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ assertThrows(StreamsNotStartedException.class, () -> streams.allMetadataForStore("store"));
+ streams.start();
+ waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
+ streams.close();
+ waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+ assertThrows(IllegalStateException.class, () -> streams.allMetadataForStore("store"));
+ }
}
@Test
- public void shouldNotGetQueryMetadataWithSerializerWhenNotRunningOrRebalancing() {
- final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
- assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", Serdes.String().serializer()));
+ public void shouldNotGetQueryMetadataWithSerializerWhenNotRunningOrRebalancing() throws InterruptedException {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ assertThrows(StreamsNotStartedException.class, () -> streams.queryMetadataForKey("store", "key", Serdes.String().serializer()));
+ streams.start();
+ waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
+ streams.close();
+ waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+ assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", Serdes.String().serializer()));
+ }
}
@Test
@@ -742,19 +763,38 @@ public class KafkaStreamsTest {
}
@Test
- public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing() {
- final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
- assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0));
+ public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing() throws InterruptedException {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ assertThrows(StreamsNotStartedException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0));
+ streams.start();
+ waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
+ streams.close();
+ waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+ assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0));
+ }
}
@Test
- public void shouldThrowUnknownStateStoreExceptionWhenStoreNotExist() {
+ public void shouldThrowUnknownStateStoreExceptionWhenStoreNotExist() throws InterruptedException {
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.start();
+ waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
assertThrows(UnknownStateStoreException.class, () -> streams.store(StoreQueryParameters.fromNameAndType("unknown-store", keyValueStore())));
}
}
+ @Test
+ public void shouldNotGetStoreWhenWhenNotRunningOrRebalancing() throws InterruptedException {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ assertThrows(StreamsNotStartedException.class, () -> streams.store(StoreQueryParameters.fromNameAndType("store", keyValueStore())));
+ streams.start();
+ waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
+ streams.close();
+ waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+ assertThrows(IllegalStateException.class, () -> streams.store(StoreQueryParameters.fromNameAndType("store", keyValueStore())));
+ }
+ }
+
@Test
public void shouldReturnEmptyLocalStorePartitionLags() {
// Mock all calls made to compute the offset lags,