allMetadata() {
+ validateIsRunningOrRebalancing();
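+ // Convert the internal metadata to the deprecated org.apache.kafka.streams.state.StreamsMetadata type expected by this deprecated API.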
+ return streamsMetadataState.getAllMetadata().stream().map(streamsMetadata ->
+ new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
+ streamsMetadata.stateStoreNames(),
+ streamsMetadata.topicPartitions(),
+ streamsMetadata.standbyStateStoreNames(),
+ streamsMetadata.standbyTopicPartitions()))
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Find all currently running {@code KafkaStreams} instances (potentially remotely) that use the same
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to
+ * the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance.
+ *
+ * Note: this is a point-in-time view, and it may change due to partition reassignment.
+ *
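+ * For example, to list every client of this application (an illustrative sketch; {@code streams} is
+ * assumed to be a started {@code KafkaStreams} instance):
+ * <pre>{@code
+ * for (final StreamsMetadata metadata : streams.metadataForAllStreamsClients()) {
+ *     System.out.println(metadata.host() + ":" + metadata.port() + " hosts " + metadata.stateStoreNames());
+ * }
+ * }</pre>
+ *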
+ * @return {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application
+ */
+ public Collection<StreamsMetadata> metadataForAllStreamsClients() {
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadata();
}
@@ -1469,8 +1489,36 @@ public class KafkaStreams implements AutoCloseable {
* @param storeName the {@code storeName} to find metadata for
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instance with the provided {@code storeName} of
* this application
+ * @deprecated since 3.0.0 use {@link KafkaStreams#streamsMetadataForStore} instead
*/
- public Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
+ @Deprecated
+ public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadataForStore(final String storeName) {
+ validateIsRunningOrRebalancing();
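+ // As above, re-wrap each entry in the deprecated org.apache.kafka.streams.state.StreamsMetadata type.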
+ return streamsMetadataState.getAllMetadataForStore(storeName).stream().map(streamsMetadata ->
+ new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
+ streamsMetadata.stateStoreNames(),
+ streamsMetadata.topicPartitions(),
+ streamsMetadata.standbyStateStoreNames(),
+ streamsMetadata.standbyTopicPartitions()))
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Find all currently running {@code KafkaStreams} instances (potentially remotely) that
+ * <ul>
+ *   <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all
+ *       instances that belong to the same Kafka Streams application)</li>
+ *   <li>and that contain a {@link StateStore} with the given {@code storeName}</li>
+ * </ul>
+ *
+ * and return {@link StreamsMetadata} for each discovered instance.
+ *
+ * Note: this is a point-in-time view, and it may change due to partition reassignment.
+ *
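+ * For example, to discover which clients can serve queries for a given store (an illustrative sketch;
+ * {@code streams} is assumed to be a started {@code KafkaStreams} instance):
+ * <pre>{@code
+ * for (final StreamsMetadata metadata : streams.streamsMetadataForStore("my-store")) {
+ *     // route queries for non-local keys to metadata.host():metadata.port()
+ * }
+ * }</pre>
+ *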
+ * @param storeName the {@code storeName} to find metadata for
+ * @return {@link StreamsMetadata} for each {@code KafkaStreams} instance with the provided {@code storeName} of
+ * this application
+ */
+ public Collection<StreamsMetadata> streamsMetadataForStore(final String storeName) {
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadataForStore(storeName);
}
@@ -1549,12 +1597,45 @@ public class KafkaStreams implements AutoCloseable {
for (final StreamThread thread : copy) consumer.accept(thread);
}
+ /**
+ * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
+ *
+ * @return the set of {@link org.apache.kafka.streams.processor.ThreadMetadata}.
+ * @deprecated since 3.0.0 use {@link #metadataForLocalThreads()} instead
+ */
+ @Deprecated
+ @SuppressWarnings("deprecation")
+ public Set<org.apache.kafka.streams.processor.ThreadMetadata> localThreadsMetadata() {
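+ // Map the new ThreadMetadata/TaskMetadata interfaces back to the deprecated processor-package classes.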
+ return metadataForLocalThreads().stream().map(threadMetadata -> new org.apache.kafka.streams.processor.ThreadMetadata(
+ threadMetadata.threadName(),
+ threadMetadata.threadState(),
+ threadMetadata.consumerClientId(),
+ threadMetadata.restoreConsumerClientId(),
+ threadMetadata.producerClientIds(),
+ threadMetadata.adminClientId(),
+ threadMetadata.activeTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
+ taskMetadata.taskId().toString(),
+ taskMetadata.topicPartitions(),
+ taskMetadata.committedOffsets(),
+ taskMetadata.endOffsets(),
+ taskMetadata.timeCurrentIdlingStarted())
+ ).collect(Collectors.toSet()),
+ threadMetadata.standbyTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
+ taskMetadata.taskId().toString(),
+ taskMetadata.topicPartitions(),
+ taskMetadata.committedOffsets(),
+ taskMetadata.endOffsets(),
+ taskMetadata.timeCurrentIdlingStarted())
+ ).collect(Collectors.toSet())))
+ .collect(Collectors.toSet());
+ }
+
/**
* Returns runtime information about the local threads of this {@link KafkaStreams} instance.
*
* @return the set of {@link ThreadMetadata}.
*/
- public Set<ThreadMetadata> localThreadsMetadata() {
+ public Set<ThreadMetadata> metadataForLocalThreads() {
final Set<ThreadMetadata> threadMetadata = new HashSet<>();
processStreamThread(thread -> {
synchronized (thread.getStateLock()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index f270d3e14a3..f10dc9356f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -236,7 +236,7 @@ public class StreamsBuilder {
* K key = "some-key";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* @param topic the topic name; cannot be {@code null}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java b/streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
new file mode 100644
index 00000000000..11c4941b3d6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Set;
+
+/**
+ * Metadata of a Kafka Streams client.
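+ *
+ * Instances are obtained, e.g., from {@link KafkaStreams#metadataForAllStreamsClients()}. A purely
+ * illustrative sketch ({@code streams} and the store name are assumptions):
+ * <pre>{@code
+ * for (final StreamsMetadata metadata : streams.metadataForAllStreamsClients()) {
+ *     if (metadata.stateStoreNames().contains("my-store")) {
+ *         // forward the query to metadata.host():metadata.port()
+ *     }
+ * }
+ * }</pre>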
+ */
+public interface StreamsMetadata {
+
+ /**
+ * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the Streams
+ * client.
+ *
+ * @return {@link HostInfo} corresponding to the Streams client
+ */
+ HostInfo hostInfo();
+
+ /**
+ * Names of the state stores assigned to active tasks of the Streams client.
+ *
+ * @return names of the state stores assigned to active tasks
+ */
+ Set<String> stateStoreNames();
+
+ /**
+ * Source topic partitions of the active tasks of the Streams client.
+ *
+ * @return source topic partitions of the active tasks
+ */
+ Set<TopicPartition> topicPartitions();
+
+ /**
+ * Changelog topic partitions for the state stores that the standby tasks of the Streams client replicate.
+ *
+ * @return set of changelog topic partitions of the standby tasks
+ */
+ Set<TopicPartition> standbyTopicPartitions();
+
+ /**
+ * Names of the state stores assigned to standby tasks of the Streams client.
+ *
+ * @return names of the state stores assigned to standby tasks
+ */
+ Set<String> standbyStateStoreNames();
+
+ /**
+ * Host where the Streams client runs.
+ *
+ * This method is equivalent to {@code StreamsMetadata.hostInfo().host()}.
+ *
+ * @return the host where the Streams client runs
+ */
+ String host();
+
+ /**
+ * Port on which the Streams client listens.
+ *
+ * This method is equivalent to {@code StreamsMetadata.hostInfo().port()}.
+ *
+ * @return the port on which the Streams client listens
+ */
+ int port();
+
+ /**
+ * Compares the specified object with this StreamsMetadata. Returns {@code true} if and only if the specified object is
+ * also a StreamsMetadata, their {@code hostInfo()} are equal, and {@code stateStoreNames()}, {@code topicPartitions()},
+ * {@code standbyStateStoreNames()}, and {@code standbyTopicPartitions()} contain the same elements.
+ *
+ * @return {@code true} if this object is the same as the obj argument; {@code false} otherwise.
+ */
+ boolean equals(Object o);
+
+ /**
+ * Returns the hash code value for this StreamsMetadata. The hash code is defined to be the result of the following
+ * calculation:
+ * <pre>{@code
+ * Objects.hash(hostInfo(), stateStoreNames(), topicPartitions(), standbyStateStoreNames(), standbyTopicPartitions());
+ * }</pre>
+ *
+ * @return a hash code value for this object.
+ */
+ int hashCode();
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java b/streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java
new file mode 100644
index 00000000000..0ef742930f9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+
+/**
+ * Metadata of a task.
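+ *
+ * The committed and end offsets can be combined into a rough per-partition lag estimate, for example
+ * (a purely illustrative sketch; {@code taskMetadata} is an assumed instance of this interface):
+ * <pre>{@code
+ * taskMetadata.endOffsets().forEach((partition, end) -> {
+ *     final Long committed = taskMetadata.committedOffsets().get(partition);
+ *     final long lag = committed == null ? end : end - committed;
+ *     System.out.println(partition + " lag: " + lag);
+ * });
+ * }</pre>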
+ */
+public interface TaskMetadata {
+
+ /**
+ * Task ID of the task.
+ *
+ * @return task ID consisting of subtopology and partition ID
+ */
+ TaskId taskId();
+
+ /**
+ * Source topic partitions of the task.
+ *
+ * @return source topic partitions
+ */
+ Set<TopicPartition> topicPartitions();
+
+ /**
+ * Offsets of the source topic partitions committed so far by the task.
+ *
+ * @return map from source topic partitions to committed offsets
+ */
+ Map<TopicPartition, Long> committedOffsets();
+
+ /**
+ * End offsets of the source topic partitions of the task.
+ *
+ * @return map from source topic partitions to end offsets
+ */
+ Map<TopicPartition, Long> endOffsets();
+
+ /**
+ * Time when the task started idling. If the task is not currently idling, an empty {@code Optional} is returned.
+ *
+ * @return time when task idling started, empty {@code Optional} if the task is currently not idling
+ */
+ Optional<Long> timeCurrentIdlingStarted();
+
+ /**
+ * Compares the specified object with this TaskMetadata. Returns {@code true} if and only if the specified object is
+ * also a TaskMetadata and both {@code taskId()} and {@code topicPartitions()} are equal.
+ *
+ * @return {@code true} if this object is the same as the obj argument; {@code false} otherwise.
+ */
+ boolean equals(final Object o);
+
+ /**
+ * Returns the hash code value for this TaskMetadata. The hash code is defined to be the result of the following
+ * calculation:
+ * <pre>{@code
+ * Objects.hash(taskId(), topicPartitions());
+ * }</pre>
+ *
+ * @return a hash code value for this object.
+ */
+ int hashCode();
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java b/streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
new file mode 100644
index 00000000000..4b84070b72d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Metadata of a stream thread.
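+ *
+ * Instances are obtained, e.g., from {@link KafkaStreams#metadataForLocalThreads()}. A purely
+ * illustrative sketch ({@code streams} is an assumed running instance):
+ * <pre>{@code
+ * for (final ThreadMetadata thread : streams.metadataForLocalThreads()) {
+ *     System.out.println(thread.threadName() + " (" + thread.threadState() + "): "
+ *         + thread.activeTasks().size() + " active / " + thread.standbyTasks().size() + " standby tasks");
+ * }
+ * }</pre>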
+ */
+public interface ThreadMetadata {
+
+ /**
+ * State of the stream thread.
+ *
+ * @return the state
+ */
+ String threadState();
+
+ /**
+ * Name of the stream thread.
+ *
+ * @return the name
+ */
+ String threadName();
+
+ /**
+ * Metadata of the active tasks assigned to the stream thread.
+ *
+ * @return metadata of the active tasks
+ */
+ Set<TaskMetadata> activeTasks();
+
+ /**
+ * Metadata of the standby tasks assigned to the stream thread.
+ *
+ * @return metadata of the standby tasks
+ */
+ Set<TaskMetadata> standbyTasks();
+
+ /**
+ * Client ID of the Kafka consumer used by the stream thread.
+ *
+ * @return client ID of the Kafka consumer
+ */
+ String consumerClientId();
+
+ /**
+ * Client ID of the restore Kafka consumer used by the stream thread.
+ *
+ * @return client ID of the restore Kafka consumer
+ */
+ String restoreConsumerClientId();
+
+ /**
+ * Client IDs of the Kafka producers used by the stream thread.
+ *
+ * @return client IDs of the Kafka producers
+ */
+ Set<String> producerClientIds();
+
+ /**
+ * Client ID of the admin client used by the stream thread.
+ *
+ * @return client ID of the admin client
+ */
+ String adminClientId();
+
+ /**
+ * Compares the specified object with this ThreadMetadata. Returns {@code true} if and only if the specified object is
+ * also a ThreadMetadata, {@code threadName()}, {@code threadState()}, {@code consumerClientId()},
+ * {@code restoreConsumerClientId()}, and {@code adminClientId()} are equal, and {@code activeTasks()},
+ * {@code standbyTasks()}, and {@code producerClientIds()} contain the same elements.
+ *
+ * @return {@code true} if this object is the same as the obj argument; {@code false} otherwise.
+ */
+ boolean equals(Object o);
+
+ /**
+ * Returns the hash code value for this ThreadMetadata. The hash code is defined to be the result of the following
+ * calculation:
+ * <pre>{@code
+ * Objects.hash(
+ *     threadName(),
+ *     threadState(),
+ *     activeTasks(),
+ *     standbyTasks(),
+ *     consumerClientId(),
+ *     restoreConsumerClientId(),
+ *     producerClientIds(),
+ *     adminClientId());
+ * }</pre>
+ *
+ * @return a hash code value for this object.
+ */
+ int hashCode();
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStorePartitionException.java b/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStorePartitionException.java
index 60c68b9ea2c..e85a0375c42 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStorePartitionException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStorePartitionException.java
@@ -21,7 +21,7 @@ import org.apache.kafka.streams.KafkaStreams;
/**
* Indicates that the specific state store being queried via
* {@link org.apache.kafka.streams.StoreQueryParameters} used a partitioning that is not assigned to this instance.
- * You can use {@link KafkaStreams#allMetadata()} to discover the correct instance that hosts the requested partition.
+ * You can use {@link KafkaStreams#metadataForAllStreamsClients()} to discover the correct instance that hosts the requested partition.
*/
public class InvalidStateStorePartitionException extends InvalidStateStoreException {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
index 2fcb5d89c4d..051396fbc94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
@@ -90,7 +90,7 @@ public interface CogroupedKStream<K, VOut> {
* K key = "some-key";
* ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to query
* the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by
@@ -140,7 +140,7 @@ public interface CogroupedKStream<K, VOut> {
* K key = "some-key";
* ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to query
* the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by
@@ -191,7 +191,7 @@ public interface CogroupedKStream<K, VOut> {
* K key = "some-key";
* ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to query
* the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore} -- regardless of what
@@ -244,7 +244,7 @@ public interface CogroupedKStream<K, VOut> {
* K key = "some-key";
* ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to query
* the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore} -- regardless of what
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 49464690cea..072558cf6ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -124,7 +124,7 @@ public interface KGroupedStream<K, V> {
* K key = "some-word";
* ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
*
@@ -170,7 +170,7 @@ public interface KGroupedStream<K, V> {
* K key = "some-word";
* ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
*
@@ -274,7 +274,7 @@ public interface KGroupedStream<K, V> {
* K key = "some-key";
* ValueAndTimestamp<V> reduceForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
*
@@ -338,7 +338,7 @@ public interface KGroupedStream<K, V> {
* K key = "some-key";
* ValueAndTimestamp<V> reduceForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
*
@@ -443,7 +443,7 @@ public interface KGroupedStream<K, V> {
* K key = "some-key";
* ValueAndTimestamp<VR> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
*
@@ -502,7 +502,7 @@ public interface KGroupedStream<K, V> {
* K key = "some-key";
* ValueAndTimestamp<VR> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 245ad4abce6..06d12e1d4ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -63,7 +63,7 @@ public interface KGroupedTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
*
@@ -106,7 +106,7 @@ public interface KGroupedTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
*
@@ -234,7 +234,7 @@ public interface KGroupedTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -307,7 +307,7 @@ public interface KGroupedTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -445,7 +445,7 @@ public interface KGroupedTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -529,7 +529,7 @@ public interface KGroupedTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 04e55858bbc..ad866145a3a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -138,7 +138,7 @@ public interface KTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
*
@@ -177,7 +177,7 @@ public interface KTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
*
@@ -263,7 +263,7 @@ public interface KTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
*
@@ -301,7 +301,7 @@ public interface KTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
*
@@ -466,7 +466,7 @@ public interface KTable<K, V> {
*
* To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
*
@@ -512,7 +512,7 @@ public interface KTable<K, V> {
*
* To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
*
@@ -560,7 +560,7 @@ public interface KTable<K, V> {
*
* To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
*
@@ -607,7 +607,7 @@ public interface KTable<K, V> {
*
* To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
index 12be765c3b4..b7e3b07e371 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
@@ -181,7 +181,7 @@ public interface SessionWindowedCogroupedKStream<K, VOut> {
* long toTime = ...;
* WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -238,7 +238,7 @@ public interface SessionWindowedCogroupedKStream<K, VOut> {
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index 79e1511269b..1b7a363d3c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -138,7 +138,7 @@ public interface SessionWindowedKStream<K, V> {
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -184,7 +184,7 @@ public interface SessionWindowedKStream<K, V> {
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -342,7 +342,7 @@ public interface SessionWindowedKStream<K, V> {
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -403,7 +403,7 @@ public interface SessionWindowedKStream<K, V> {
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -561,7 +561,7 @@ public interface SessionWindowedKStream<K, V> {
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -621,7 +621,7 @@ public interface SessionWindowedKStream<K, V> {
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
index 3d369bc8849..e4178bc9e3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
@@ -171,7 +171,7 @@ public interface TimeWindowedCogroupedKStream<K, VOut> {
* long toTime = ...;
* WindowStoreIterator<ValueAndTimestamp<VOut>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what
@@ -228,7 +228,7 @@ public interface TimeWindowedCogroupedKStream<K, VOut> {
* long toTime = ...;
* WindowStoreIterator<ValueAndTimestamp<VOut>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 38383a716c8..c015e79032a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -141,7 +141,7 @@ public interface TimeWindowedKStream<K, V> {
* long toTime = ...;
* WindowStoreIterator<ValueAndTimestamp<Long>> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what
@@ -190,7 +190,7 @@ public interface TimeWindowedKStream<K, V> {
* long toTime = ...;
* WindowStoreIterator<ValueAndTimestamp<Long>> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what
@@ -341,7 +341,7 @@ public interface TimeWindowedKStream<K, V> {
* long toTime = ...;
* WindowStoreIterator<ValueAndTimestamp<VR>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what
@@ -402,7 +402,7 @@ public interface TimeWindowedKStream<K, V> {
* long toTime = ...;
* WindowStoreIterator<ValueAndTimestamp<VR>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what
@@ -562,7 +562,7 @@ public interface TimeWindowedKStream<K, V> {
* long toTime = ...;
* WindowStoreIterator<ValueAndTimestamp<V>> reduceStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what
@@ -625,7 +625,7 @@ public interface TimeWindowedKStream<K, V> {
* long toTime = ...;
* WindowStoreIterator<ValueAndTimestamp<V>> reduceStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
index f5a5a695bfd..b9f5d915abc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
@@ -27,10 +27,12 @@ import java.util.Set;
/**
* Represents the state of a single task running within a {@link KafkaStreams} application.
+ * @deprecated since 3.0, use {@link org.apache.kafka.streams.TaskMetadata} instead.
*/
+@Deprecated
public class TaskMetadata {
- private final TaskId taskId;
+ private final String taskId;
private final Set<TopicPartition> topicPartitions;
@@ -40,24 +42,11 @@ public class TaskMetadata {
private final Optional<Long> timeCurrentIdlingStarted;
- /**
- * @deprecated since 3.0, not intended for public use
- */
- @Deprecated
public TaskMetadata(final String taskId,
final Set<TopicPartition> topicPartitions,
final Map<TopicPartition, Long> committedOffsets,
final Map<TopicPartition, Long> endOffsets,
final Optional<Long> timeCurrentIdlingStarted) {
- this(TaskId.parse(taskId), topicPartitions, committedOffsets, endOffsets, timeCurrentIdlingStarted);
- }
-
- // For internal use -- not a public API
- public TaskMetadata(final TaskId taskId,
- final Set<TopicPartition> topicPartitions,
- final Map<TopicPartition, Long> committedOffsets,
- final Map<TopicPartition, Long> endOffsets,
- final Optional<Long> timeCurrentIdlingStarted) {
this.taskId = taskId;
this.topicPartitions = Collections.unmodifiableSet(topicPartitions);
this.committedOffsets = Collections.unmodifiableMap(committedOffsets);
@@ -68,17 +57,8 @@ public class TaskMetadata {
/**
* @return the basic task metadata such as subtopology and partition id
*/
- public TaskId getTaskId() {
- return taskId;
- }
-
- /**
- * @return the basic task metadata such as subtopology and partition id
- * @deprecated please use {@link #getTaskId()} instead.
- */
- @Deprecated
public String taskId() {
- return taskId.toString();
+ return taskId;
}
public Set<TopicPartition> topicPartitions() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ThreadMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/ThreadMetadata.java
index b1de881cc67..68ac6633b76 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ThreadMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ThreadMetadata.java
@@ -24,7 +24,9 @@ import java.util.Set;
/**
* Represents the state of a single thread running within a {@link KafkaStreams} application.
+ * @deprecated since 3.0 use {@link org.apache.kafka.streams.ThreadMetadata} instead
*/
+@Deprecated
public class ThreadMetadata {
private final String threadName;
@@ -51,8 +53,8 @@ public class ThreadMetadata {
final String restoreConsumerClientId,
final Set<String> producerClientIds,
final String adminClientId,
- final Set<TaskMetadata> activeTasks,
- final Set<TaskMetadata> standbyTasks) {
+ final Set<org.apache.kafka.streams.processor.TaskMetadata> activeTasks,
+ final Set<org.apache.kafka.streams.processor.TaskMetadata> standbyTasks) {
this.mainConsumerClientId = mainConsumerClientId;
this.restoreConsumerClientId = restoreConsumerClientId;
this.producerClientIds = producerClientIds;
@@ -71,11 +73,11 @@ public class ThreadMetadata {
return threadName;
}
- public Set<TaskMetadata> activeTasks() {
+ public Set<org.apache.kafka.streams.processor.TaskMetadata> activeTasks() {
return activeTasks;
}
- public Set<TaskMetadata> standbyTasks() {
+ public Set<org.apache.kafka.streams.processor.TaskMetadata> standbyTasks() {
return standbyTasks;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 62a3d408ce9..6c9cbeb115e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -34,14 +34,14 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TaskMetadata;
-import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -1118,7 +1118,7 @@ public class StreamThread extends Thread {
// package-private for testing only
StreamThread updateThreadMetadata(final String adminClientId) {
- threadMetadata = new ThreadMetadata(
+ threadMetadata = new ThreadMetadataImpl(
getName(),
state().name(),
getConsumerClientId(getName()),
@@ -1135,7 +1135,7 @@ public class StreamThread extends Thread {
final Map<TaskId, Task> standbyTasks) {
final Set<TaskMetadata> activeTasksMetadata = new HashSet<>();
for (final Map.Entry<TaskId, Task> task : activeTasks.entrySet()) {
- activeTasksMetadata.add(new TaskMetadata(
+ activeTasksMetadata.add(new TaskMetadataImpl(
task.getValue().id(),
task.getValue().inputPartitions(),
task.getValue().committedOffsets(),
@@ -1145,7 +1145,7 @@ public class StreamThread extends Thread {
}
final Set<TaskMetadata> standbyTasksMetadata = new HashSet<>();
for (final Map.Entry<TaskId, Task> task : standbyTasks.entrySet()) {
- standbyTasksMetadata.add(new TaskMetadata(
+ standbyTasksMetadata.add(new TaskMetadataImpl(
task.getValue().id(),
task.getValue().inputPartitions(),
task.getValue().committedOffsets(),
@@ -1155,7 +1155,7 @@ public class StreamThread extends Thread {
}
final String adminClientId = threadMetadata.adminClientId();
- threadMetadata = new ThreadMetadata(
+ threadMetadata = new ThreadMetadataImpl(
getName(),
state().name(),
getConsumerClientId(getName()),
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index 802be92c773..d8fdbde5aa5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -27,7 +27,8 @@ import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.HostInfo;
-import org.apache.kafka.streams.state.StreamsMetadata;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
import java.util.ArrayList;
import java.util.Collection;
@@ -92,7 +93,7 @@ public class StreamsMetadataState {
public Collection<StreamsMetadata> getAllMetadata() {
return Collections.unmodifiableList(allMetadata);
}
-
+
/**
* Find all of the {@link StreamsMetadata}s for a given storeName
*
@@ -229,11 +230,12 @@ public class StreamsMetadataState {
final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
if (activePartitionHostMap.isEmpty() && standbyPartitionHostMap.isEmpty()) {
allMetadata = Collections.emptyList();
- localMetadata.set(new StreamsMetadata(thisHost,
- Collections.emptySet(),
- Collections.emptySet(),
- Collections.emptySet(),
- Collections.emptySet()
+ localMetadata.set(new StreamsMetadataImpl(
+ thisHost,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet()
));
return;
}
@@ -258,11 +260,12 @@ public class StreamsMetadataState {
standbyStoresOnHost.addAll(getStoresOnHost(storeToSourceTopics, standbyPartitionsOnHost));
}
- final StreamsMetadata metadata = new StreamsMetadata(hostInfo,
- activeStoresOnHost,
- activePartitionsOnHost,
- standbyStoresOnHost,
- standbyPartitionsOnHost);
+ final StreamsMetadata metadata = new StreamsMetadataImpl(
+ hostInfo,
+ activeStoresOnHost,
+ activePartitionsOnHost,
+ standbyStoresOnHost,
+ standbyPartitionsOnHost);
rebuiltMetadata.add(metadata);
if (hostInfo.equals(thisHost)) {
localMetadata.set(metadata);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskMetadataImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskMetadataImpl.java
new file mode 100644
index 00000000000..95b14d0b0d6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskMetadataImpl.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+public class TaskMetadataImpl implements TaskMetadata {
+
+ private final TaskId taskId;
+
+ private final Set<TopicPartition> topicPartitions;
+
+ private final Map<TopicPartition, Long> committedOffsets;
+
+ private final Map<TopicPartition, Long> endOffsets;
+
+ private final Optional<Long> timeCurrentIdlingStarted;
+
+ public TaskMetadataImpl(final TaskId taskId,
+ final Set<TopicPartition> topicPartitions,
+ final Map<TopicPartition, Long> committedOffsets,
+ final Map<TopicPartition, Long> endOffsets,
+ final Optional<Long> timeCurrentIdlingStarted) {
+ this.taskId = taskId;
+ this.topicPartitions = Collections.unmodifiableSet(topicPartitions);
+ this.committedOffsets = Collections.unmodifiableMap(committedOffsets);
+ this.endOffsets = Collections.unmodifiableMap(endOffsets);
+ this.timeCurrentIdlingStarted = timeCurrentIdlingStarted;
+ }
+
+ @Override
+ public TaskId taskId() {
+ return taskId;
+ }
+
+ @Override
+ public Set<TopicPartition> topicPartitions() {
+ return topicPartitions;
+ }
+
+ @Override
+ public Map<TopicPartition, Long> committedOffsets() {
+ return committedOffsets;
+ }
+
+ @Override
+ public Map<TopicPartition, Long> endOffsets() {
+ return endOffsets;
+ }
+
+ @Override
+ public Optional<Long> timeCurrentIdlingStarted() {
+ return timeCurrentIdlingStarted;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final TaskMetadataImpl that = (TaskMetadataImpl) o;
+ return Objects.equals(taskId, that.taskId) &&
+ Objects.equals(topicPartitions, that.topicPartitions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(taskId, topicPartitions);
+ }
+
+ @Override
+ public String toString() {
+ return "TaskMetadata{" +
+ "taskId=" + taskId +
+ ", topicPartitions=" + topicPartitions +
+ ", committedOffsets=" + committedOffsets +
+ ", endOffsets=" + endOffsets +
+ ", timeCurrentIdlingStarted=" + timeCurrentIdlingStarted +
+ '}';
+ }
+}
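
Because TaskMetadata now exposes committed and end offsets, per-partition lag can be derived directly. A hedged sketch, assuming `task` is a org.apache.kafka.streams.TaskMetadata taken from a thread's activeTasks() and that both offset maps are populated for the partitions of interest:

// Sketch only: lag = end offset - committed offset, skipping partitions without data.
for (final TopicPartition partition : task.topicPartitions()) {
    final Long committed = task.committedOffsets().get(partition);
    final Long end = task.endOffsets().get(partition);
    if (committed != null && end != null) {
        System.out.println(task.taskId() + " " + partition + " lag=" + (end - committed));
    }
}

Note the design choice above: equals() and hashCode() intentionally consider only taskId and topicPartitions, so two snapshots of the same task compare equal even when their offsets or idling time differ; TaskMetadataImplTest below exercises exactly this.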
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java
new file mode 100644
index 00000000000..7a0188c382a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public class ThreadMetadataImpl implements ThreadMetadata {
+
+ private final String threadName;
+
+ private final String threadState;
+
+ private final Set<TaskMetadata> activeTasks;
+
+ private final Set<TaskMetadata> standbyTasks;
+
+ private final String mainConsumerClientId;
+
+ private final String restoreConsumerClientId;
+
+ private final Set<String> producerClientIds;
+
+ // the admin client is shared among all threads, so the client id is the same;
+ // we keep it at the thread level for users' convenience and for possible future extensions
+ private final String adminClientId;
+
+ public ThreadMetadataImpl(final String threadName,
+ final String threadState,
+ final String mainConsumerClientId,
+ final String restoreConsumerClientId,
+ final Set<String> producerClientIds,
+ final String adminClientId,
+ final Set<TaskMetadata> activeTasks,
+ final Set<TaskMetadata> standbyTasks) {
+ this.mainConsumerClientId = mainConsumerClientId;
+ this.restoreConsumerClientId = restoreConsumerClientId;
+ this.producerClientIds = Collections.unmodifiableSet(producerClientIds);
+ this.adminClientId = adminClientId;
+ this.threadName = threadName;
+ this.threadState = threadState;
+ this.activeTasks = Collections.unmodifiableSet(activeTasks);
+ this.standbyTasks = Collections.unmodifiableSet(standbyTasks);
+ }
+
+ public String threadState() {
+ return threadState;
+ }
+
+ public String threadName() {
+ return threadName;
+ }
+
+ public Set<TaskMetadata> activeTasks() {
+ return activeTasks;
+ }
+
+ public Set<TaskMetadata> standbyTasks() {
+ return standbyTasks;
+ }
+
+ public String consumerClientId() {
+ return mainConsumerClientId;
+ }
+
+ public String restoreConsumerClientId() {
+ return restoreConsumerClientId;
+ }
+
+ public Set<String> producerClientIds() {
+ return producerClientIds;
+ }
+
+ public String adminClientId() {
+ return adminClientId;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final ThreadMetadataImpl that = (ThreadMetadataImpl) o;
+ return Objects.equals(threadName, that.threadName) &&
+ Objects.equals(threadState, that.threadState) &&
+ Objects.equals(activeTasks, that.activeTasks) &&
+ Objects.equals(standbyTasks, that.standbyTasks) &&
+ mainConsumerClientId.equals(that.mainConsumerClientId) &&
+ restoreConsumerClientId.equals(that.restoreConsumerClientId) &&
+ Objects.equals(producerClientIds, that.producerClientIds) &&
+ adminClientId.equals(that.adminClientId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ threadName,
+ threadState,
+ activeTasks,
+ standbyTasks,
+ mainConsumerClientId,
+ restoreConsumerClientId,
+ producerClientIds,
+ adminClientId);
+ }
+
+ @Override
+ public String toString() {
+ return "ThreadMetadata{" +
+ "threadName=" + threadName +
+ ", threadState=" + threadState +
+ ", activeTasks=" + activeTasks +
+ ", standbyTasks=" + standbyTasks +
+ ", consumerClientId=" + mainConsumerClientId +
+ ", restoreConsumerClientId=" + restoreConsumerClientId +
+ ", producerClientIds=" + producerClientIds +
+ ", adminClientId=" + adminClientId +
+ '}';
+ }
+}
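
A hedged usage sketch of the accessors defined above; `streams` is assumed to be a started KafkaStreams instance so that metadataForLocalThreads() (introduced in this change) returns live data:

// Sketch only: print a one-line summary per local stream thread.
for (final ThreadMetadata thread : streams.metadataForLocalThreads()) {
    System.out.println(thread.threadName() + " [" + thread.threadState() + "]"
        + " consumer=" + thread.consumerClientId()
        + " producers=" + thread.producerClientIds()
        + " active=" + thread.activeTasks().size()
        + " standby=" + thread.standbyTasks().size());
}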
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
index 1af3649295d..c25f1840e52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
@@ -27,8 +27,8 @@ import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
/**
* Represents a user defined endpoint in a {@link org.apache.kafka.streams.KafkaStreams} application.
* Instances of this class can be obtained by calling one of:
- * {@link KafkaStreams#allMetadata()}
- * {@link KafkaStreams#allMetadataForStore(String)}
+ * {@link KafkaStreams#metadataForAllStreamsClients()}
+ * {@link KafkaStreams#streamsMetadataForStore(String)}
*
* The HostInfo is constructed during Partition Assignment
* see {@link StreamsPartitionAssignor}
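
To illustrate the renamed accessors, a small sketch that collects the user-defined endpoints of all instances; it assumes `streams` is running, every instance sets StreamsConfig.APPLICATION_SERVER_CONFIG (otherwise hostInfo() is not meaningful), and java.util.stream.Collectors is imported:

// Sketch only: gather the HostInfo endpoint of every instance of this application.
final Set<HostInfo> endpoints = streams.metadataForAllStreamsClients()
    .stream()
    .map(StreamsMetadata::hostInfo)
    .collect(Collectors.toSet());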
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
index 1715cf4d3e0..131d16fb917 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
@@ -29,7 +29,9 @@ import java.util.Set;
* APIs and services to connect to other instances, the Set of state stores available on
* the instance and the Set of {@link TopicPartition}s available on the instance.
* NOTE: This is a point in time view. It may change when rebalances happen.
+ * @deprecated since 3.0.0 use {@link org.apache.kafka.streams.StreamsMetadata}
*/
+@Deprecated
public class StreamsMetadata {
/**
* Sentinel to indicate that the StreamsMetadata is currently unavailable. This can occur during rebalance
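
For callers, the migration is mechanical: switch the import from the deprecated state-package class to the new org.apache.kafka.streams.StreamsMetadata interface and use the renamed method. A before/after sketch, assuming `streams` is a KafkaStreams instance and "store" is a hypothetical store name:

// Before (deprecated since 3.0.0):
//   import org.apache.kafka.streams.state.StreamsMetadata;
//   final Collection<StreamsMetadata> old = streams.allMetadataForStore("store");
// After:
//   import org.apache.kafka.streams.StreamsMetadata;
final Collection<StreamsMetadata> current = streams.streamsMetadataForStore("store");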
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamsMetadataImpl.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamsMetadataImpl.java
new file mode 100644
index 00000000000..6bd314ce6d0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamsMetadataImpl.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of an instance (process) in a {@link KafkaStreams} application.
+ * It contains the user supplied {@link HostInfo} that can be used by developers to build
+ * APIs and services to connect to other instances, the Set of state stores available on
+ * the instance and the Set of {@link TopicPartition}s available on the instance.
+ * NOTE: This is a point in time view. It may change when rebalances happen.
+ */
+public class StreamsMetadataImpl implements StreamsMetadata {
+ /**
+ * Sentinel to indicate that the StreamsMetadata is currently unavailable. This can occur during rebalance
+ * operations.
+ */
+ public final static StreamsMetadataImpl NOT_AVAILABLE = new StreamsMetadataImpl(
+ HostInfo.unavailable(),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet());
+
+ private final HostInfo hostInfo;
+
+ private final Set<String> stateStoreNames;
+
+ private final Set<TopicPartition> topicPartitions;
+
+ private final Set<String> standbyStateStoreNames;
+
+ private final Set<TopicPartition> standbyTopicPartitions;
+
+ public StreamsMetadataImpl(final HostInfo hostInfo,
+ final Set<String> stateStoreNames,
+ final Set<TopicPartition> topicPartitions,
+ final Set<String> standbyStateStoreNames,
+ final Set<TopicPartition> standbyTopicPartitions) {
+
+ this.hostInfo = hostInfo;
+ this.stateStoreNames = Collections.unmodifiableSet(stateStoreNames);
+ this.topicPartitions = Collections.unmodifiableSet(topicPartitions);
+ this.standbyTopicPartitions = Collections.unmodifiableSet(standbyTopicPartitions);
+ this.standbyStateStoreNames = Collections.unmodifiableSet(standbyStateStoreNames);
+ }
+
+ /**
+ * The value of {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
+ * instance, which is typically host/port
+ *
+ * @return {@link HostInfo} corresponding to the streams instance
+ */
+ @Override
+ public HostInfo hostInfo() {
+ return hostInfo;
+ }
+
+ /**
+ * State stores owned by the instance as an active replica
+ *
+ * @return set of active state store names
+ */
+ @Override
+ public Set<String> stateStoreNames() {
+ return stateStoreNames;
+ }
+
+ /**
+ * Topic partitions consumed by the instance as an active replica
+ *
+ * @return set of active topic partitions
+ */
+ @Override
+ public Set<TopicPartition> topicPartitions() {
+ return topicPartitions;
+ }
+
+ /**
+ * (Source) Topic partitions for which the instance acts as standby.
+ *
+ * @return set of standby topic partitions
+ */
+ @Override
+ public Set<TopicPartition> standbyTopicPartitions() {
+ return standbyTopicPartitions;
+ }
+
+ /**
+ * State stores owned by the instance as a standby replica
+ *
+ * @return set of standby state store names
+ */
+ @Override
+ public Set<String> standbyStateStoreNames() {
+ return standbyStateStoreNames;
+ }
+
+ @Override
+ public String host() {
+ return hostInfo.host();
+ }
+
+ @SuppressWarnings("unused")
+ @Override
+ public int port() {
+ return hostInfo.port();
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final StreamsMetadataImpl that = (StreamsMetadataImpl) o;
+ return Objects.equals(hostInfo, that.hostInfo)
+ && Objects.equals(stateStoreNames, that.stateStoreNames)
+ && Objects.equals(topicPartitions, that.topicPartitions)
+ && Objects.equals(standbyStateStoreNames, that.standbyStateStoreNames)
+ && Objects.equals(standbyTopicPartitions, that.standbyTopicPartitions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(hostInfo, stateStoreNames, topicPartitions, standbyStateStoreNames, standbyTopicPartitions);
+ }
+
+ @Override
+ public String toString() {
+ return "StreamsMetadata {" +
+ "hostInfo=" + hostInfo +
+ ", stateStoreNames=" + stateStoreNames +
+ ", topicPartitions=" + topicPartitions +
+ ", standbyStateStoreNames=" + standbyStateStoreNames +
+ ", standbyTopicPartitions=" + standbyTopicPartitions +
+ '}';
+ }
+}
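
Putting the active/standby distinction above to work, a hedged sketch that splits hosts by their role for one store; `streams` is assumed to be running and "word-count" is a hypothetical store name:

// Sketch only: report which hosts serve the store actively and which hold standbys.
for (final StreamsMetadata metadata : streams.metadataForAllStreamsClients()) {
    final String endpoint = metadata.host() + ":" + metadata.port();
    if (metadata.stateStoreNames().contains("word-count")) {
        System.out.println(endpoint + " hosts an active replica");
    }
    if (metadata.standbyStateStoreNames().contains("word-count")) {
        System.out.println(endpoint + " hosts a standby replica");
    }
}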
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index af6ad4aa3f8..a2d542ec052 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -42,7 +42,6 @@ import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
-import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
@@ -52,6 +51,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
+import org.apache.kafka.streams.processor.internals.ThreadMetadataImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -140,8 +140,6 @@ public class KafkaStreamsTest {
private GlobalStreamThread globalStreamThread;
@Mock
private Metrics metrics;
- @Mock
- private ThreadMetadata threadMetadata;
private StateListenerStub streamsStateListener;
private Capture<List<MetricsReporter>> metricsReportersCapture;
@@ -329,7 +327,7 @@ public class KafkaStreamsTest {
return null;
}).anyTimes();
EasyMock.expect(thread.getGroupInstanceID()).andStubReturn(Optional.empty());
- EasyMock.expect(thread.threadMetadata()).andReturn(new ThreadMetadata(
+ EasyMock.expect(thread.threadMetadata()).andReturn(new ThreadMetadataImpl(
"processId-StreamThread-" + threadId,
"DEAD",
"",
@@ -500,7 +498,7 @@ public class KafkaStreamsTest {
streams.threads.get(i).join();
}
waitForCondition(
- () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")),
+ () -> streams.metadataForLocalThreads().stream().allMatch(t -> t.threadState().equals("DEAD")),
"Streams never stopped"
);
streams.close();
@@ -619,7 +617,7 @@ public class KafkaStreamsTest {
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.start();
streamThreadOne.shutdown();
- final Set<ThreadMetadata> threads = streams.localThreadsMetadata();
+ final Set<ThreadMetadata> threads = streams.metadataForLocalThreads();
assertThat(threads.size(), equalTo(1));
assertThat(threads, hasItem(streamThreadTwo.threadMetadata()));
}
@@ -756,24 +754,24 @@ public class KafkaStreamsTest {
@Test
public void shouldNotGetAllTasksWhenNotRunning() throws InterruptedException {
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
- assertThrows(StreamsNotStartedException.class, streams::allMetadata);
+ assertThrows(StreamsNotStartedException.class, streams::metadataForAllStreamsClients);
streams.start();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
streams.close();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
- assertThrows(IllegalStateException.class, streams::allMetadata);
+ assertThrows(IllegalStateException.class, streams::metadataForAllStreamsClients);
}
}
@Test
public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws InterruptedException {
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
- assertThrows(StreamsNotStartedException.class, () -> streams.allMetadataForStore("store"));
+ assertThrows(StreamsNotStartedException.class, () -> streams.streamsMetadataForStore("store"));
streams.start();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
streams.close();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
- assertThrows(IllegalStateException.class, () -> streams.allMetadataForStore("store"));
+ assertThrows(IllegalStateException.class, () -> streams.streamsMetadataForStore("store"));
}
}
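
The two tests above pin down the lifecycle contract of the renamed accessors: StreamsNotStartedException before start(), IllegalStateException once the instance is closed. A defensive-caller sketch, assuming `streams` may be in any state:

// Sketch only: query metadata only while the instance can serve it.
final KafkaStreams.State state = streams.state();
if (state == KafkaStreams.State.RUNNING || state == KafkaStreams.State.REBALANCING) {
    streams.metadataForAllStreamsClients().forEach(System.out::println);
}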
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index b6393112d79..26edd69a1c1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -30,7 +31,6 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
@@ -164,22 +164,22 @@ public class AdjustStreamThreadCountTest {
addStreamStateChangeListener(kafkaStreams);
startStreamsAndWaitForRunning(kafkaStreams);
- final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
- assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+ final int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
+ assertThat(kafkaStreams.metadataForLocalThreads().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
stateTransitionHistory.clear();
final Optional<String> name = kafkaStreams.addStreamThread();
assertThat(name, not(Optional.empty()));
TestUtils.waitForCondition(
- () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+ () -> kafkaStreams.metadataForLocalThreads().stream().sequential()
.map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
"Wait for the thread to be added"
);
- assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+ assertThat(kafkaStreams.metadataForLocalThreads().size(), equalTo(oldThreadCount + 1));
assertThat(
kafkaStreams
- .localThreadsMetadata()
+ .metadataForLocalThreads()
.stream()
.map(t -> t.threadName().split("-StreamThread-")[1])
.sorted().toArray(),
@@ -196,10 +196,10 @@ public class AdjustStreamThreadCountTest {
addStreamStateChangeListener(kafkaStreams);
startStreamsAndWaitForRunning(kafkaStreams);
- final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+ final int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
stateTransitionHistory.clear();
assertThat(kafkaStreams.removeStreamThread().get().split("-")[0], equalTo(appId));
- assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+ assertThat(kafkaStreams.metadataForLocalThreads().size(), equalTo(oldThreadCount - 1));
waitForTransitionFromRebalancingToRunning();
}
@@ -212,10 +212,10 @@ public class AdjustStreamThreadCountTest {
addStreamStateChangeListener(kafkaStreams);
startStreamsAndWaitForRunning(kafkaStreams);
- final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+ final int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
stateTransitionHistory.clear();
assertThat(kafkaStreams.removeStreamThread().get().split("-")[0], equalTo(appId));
- assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+ assertThat(kafkaStreams.metadataForLocalThreads().size(), equalTo(oldThreadCount - 1));
waitForTransitionFromRebalancingToRunning();
}
@@ -236,7 +236,7 @@ public class AdjustStreamThreadCountTest {
addStreamStateChangeListener(kafkaStreams);
startStreamsAndWaitForRunning(kafkaStreams);
- final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+ final int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
stateTransitionHistory.clear();
final CountDownLatch latch = new CountDownLatch(2);
@@ -245,7 +245,7 @@ public class AdjustStreamThreadCountTest {
two.start();
one.start();
latch.await(30, TimeUnit.SECONDS);
- assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount));
+ assertThat(kafkaStreams.metadataForLocalThreads().size(), equalTo(oldThreadCount));
waitForTransitionFromRebalancingToRunning();
}
@@ -267,11 +267,11 @@ public class AdjustStreamThreadCountTest {
addStreamStateChangeListener(kafkaStreams);
startStreamsAndWaitForRunning(kafkaStreams);
- int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+ int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
stateTransitionHistory.clear();
assertThat(
- kafkaStreams.localThreadsMetadata()
+ kafkaStreams.metadataForLocalThreads()
.stream()
.map(t -> t.threadName().split("-StreamThread-")[1])
.sorted()
@@ -284,16 +284,16 @@ public class AdjustStreamThreadCountTest {
assertThat("New thread has index 3", "3".equals(name.get().split("-StreamThread-")[1]));
TestUtils.waitForCondition(
() -> kafkaStreams
- .localThreadsMetadata()
+ .metadataForLocalThreads()
.stream().sequential()
.map(ThreadMetadata::threadName)
.anyMatch(t -> t.equals(name.get())),
"Stream thread has not been added"
);
- assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+ assertThat(kafkaStreams.metadataForLocalThreads().size(), equalTo(oldThreadCount + 1));
assertThat(
kafkaStreams
- .localThreadsMetadata()
+ .metadataForLocalThreads()
.stream()
.map(t -> t.threadName().split("-StreamThread-")[1])
.sorted()
@@ -302,13 +302,13 @@ public class AdjustStreamThreadCountTest {
);
waitForTransitionFromRebalancingToRunning();
- oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+ oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
stateTransitionHistory.clear();
final Optional<String> removedThread = kafkaStreams.removeStreamThread();
assertThat(removedThread, not(Optional.empty()));
- assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+ assertThat(kafkaStreams.metadataForLocalThreads().size(), equalTo(oldThreadCount - 1));
waitForTransitionFromRebalancingToRunning();
stateTransitionHistory.clear();
@@ -317,14 +317,14 @@ public class AdjustStreamThreadCountTest {
assertThat(name2, not(Optional.empty()));
TestUtils.waitForCondition(
- () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+ () -> kafkaStreams.metadataForLocalThreads().stream().sequential()
.map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name2.orElse(""))),
"Wait for the thread to be added"
);
- assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount));
+ assertThat(kafkaStreams.metadataForLocalThreads().size(), equalTo(oldThreadCount));
assertThat(
kafkaStreams
- .localThreadsMetadata()
+ .metadataForLocalThreads()
.stream()
.map(t -> t.threadName().split("-StreamThread-")[1])
.sorted()
@@ -342,7 +342,7 @@ public class AdjustStreamThreadCountTest {
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
addStreamStateChangeListener(kafkaStreams);
startStreamsAndWaitForRunning(kafkaStreams);
- final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+ final int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
final int threadCount = 5;
final int loop = 3;
final AtomicReference<Throwable> lastException = new AtomicReference<>();
@@ -353,7 +353,7 @@ public class AdjustStreamThreadCountTest {
for (int i = 0; i < loop + 1; i++) {
if (!kafkaStreams.addStreamThread().isPresent())
throw new RuntimeException("failed to create stream thread");
- kafkaStreams.localThreadsMetadata();
+ kafkaStreams.metadataForLocalThreads();
if (!kafkaStreams.removeStreamThread().isPresent())
throw new RuntimeException("failed to delete a stream thread");
}
@@ -365,7 +365,7 @@ public class AdjustStreamThreadCountTest {
executor.shutdown();
assertTrue(executor.awaitTermination(60, TimeUnit.SECONDS));
assertNull(lastException.get());
- assertEquals(oldThreadCount, kafkaStreams.localThreadsMetadata().size());
+ assertEquals(oldThreadCount, kafkaStreams.metadataForLocalThreads().size());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index e482798fd5c..343a1052a18 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -696,13 +696,13 @@ public class EosIntegrationTest {
// the assignment is. We only really care that the remaining instance only sees one host
// that owns both partitions.
waitForCondition(
- () -> stallingInstance.allMetadata().size() == 2
- && remainingInstance.allMetadata().size() == 1
- && remainingInstance.allMetadata().iterator().next().topicPartitions().size() == 2,
+ () -> stallingInstance.metadataForAllStreamsClients().size() == 2
+ && remainingInstance.metadataForAllStreamsClients().size() == 1
+ && remainingInstance.metadataForAllStreamsClients().iterator().next().topicPartitions().size() == 2,
MAX_WAIT_TIME_MS,
() -> "Should have rebalanced.\n" +
- "Streams1[" + streams1.allMetadata() + "]\n" +
- "Streams2[" + streams2.allMetadata() + "]");
+ "Streams1[" + streams1.metadataForAllStreamsClients() + "]\n" +
+ "Streams2[" + streams2.metadataForAllStreamsClients() + "]");
// expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
//
@@ -729,14 +729,14 @@ public class EosIntegrationTest {
// It doesn't really matter what the assignment is, but we might as well also assert that they
// both see both partitions assigned exactly once
waitForCondition(
- () -> streams1.allMetadata().size() == 2
- && streams2.allMetadata().size() == 2
- && streams1.allMetadata().stream().mapToLong(meta -> meta.topicPartitions().size()).sum() == 2
- && streams2.allMetadata().stream().mapToLong(meta -> meta.topicPartitions().size()).sum() == 2,
+ () -> streams1.metadataForAllStreamsClients().size() == 2
+ && streams2.metadataForAllStreamsClients().size() == 2
+ && streams1.metadataForAllStreamsClients().stream().mapToLong(meta -> meta.topicPartitions().size()).sum() == 2
+ && streams2.metadataForAllStreamsClients().stream().mapToLong(meta -> meta.topicPartitions().size()).sum() == 2,
MAX_WAIT_TIME_MS,
() -> "Should have rebalanced.\n" +
- "Streams1[" + streams1.allMetadata() + "]\n" +
- "Streams2[" + streams2.allMetadata() + "]");
+ "Streams1[" + streams1.metadataForAllStreamsClients() + "]\n" +
+ "Streams2[" + streams2.metadataForAllStreamsClients() + "]");
writeInputData(dataAfterSecondRebalance);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
index 8e05757053c..af952bda9bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
@@ -27,11 +27,11 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -214,13 +214,13 @@ public class KTableKTableForeignKeyJoinDistributedTest {
private void setStateListenersForVerification(final Predicate<ThreadMetadata> taskCondition) {
client1.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING &&
- client1.localThreadsMetadata().stream().allMatch(taskCondition)) {
+ client1.metadataForLocalThreads().stream().allMatch(taskCondition)) {
client1IsOk = true;
}
});
client2.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING &&
- client2.localThreadsMetadata().stream().allMatch(taskCondition)) {
+ client2.metadataForLocalThreads().stream().allMatch(taskCondition)) {
client2IsOk = true;
}
});
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index f924e08b85e..28eeeef26f6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -22,13 +22,13 @@ import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@@ -167,14 +167,14 @@ public class StandbyTaskCreationIntegrationTest {
private void setStateListenersForVerification(final Predicate<ThreadMetadata> taskCondition) {
client1.setStateListener((newState, oldState) -> {
if (newState == State.RUNNING &&
- client1.localThreadsMetadata().stream().allMatch(taskCondition)) {
+ client1.metadataForLocalThreads().stream().allMatch(taskCondition)) {
client1IsOk = true;
}
});
client2.setStateListener((newState, oldState) -> {
if (newState == State.RUNNING &&
- client2.localThreadsMetadata().stream().allMatch(taskCondition)) {
+ client2.metadataForLocalThreads().stream().allMatch(taskCondition)) {
client2IsOk = true;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index 4f2ba21d0d3..d3e4f8ca241 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -370,8 +370,8 @@ public class StoreQueryIntegrationTest {
startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
- assertTrue(kafkaStreams1.localThreadsMetadata().size() > 1);
- assertTrue(kafkaStreams2.localThreadsMetadata().size() > 1);
+ assertTrue(kafkaStreams1.metadataForLocalThreads().size() > 1);
+ assertTrue(kafkaStreams2.metadataForLocalThreads().size() > 1);
produceValueRange(key, 0, batch1NumMessages);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
index b5edece70c9..0a4536ece76 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
@@ -24,10 +24,10 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -158,7 +158,7 @@ public class TaskMetadataIntegrationTest {
}
private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
- final List<TaskMetadata> taskMetadataList = kafkaStreams.localThreadsMetadata().stream().flatMap(t -> t.activeTasks().stream()).collect(Collectors.toList());
+ final List<TaskMetadata> taskMetadataList = kafkaStreams.metadataForLocalThreads().stream().flatMap(t -> t.activeTasks().stream()).collect(Collectors.toList());
assertThat("only one task", taskMetadataList.size() == 1);
return taskMetadataList.get(0);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index b8e5d23eef9..deeb26d85af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -52,6 +52,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
@@ -63,8 +64,6 @@ import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TaskMetadata;
-import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -1115,6 +1114,7 @@ public class StreamThreadTest {
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
EasyMock.replay(consumerGroupMetadata);
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+ expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
taskManager.shutdown(true);
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);
@@ -1153,7 +1153,7 @@ public class StreamThreadTest {
@Test
public void shouldNotReturnDataAfterTaskMigrated() {
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
-
+ expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
final InternalTopologyBuilder internalTopologyBuilder = EasyMock.createNiceMock(InternalTopologyBuilder.class);
expect(internalTopologyBuilder.sourceTopicCollection()).andReturn(Collections.singletonList(topic1)).times(2);
@@ -1221,6 +1221,7 @@ public class StreamThreadTest {
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
EasyMock.replay(consumerGroupMetadata);
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+ expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
taskManager.shutdown(true);
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);
@@ -1258,6 +1259,7 @@ public class StreamThreadTest {
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
EasyMock.replay(consumerGroupMetadata);
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+ expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
taskManager.shutdown(true);
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);
@@ -1641,7 +1643,7 @@ public class StreamThreadTest {
final ThreadMetadata metadata = thread.threadMetadata();
assertEquals(StreamThread.State.RUNNING.name(), metadata.threadState());
- assertTrue(metadata.activeTasks().contains(new TaskMetadata(task1, Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty())));
+ assertTrue(metadata.activeTasks().contains(new TaskMetadataImpl(task1, Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty())));
assertTrue(metadata.standbyTasks().isEmpty());
assertTrue("#threadState() was: " + metadata.threadState() + "; expected either RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED",
@@ -1694,7 +1696,7 @@ public class StreamThreadTest {
final ThreadMetadata threadMetadata = thread.threadMetadata();
assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState());
- assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadata(task1, Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty())));
+ assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadataImpl(task1, Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty())));
assertTrue(threadMetadata.activeTasks().isEmpty());
thread.taskManager().shutdown(true);
@@ -1974,12 +1976,6 @@ public class StreamThreadTest {
assertEquals(StreamThread.State.RUNNING.name(), metadata.threadState());
}
- private void assertThreadMetadataHasEmptyTasksWithState(final ThreadMetadata metadata, final StreamThread.State state) {
- assertEquals(state.name(), metadata.threadState());
- assertTrue(metadata.activeTasks().isEmpty());
- assertTrue(metadata.standbyTasks().isEmpty());
- }
-
@Test
public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() throws Exception {
internalStreamsBuilder.stream(Collections.singleton("topic"), consumed)
@@ -2163,6 +2159,7 @@ public class StreamThreadTest {
final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+ expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
consumer.assign(assignedPartitions);
consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
@@ -2210,6 +2207,7 @@ public class StreamThreadTest {
final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+ expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
consumer.assign(assignedPartitions);
consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
@@ -2256,7 +2254,7 @@ public class StreamThreadTest {
@SuppressWarnings("unchecked")
public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath() {
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
-
+ expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
consumer.subscribe((Collection) anyObject(), anyObject());
@@ -2319,6 +2317,7 @@ public class StreamThreadTest {
@SuppressWarnings("unchecked")
public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHandler() {
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+ expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
@@ -2386,6 +2385,7 @@ public class StreamThreadTest {
@SuppressWarnings("unchecked")
public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath() {
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+ expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
@@ -2718,6 +2718,7 @@ public class StreamThreadTest {
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
EasyMock.replay(consumer, consumerGroupMetadata);
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+ expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
final StreamThread thread = new StreamThread(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index 640922c2e70..df52df37d4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -24,13 +24,14 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.HostInfo;
-import org.apache.kafka.streams.state.StreamsMetadata;
+import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
import org.junit.Before;
import org.junit.Test;
@@ -139,17 +140,17 @@ public class StreamsMetadataStateTest {
@Test
public void shouldGetAllStreamInstances() {
- final StreamsMetadata one = new StreamsMetadata(hostOne,
+ final StreamsMetadata one = new StreamsMetadataImpl(hostOne,
mkSet(globalTable, "table-one", "table-two", "merged-table"),
mkSet(topic1P0, topic2P1, topic4P0),
mkSet("table-one", "table-two", "merged-table"),
mkSet(topic2P0, topic1P1));
- final StreamsMetadata two = new StreamsMetadata(hostTwo,
+ final StreamsMetadata two = new StreamsMetadataImpl(hostTwo,
mkSet(globalTable, "table-two", "table-one", "merged-table"),
mkSet(topic2P0, topic1P1),
mkSet("table-three"),
mkSet(topic3P0));
- final StreamsMetadata three = new StreamsMetadata(hostThree,
+ final StreamsMetadata three = new StreamsMetadataImpl(hostThree,
mkSet(globalTable, "table-three"),
Collections.singleton(topic3P0),
mkSet("table-one", "table-two", "merged-table"),
@@ -173,7 +174,7 @@ public class StreamsMetadataStateTest {
metadataState.onChange(hostToActivePartitions, Collections.emptyMap(),
cluster.withPartitions(Collections.singletonMap(tp5, new PartitionInfo("topic-five", 1, null, null, null))));
- final StreamsMetadata expected = new StreamsMetadata(hostFour, Collections.singleton(globalTable),
+ final StreamsMetadata expected = new StreamsMetadataImpl(hostFour, Collections.singleton(globalTable),
Collections.singleton(tp5), Collections.emptySet(), Collections.emptySet());
final Collection<StreamsMetadata> actual = metadataState.getAllMetadata();
assertTrue("expected " + actual + " to contain " + expected, actual.contains(expected));
@@ -181,12 +182,12 @@ public class StreamsMetadataStateTest {
@Test
public void shouldGetInstancesForStoreName() {
- final StreamsMetadata one = new StreamsMetadata(hostOne,
+ final StreamsMetadata one = new StreamsMetadataImpl(hostOne,
mkSet(globalTable, "table-one", "table-two", "merged-table"),
mkSet(topic1P0, topic2P1, topic4P0),
mkSet("table-one", "table-two", "merged-table"),
mkSet(topic2P0, topic1P1));
- final StreamsMetadata two = new StreamsMetadata(hostTwo,
+ final StreamsMetadata two = new StreamsMetadataImpl(hostTwo,
mkSet(globalTable, "table-two", "table-one", "merged-table"),
mkSet(topic2P0, topic1P1),
mkSet("table-three"),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskMetadataImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskMetadataImplTest.java
new file mode 100644
index 00000000000..dfe5daffb2b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskMetadataImplTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+
+public class TaskMetadataImplTest {
+
+ public static final TaskId TASK_ID = new TaskId(1, 2);
+ public static final TopicPartition TP_0 = new TopicPartition("t", 0);
+ public static final TopicPartition TP_1 = new TopicPartition("t", 1);
+ public static final Set<TopicPartition> TOPIC_PARTITIONS = mkSet(TP_0, TP_1);
+ public static final Map<TopicPartition, Long> COMMITTED_OFFSETS = mkMap(mkEntry(TP_0, 1L), mkEntry(TP_1, 2L));
+ public static final Map<TopicPartition, Long> END_OFFSETS = mkMap(mkEntry(TP_0, 1L), mkEntry(TP_1, 3L));
+ public static final Optional<Long> TIME_CURRENT_IDLING_STARTED = Optional.of(3L);
+
+ private TaskMetadata taskMetadata;
+
+ @Before
+ public void setUp() {
+ taskMetadata = new TaskMetadataImpl(
+ TASK_ID,
+ TOPIC_PARTITIONS,
+ COMMITTED_OFFSETS,
+ END_OFFSETS,
+ TIME_CURRENT_IDLING_STARTED);
+ }
+
+ @Test
+ public void shouldNotAllowModificationOfInternalStateViaGetters() {
+ assertThat(isUnmodifiable(taskMetadata.topicPartitions()), is(true));
+ assertThat(isUnmodifiable(taskMetadata.committedOffsets()), is(true));
+ assertThat(isUnmodifiable(taskMetadata.endOffsets()), is(true));
+ }
+
+ @Test
+ public void shouldBeEqualsIfSameObject() {
+ final TaskMetadataImpl same = new TaskMetadataImpl(
+ TASK_ID,
+ TOPIC_PARTITIONS,
+ COMMITTED_OFFSETS,
+ END_OFFSETS,
+ TIME_CURRENT_IDLING_STARTED);
+ assertThat(taskMetadata, equalTo(same));
+ assertThat(taskMetadata.hashCode(), equalTo(same.hashCode()));
+ }
+
+ @Test
+ public void shouldBeEqualsIfOnlyDifferInCommittedOffsets() {
+ final TaskMetadataImpl stillSameDifferCommittedOffsets = new TaskMetadataImpl(
+ TASK_ID,
+ TOPIC_PARTITIONS,
+ mkMap(mkEntry(TP_0, 1000000L), mkEntry(TP_1, 2L)),
+ END_OFFSETS,
+ TIME_CURRENT_IDLING_STARTED);
+ assertThat(taskMetadata, equalTo(stillSameDifferCommittedOffsets));
+ assertThat(taskMetadata.hashCode(), equalTo(stillSameDifferCommittedOffsets.hashCode()));
+ }
+
+ @Test
+ public void shouldBeEqualsIfOnlyDifferInEndOffsets() {
+ final TaskMetadataImpl stillSameDifferEndOffsets = new TaskMetadataImpl(
+ TASK_ID,
+ TOPIC_PARTITIONS,
+ COMMITTED_OFFSETS,
+ mkMap(mkEntry(TP_0, 1000000L), mkEntry(TP_1, 2L)),
+ TIME_CURRENT_IDLING_STARTED);
+ assertThat(taskMetadata, equalTo(stillSameDifferEndOffsets));
+ assertThat(taskMetadata.hashCode(), equalTo(stillSameDifferEndOffsets.hashCode()));
+ }
+
+ @Test
+ public void shouldBeEqualsIfOnlyDifferInIdlingTime() {
+ final TaskMetadataImpl stillSameDifferIdlingTime = new TaskMetadataImpl(
+ TASK_ID,
+ TOPIC_PARTITIONS,
+ COMMITTED_OFFSETS,
+ END_OFFSETS,
+ Optional.empty());
+ assertThat(taskMetadata, equalTo(stillSameDifferIdlingTime));
+ assertThat(taskMetadata.hashCode(), equalTo(stillSameDifferIdlingTime.hashCode()));
+ }
+
+ @Test
+ public void shouldNotBeEqualsIfDifferInTaskID() {
+ final TaskMetadataImpl differTaskId = new TaskMetadataImpl(
+ new TaskId(1, 10000),
+ TOPIC_PARTITIONS,
+ COMMITTED_OFFSETS,
+ END_OFFSETS,
+ TIME_CURRENT_IDLING_STARTED);
+ assertThat(taskMetadata, not(equalTo(differTaskId)));
+ assertThat(taskMetadata.hashCode(), not(equalTo(differTaskId.hashCode())));
+ }
+
+ @Test
+ public void shouldNotBeEqualsIfDifferInTopicPartitions() {
+ final TaskMetadataImpl differTopicPartitions = new TaskMetadataImpl(
+ TASK_ID,
+ mkSet(TP_0),
+ COMMITTED_OFFSETS,
+ END_OFFSETS,
+ TIME_CURRENT_IDLING_STARTED);
+ assertThat(taskMetadata, not(equalTo(differTopicPartitions)));
+ assertThat(taskMetadata.hashCode(), not(equalTo(differTopicPartitions.hashCode())));
+ }
+
+ private static boolean isUnmodifiable(final Collection<?> collection) {
+ try {
+ collection.clear();
+ return false;
+ } catch (final UnsupportedOperationException e) {
+ return true;
+ }
+ }
+
+ private static boolean isUnmodifiable(final Map<?, ?> map) {
+ try {
+ map.clear();
+ return false;
+ } catch (final UnsupportedOperationException e) {
+ return true;
+ }
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImplTest.java
new file mode 100644
index 00000000000..b87f662c90e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImplTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+
+public class ThreadMetadataImplTest {
+
+ public static final String THREAD_NAME = "thread name";
+ public static final String THREAD_STATE = "thread state";
+ public static final String MAIN_CONSUMER_CLIENT_ID = "main Consumer ClientID";
+ public static final String RESTORE_CONSUMER_CLIENT_ID = "restore Consumer ClientID";
+ public static final String CLIENT_ID_1 = "client Id 1";
+ public static final String CLIENT_ID_2 = "client Id 2";
+ public static final Set<String> PRODUCER_CLIENT_IDS = mkSet(CLIENT_ID_1, CLIENT_ID_2);
+ public static final TaskId TASK_ID_0 = new TaskId(1, 2);
+ public static final TaskId TASK_ID_1 = new TaskId(1, 1);
+ public static final TopicPartition TP_0_0 = new TopicPartition("t", 0);
+ public static final TopicPartition TP_1_0 = new TopicPartition("t", 1);
+ public static final TopicPartition TP_0_1 = new TopicPartition("t", 2);
+ public static final TopicPartition TP_1_1 = new TopicPartition("t", 3);
+ public static final TaskMetadata TM_0 = new TaskMetadataImpl(
+ TASK_ID_0,
+ mkSet(TP_0_0, TP_1_0),
+ mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
+ mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
+ Optional.of(3L));
+ public static final TaskMetadata TM_1 = new TaskMetadataImpl(
+ TASK_ID_1,
+ mkSet(TP_0_1, TP_1_1),
+ mkMap(mkEntry(TP_0_1, 1L), mkEntry(TP_1_1, 2L)),
+ mkMap(mkEntry(TP_0_1, 1L), mkEntry(TP_1_1, 2L)),
+ Optional.of(3L));
+ public static final Set<TaskMetadata> STANDBY_TASKS = mkSet(TM_0, TM_1);
+ public static final Set<TaskMetadata> ACTIVE_TASKS = mkSet(TM_1);
+ public static final String ADMIN_CLIENT_ID = "admin ClientID";
+
+ private ThreadMetadata threadMetadata;
+
+ @Before
+ public void setUp() {
+ threadMetadata = new ThreadMetadataImpl(
+ THREAD_NAME,
+ THREAD_STATE,
+ MAIN_CONSUMER_CLIENT_ID,
+ RESTORE_CONSUMER_CLIENT_ID,
+ PRODUCER_CLIENT_IDS,
+ ADMIN_CLIENT_ID,
+ ACTIVE_TASKS,
+ STANDBY_TASKS
+ );
+ }
+
+ @Test
+ public void shouldNotAllowModificationOfInternalStateViaGetters() {
+ assertThat(isUnmodifiable(threadMetadata.producerClientIds()), is(true));
+ assertThat(isUnmodifiable(threadMetadata.activeTasks()), is(true));
+ assertThat(isUnmodifiable(threadMetadata.standbyTasks()), is(true));
+ }
+
+ @Test
+ public void shouldBeEqualIfSameObject() {
+ final ThreadMetadata same = new ThreadMetadataImpl(
+ THREAD_NAME,
+ THREAD_STATE,
+ MAIN_CONSUMER_CLIENT_ID,
+ RESTORE_CONSUMER_CLIENT_ID,
+ PRODUCER_CLIENT_IDS,
+ ADMIN_CLIENT_ID,
+ ACTIVE_TASKS,
+ STANDBY_TASKS
+ );
+ assertThat(threadMetadata, equalTo(same));
+ assertThat(threadMetadata.hashCode(), equalTo(same.hashCode()));
+ }
+
+ @Test
+ public void shouldNotBeEqualIfDifferInThreadName() {
+ final ThreadMetadata differThreadName = new ThreadMetadataImpl(
+ "different",
+ THREAD_STATE,
+ MAIN_CONSUMER_CLIENT_ID,
+ RESTORE_CONSUMER_CLIENT_ID,
+ PRODUCER_CLIENT_IDS,
+ ADMIN_CLIENT_ID,
+ ACTIVE_TASKS,
+ STANDBY_TASKS
+ );
+ assertThat(threadMetadata, not(equalTo(differThreadName)));
+ assertThat(threadMetadata.hashCode(), not(equalTo(differThreadName.hashCode())));
+ }
+
+ @Test
+ public void shouldNotBeEqualIfDifferInThreadState() {
+ final ThreadMetadata differThreadState = new ThreadMetadataImpl(
+ THREAD_NAME,
+ "different",
+ MAIN_CONSUMER_CLIENT_ID,
+ RESTORE_CONSUMER_CLIENT_ID,
+ PRODUCER_CLIENT_IDS,
+ ADMIN_CLIENT_ID,
+ ACTIVE_TASKS,
+ STANDBY_TASKS
+ );
+ assertThat(threadMetadata, not(equalTo(differThreadState)));
+ assertThat(threadMetadata.hashCode(), not(equalTo(differThreadState.hashCode())));
+ }
+
+ @Test
+ public void shouldNotBeEqualIfDifferInClientId() {
+ final ThreadMetadata differMainConsumerClientId = new ThreadMetadataImpl(
+ THREAD_NAME,
+ THREAD_STATE,
+ "different",
+ RESTORE_CONSUMER_CLIENT_ID,
+ PRODUCER_CLIENT_IDS,
+ ADMIN_CLIENT_ID,
+ ACTIVE_TASKS,
+ STANDBY_TASKS
+ );
+ assertThat(threadMetadata, not(equalTo(differMainConsumerClientId)));
+ assertThat(threadMetadata.hashCode(), not(equalTo(differMainConsumerClientId.hashCode())));
+ }
+
+ @Test
+ public void shouldNotBeEqualIfDifferInConsumerClientId() {
+ final ThreadMetadata differRestoreConsumerClientId = new ThreadMetadataImpl(
+ THREAD_NAME,
+ THREAD_STATE,
+ MAIN_CONSUMER_CLIENT_ID,
+ "different",
+ PRODUCER_CLIENT_IDS,
+ ADMIN_CLIENT_ID,
+ ACTIVE_TASKS,
+ STANDBY_TASKS
+ );
+ assertThat(threadMetadata, not(equalTo(differRestoreConsumerClientId)));
+ assertThat(threadMetadata.hashCode(), not(equalTo(differRestoreConsumerClientId.hashCode())));
+ }
+
+ @Test
+ public void shouldNotBeEqualIfDifferInProducerClientIds() {
+ final ThreadMetadata differProducerClientIds = new ThreadMetadataImpl(
+ THREAD_NAME,
+ THREAD_STATE,
+ MAIN_CONSUMER_CLIENT_ID,
+ RESTORE_CONSUMER_CLIENT_ID,
+ mkSet(CLIENT_ID_1),
+ ADMIN_CLIENT_ID,
+ ACTIVE_TASKS,
+ STANDBY_TASKS
+ );
+ assertThat(threadMetadata, not(equalTo(differProducerClientIds)));
+ assertThat(threadMetadata.hashCode(), not(equalTo(differProducerClientIds.hashCode())));
+ }
+
+ @Test
+ public void shouldNotBeEqualIfDifferInAdminClientId() {
+ final ThreadMetadata differAdminClientId = new ThreadMetadataImpl(
+ THREAD_NAME,
+ THREAD_STATE,
+ MAIN_CONSUMER_CLIENT_ID,
+ RESTORE_CONSUMER_CLIENT_ID,
+ PRODUCER_CLIENT_IDS,
+ "different",
+ ACTIVE_TASKS,
+ STANDBY_TASKS
+ );
+ assertThat(threadMetadata, not(equalTo(differAdminClientId)));
+ assertThat(threadMetadata.hashCode(), not(equalTo(differAdminClientId.hashCode())));
+ }
+
+ @Test
+ public void shouldNotBeEqualIfDifferInActiveTasks() {
+ final ThreadMetadata differActiveTasks = new ThreadMetadataImpl(
+ THREAD_NAME,
+ THREAD_STATE,
+ MAIN_CONSUMER_CLIENT_ID,
+ RESTORE_CONSUMER_CLIENT_ID,
+ PRODUCER_CLIENT_IDS,
+ ADMIN_CLIENT_ID,
+ mkSet(TM_0),
+ STANDBY_TASKS
+ );
+ assertThat(threadMetadata, not(equalTo(differActiveTasks)));
+ assertThat(threadMetadata.hashCode(), not(equalTo(differActiveTasks.hashCode())));
+ }
+
+ @Test
+ public void shouldNotBeEqualIfDifferInStandByTasks() {
+ final ThreadMetadata differStandByTasks = new ThreadMetadataImpl(
+ THREAD_NAME,
+ THREAD_STATE,
+ MAIN_CONSUMER_CLIENT_ID,
+ RESTORE_CONSUMER_CLIENT_ID,
+ PRODUCER_CLIENT_IDS,
+ ADMIN_CLIENT_ID,
+ ACTIVE_TASKS,
+ mkSet(TM_0)
+ );
+ assertThat(threadMetadata, not(equalTo(differStandByTasks)));
+ assertThat(threadMetadata.hashCode(), not(equalTo(differStandByTasks.hashCode())));
+ }
+
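+ // Probes for immutability by attempting a mutation: a collection that rejects clear()
+ // with an UnsupportedOperationException is treated as unmodifiable.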
+ private static boolean isUnmodifiable(final Collection<?> collection) {
+ try {
+ collection.clear();
+ return false;
+ } catch (final UnsupportedOperationException e) {
+ return true;
+ }
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java
index 98022bbdbaa..d6862ce008c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java
@@ -18,39 +18,121 @@
package org.apache.kafka.streams.state;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
import org.junit.Before;
import org.junit.Test;
import java.util.Collection;
+import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.junit.Assert.assertTrue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
public class StreamsMetadataTest {
private static final HostInfo HOST_INFO = new HostInfo("local", 12);
+ public static final Set<String> STATE_STORE_NAMES = mkSet("store1", "store2");
private static final TopicPartition TP_0 = new TopicPartition("t", 0);
private static final TopicPartition TP_1 = new TopicPartition("t", 1);
+ public static final Set<TopicPartition> TOPIC_PARTITIONS = mkSet(TP_0, TP_1);
+ public static final Set<String> STAND_BY_STORE_NAMES = mkSet("store2");
+ public static final Set<TopicPartition> STANDBY_TOPIC_PARTITIONS = mkSet(TP_1);
private StreamsMetadata streamsMetadata;
@Before
public void setUp() {
- streamsMetadata = new StreamsMetadata(
+ streamsMetadata = new StreamsMetadataImpl(
HOST_INFO,
- mkSet("store1", "store2"),
- mkSet(TP_0, TP_1),
- mkSet("store2"),
- mkSet(TP_1)
+ STATE_STORE_NAMES,
+ TOPIC_PARTITIONS,
+ STAND_BY_STORE_NAMES,
+ STANDBY_TOPIC_PARTITIONS
);
}
@Test
public void shouldNotAllowModificationOfInternalStateViaGetters() {
- assertTrue(isUnmodifiable(streamsMetadata.stateStoreNames()));
- assertTrue(isUnmodifiable(streamsMetadata.topicPartitions()));
- assertTrue(isUnmodifiable(streamsMetadata.standbyTopicPartitions()));
- assertTrue(isUnmodifiable(streamsMetadata.standbyStateStoreNames()));
+ assertThat(isUnmodifiable(streamsMetadata.stateStoreNames()), is(true));
+ assertThat(isUnmodifiable(streamsMetadata.topicPartitions()), is(true));
+ assertThat(isUnmodifiable(streamsMetadata.standbyTopicPartitions()), is(true));
+ assertThat(isUnmodifiable(streamsMetadata.standbyStateStoreNames()), is(true));
+ }
+
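+ // The equality tests below vary one constructor argument at a time to check that every
+ // field participates in equals() and hashCode().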
+ @Test
+ public void shouldBeEqualsIfSameObject() {
+ final StreamsMetadata same = new StreamsMetadataImpl(
+ HOST_INFO,
+ STATE_STORE_NAMES,
+ TOPIC_PARTITIONS,
+ STAND_BY_STORE_NAMES,
+ STANDBY_TOPIC_PARTITIONS);
+ assertThat(streamsMetadata, equalTo(same));
+ assertThat(streamsMetadata.hashCode(), equalTo(same.hashCode()));
+ }
+
+ @Test
+ public void shouldNotBeEqualIfDifferInHostInfo() {
+ final StreamsMetadata differHostInfo = new StreamsMetadataImpl(
+ new HostInfo("different", 122),
+ STATE_STORE_NAMES,
+ TOPIC_PARTITIONS,
+ STAND_BY_STORE_NAMES,
+ STANDBY_TOPIC_PARTITIONS);
+ assertThat(streamsMetadata, not(equalTo(differHostInfo)));
+ assertThat(streamsMetadata.hashCode(), not(equalTo(differHostInfo.hashCode())));
+ }
+
+ @Test
+ public void shouldNotBeEqualIfDifferStateStoreNames() {
+ final StreamsMetadata differStateStoreNames = new StreamsMetadataImpl(
+ HOST_INFO,
+ mkSet("store1"),
+ TOPIC_PARTITIONS,
+ STAND_BY_STORE_NAMES,
+ STANDBY_TOPIC_PARTITIONS);
+ assertThat(streamsMetadata, not(equalTo(differStateStoreNames)));
+ assertThat(streamsMetadata.hashCode(), not(equalTo(differStateStoreNames.hashCode())));
+ }
+
+ @Test
+ public void shouldNotBeEqualIfDifferInTopicPartitions() {
+ final StreamsMetadata differTopicPartitions = new StreamsMetadataImpl(
+ HOST_INFO,
+ STATE_STORE_NAMES,
+ mkSet(TP_0),
+ STAND_BY_STORE_NAMES,
+ STANDBY_TOPIC_PARTITIONS);
+ assertThat(streamsMetadata, not(equalTo(differTopicPartitions)));
+ assertThat(streamsMetadata.hashCode(), not(equalTo(differTopicPartitions.hashCode())));
+ }
+
+ @Test
+ public void shouldNotBeEqualIfDifferInStandByStores() {
+ final StreamsMetadata differStandByStores = new StreamsMetadataImpl(
+ HOST_INFO,
+ STATE_STORE_NAMES,
+ TOPIC_PARTITIONS,
+ mkSet("store1"),
+ STANDBY_TOPIC_PARTITIONS);
+ assertThat(streamsMetadata, not(equalTo(differStandByStores)));
+ assertThat(streamsMetadata.hashCode(), not(equalTo(differStandByStores.hashCode())));
+ }
+
+ @Test
+ public void shouldNotBeEqualIfDifferInStandByTopicPartitions() {
+ final StreamsMetadata differStandByTopicPartitions = new StreamsMetadataImpl(
+ HOST_INFO,
+ STATE_STORE_NAMES,
+ TOPIC_PARTITIONS,
+ STAND_BY_STORE_NAMES,
+ mkSet(TP_0));
+ assertThat(streamsMetadata, not(equalTo(differStandByTopicPartitions)));
+ assertThat(streamsMetadata.hashCode(), not(equalTo(differStandByTopicPartitions.hashCode())));
}
private static boolean isUnmodifiable(final Collection<?> collection) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
index a7e72e3fbf3..3c693cc7135 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
@@ -26,13 +26,13 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
@@ -137,7 +137,7 @@ public class StreamsStandByReplicaTest {
streams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
- final Set<ThreadMetadata> threadMetadata = streams.localThreadsMetadata();
+ final Set<ThreadMetadata> threadMetadata = streams.metadataForLocalThreads();
for (final ThreadMetadata threadMetadatum : threadMetadata) {
System.out.println(
"ACTIVE_TASKS:" + threadMetadatum.activeTasks().size()
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
index 19e81acd8a2..6d7da29e3ac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -24,9 +24,9 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.processor.TaskMetadata;
-import org.apache.kafka.streams.processor.ThreadMetadata;
import java.util.ArrayList;
import java.util.Arrays;
@@ -82,7 +82,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
streams.setStateListener((newState, oldState) -> {
if (newState == State.RUNNING && oldState == State.REBALANCING) {
System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
- final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
+ final Set<ThreadMetadata> allThreadMetadata = streams.metadataForLocalThreads();
final StringBuilder taskReportBuilder = new StringBuilder();
final List<TaskMetadata> activeTasks = new ArrayList<>();
final List<TaskMetadata> standbyTasks = new ArrayList<>();
@@ -110,7 +110,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
Exit.addShutdownHook("streams-shutdown-hook", () -> {
streams.close();
- System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+ System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
System.out.flush();
});
}