mirror of https://github.com/apache/kafka.git
KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API (#10840)
Implementation of KIP-744. Creates new interfaces for TaskMetadata, ThreadMetadata, and StreamsMetadata, providing internal implementations for each of them. Deprecates the current TaskMetadata and ThreadMetadata under o.a.k.s.processor, and StreamsMetadata under o.a.k.s.state. Updates references in internal classes from the deprecated classes to the new interfaces. Deprecates the methods on KafkaStreams returning the deprecated ThreadMetadata and StreamsMetadata, and provides new ones returning the new interfaces. Updates Javadocs referencing deprecated classes and methods to point to the correct ones. Co-authored-by: Bruno Cadonna <cadonna@apache.org> Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
This commit is contained in: parent bd72ef1bf1 · commit 6655a09e99
@@ -121,10 +121,23 @@
<p>
The public <code>topicGroupId</code> and <code>partition</code> fields on TaskId have been deprecated and replaced with getters. Please migrate to using the new <code>TaskId.subtopology()</code>
(which replaces <code>topicGroupId</code>) and <code>TaskId.partition()</code> APIs instead. Also, the <code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been deprecated
and will be removed, as they were never intended for public use. Finally, we have deprecated the <code>TaskMetadata.taskId()</code> method as well as the <code>TaskMetadata</code> constructor.
These have been replaced with APIs that better represent the task id as an actual <code>TaskId</code> object instead of a String. Please migrate to the new <code>TaskMetadata#getTaskId</code>
method. See <a href="https://cwiki.apache.org/confluence/x/vYTOCg">KIP-740</a> for more details.
and will be removed, as they were never intended for public use. We have also deprecated the <code>org.apache.kafka.streams.processor.TaskMetadata</code> class and introduced a new interface
<code>org.apache.kafka.streams.TaskMetadata</code> to be used instead. This change was introduced to better reflect the fact that <code>TaskMetadata</code> was not meant to be instantiated outside
of the Kafka codebase.
Please note that the new <code>TaskMetadata</code> offers APIs that better represent the task id as an actual <code>TaskId</code> object instead of a String. Migrate to the new
<code>org.apache.kafka.streams.TaskMetadata</code>, which offers these improved methods, for example, via the new <code>ThreadMetadata#activeTasks</code> and <code>ThreadMetadata#standbyTasks</code>.
The <code>org.apache.kafka.streams.processor.ThreadMetadata</code> class is also now deprecated and the newly introduced interface <code>org.apache.kafka.streams.ThreadMetadata</code> is to be used instead. In this new <code>ThreadMetadata</code>
interface, any reference to the deprecated <code>TaskMetadata</code> is replaced by the new interface.
Finally, <code>org.apache.kafka.streams.state.StreamsMetadata</code> has also been deprecated. Please migrate to the new <code>org.apache.kafka.streams.StreamsMetadata</code>.
We have deprecated several methods under <code>org.apache.kafka.streams.KafkaStreams</code> that returned the aforementioned deprecated classes:
</p>
<ul>
<li>Users of <code>KafkaStreams#allMetadata</code> are meant to migrate to the new <code>KafkaStreams#metadataForAllStreamsClients</code>.</li>
<li>Users of <code>KafkaStreams#allMetadataForStore(String)</code> are meant to migrate to the new <code>KafkaStreams#streamsMetadataForStore(String)</code>.</li>
<li>Users of <code>KafkaStreams#localThreadsMetadata</code> are meant to migrate to the new <code>KafkaStreams#metadataForLocalThreads</code>.</li>
</ul>
<p>See <a href="https://cwiki.apache.org/confluence/x/vYTOCg">KIP-740</a> and <a href="https://cwiki.apache.org/confluence/x/XIrOCg">KIP-744</a> for more details.</p>

<p>
We removed the following deprecated APIs:
</p>
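For illustration, a minimal migration sketch for the deprecated KafkaStreams methods listed above; `streams` is assumed to be an already-started KafkaStreams instance:

import java.util.Collection;
import java.util.Set;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.ThreadMetadata;

static void inspectClients(final KafkaStreams streams) {
    // Before (deprecated since 3.0): streams.allMetadata() and streams.localThreadsMetadata()
    // After: the replacements return the new org.apache.kafka.streams interfaces
    final Collection<StreamsMetadata> clients = streams.metadataForAllStreamsClients();
    final Set<ThreadMetadata> threads = streams.metadataForLocalThreads();
    clients.forEach(client -> System.out.println("client: " + client.hostInfo()));
    threads.forEach(thread -> System.out.println("thread: " + thread.threadName()));
}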
@@ -51,8 +51,6 @@ import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;

@@ -66,7 +64,6 @@ import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidat
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;

@@ -95,6 +92,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
@@ -1449,8 +1447,30 @@ public class KafkaStreams implements AutoCloseable {
     * Note: this is a point in time view and it may change due to partition reassignment.
     *
     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application
     * @deprecated since 3.0.0 use {@link KafkaStreams#metadataForAllStreamsClients}
     */
    public Collection<StreamsMetadata> allMetadata() {
    @Deprecated
    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadata() {
        validateIsRunningOrRebalancing();
        return streamsMetadataState.getAllMetadata().stream().map(streamsMetadata ->
                new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
                        streamsMetadata.stateStoreNames(),
                        streamsMetadata.topicPartitions(),
                        streamsMetadata.standbyStateStoreNames(),
                        streamsMetadata.standbyTopicPartitions()))
                .collect(Collectors.toSet());
    }

    /**
     * Find all currently running {@code KafkaStreams} instances (potentially remotely) that use the same
     * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to
     * the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance.
     * <p>
     * Note: this is a point in time view and it may change due to partition reassignment.
     *
     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application
     */
    public Collection<StreamsMetadata> metadataForAllStreamsClients() {
        validateIsRunningOrRebalancing();
        return streamsMetadataState.getAllMetadata();
    }
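A usage sketch for the replacement method above; the host()/port() and store-name accessors come from the new org.apache.kafka.streams.StreamsMetadata interface introduced later in this commit (`streams` is an assumed, already-started KafkaStreams instance):

for (final StreamsMetadata client : streams.metadataForAllStreamsClients()) {
    // host() and port() mirror hostInfo(), per the interface's Javadoc
    System.out.println(client.host() + ":" + client.port()
        + " active stores=" + client.stateStoreNames()
        + " standby stores=" + client.standbyStateStoreNames());
}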
@@ -1469,8 +1489,36 @@ public class KafkaStreams implements AutoCloseable {
     * @param storeName the {@code storeName} to find metadata for
     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instance with the provided {@code storeName} of
     * this application
     * @deprecated since 3.0.0 use {@link KafkaStreams#streamsMetadataForStore} instead
     */
    public Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
    @Deprecated
    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadataForStore(final String storeName) {
        validateIsRunningOrRebalancing();
        return streamsMetadataState.getAllMetadataForStore(storeName).stream().map(streamsMetadata ->
                new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
                        streamsMetadata.stateStoreNames(),
                        streamsMetadata.topicPartitions(),
                        streamsMetadata.standbyStateStoreNames(),
                        streamsMetadata.standbyTopicPartitions()))
                .collect(Collectors.toSet());
    }

    /**
     * Find all currently running {@code KafkaStreams} instances (potentially remotely) that
     * <ul>
     *   <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all
     *       instances that belong to the same Kafka Streams application)</li>
     *   <li>and that contain a {@link StateStore} with the given {@code storeName}</li>
     * </ul>
     * and return {@link StreamsMetadata} for each discovered instance.
     * <p>
     * Note: this is a point in time view and it may change due to partition reassignment.
     *
     * @param storeName the {@code storeName} to find metadata for
     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instance with the provided {@code storeName} of
     * this application
     */
    public Collection<StreamsMetadata> streamsMetadataForStore(final String storeName) {
        validateIsRunningOrRebalancing();
        return streamsMetadataState.getAllMetadataForStore(storeName);
    }
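A usage sketch for streamsMetadataForStore; the store name "word-counts" is a placeholder:

// Find every client of this application that hosts the store, actively or as a standby.
final Collection<StreamsMetadata> hosts = streams.streamsMetadataForStore("word-counts");
for (final StreamsMetadata host : hosts) {
    System.out.println("word-counts is available on " + host.hostInfo());
}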
@@ -1549,12 +1597,45 @@ public class KafkaStreams implements AutoCloseable {
        for (final StreamThread thread : copy) consumer.accept(thread);
    }

    /**
     * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
     *
     * @return the set of {@link org.apache.kafka.streams.processor.ThreadMetadata}.
     * @deprecated since 3.0 use {@link #metadataForLocalThreads()}
     */
    @Deprecated
    @SuppressWarnings("deprecation")
    public Set<org.apache.kafka.streams.processor.ThreadMetadata> localThreadsMetadata() {
        return metadataForLocalThreads().stream().map(threadMetadata -> new org.apache.kafka.streams.processor.ThreadMetadata(
                threadMetadata.threadName(),
                threadMetadata.threadState(),
                threadMetadata.consumerClientId(),
                threadMetadata.restoreConsumerClientId(),
                threadMetadata.producerClientIds(),
                threadMetadata.adminClientId(),
                threadMetadata.activeTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
                        taskMetadata.taskId().toString(),
                        taskMetadata.topicPartitions(),
                        taskMetadata.committedOffsets(),
                        taskMetadata.endOffsets(),
                        taskMetadata.timeCurrentIdlingStarted())
                ).collect(Collectors.toSet()),
                threadMetadata.standbyTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
                        taskMetadata.taskId().toString(),
                        taskMetadata.topicPartitions(),
                        taskMetadata.committedOffsets(),
                        taskMetadata.endOffsets(),
                        taskMetadata.timeCurrentIdlingStarted())
                ).collect(Collectors.toSet())))
                .collect(Collectors.toSet());
    }

    /**
     * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
     *
     * @return the set of {@link ThreadMetadata}.
     */
    public Set<ThreadMetadata> localThreadsMetadata() {
    public Set<ThreadMetadata> metadataForLocalThreads() {
        final Set<ThreadMetadata> threadMetadata = new HashSet<>();
        processStreamThread(thread -> {
            synchronized (thread.getStateLock()) {
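A usage sketch for metadataForLocalThreads, drilling into the per-task metadata exposed by the new interfaces:

for (final ThreadMetadata thread : streams.metadataForLocalThreads()) {
    System.out.println(thread.threadName() + " is " + thread.threadState());
    for (final TaskMetadata task : thread.activeTasks()) {
        // taskId() now returns a real TaskId object instead of a String
        System.out.println("  active task " + task.taskId() + " reads " + task.topicPartitions());
    }
}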
@@ -236,7 +236,7 @@ public class StreamsBuilder {
* K key = "some-key";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* @param topic the topic name; cannot be {@code null}
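The "custom RPC mechanism" hinted at in these Javadocs typically starts by locating the instance that owns a key. A sketch combining the renamed discovery call with the pre-existing KafkaStreams#queryMetadataForKey; the store name, key, and serializer are placeholders, and queryMetadataForKey is not changed by this commit:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyQueryMetadata;

final KeyQueryMetadata keyMetadata =
    streams.queryMetadataForKey("word-counts", "some-key", Serdes.String().serializer());
// Route the request to the active host over your own RPC layer (e.g., HTTP).
System.out.println("key is served by " + keyMetadata.activeHost());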
@@ -0,0 +1,104 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.state.HostInfo;

import java.util.Set;

/**
 * Metadata of a Kafka Streams client.
 */
public interface StreamsMetadata {

    /**
     * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the Streams
     * client.
     *
     * @return {@link HostInfo} corresponding to the Streams client
     */
    HostInfo hostInfo();

    /**
     * Names of the state stores assigned to active tasks of the Streams client.
     *
     * @return names of the state stores assigned to active tasks
     */
    Set<String> stateStoreNames();

    /**
     * Source topic partitions of the active tasks of the Streams client.
     *
     * @return source topic partitions of the active tasks
     */
    Set<TopicPartition> topicPartitions();

    /**
     * Changelog topic partitions for the state stores the standby tasks of the Streams client replicate.
     *
     * @return set of changelog topic partitions of the standby tasks
     */
    Set<TopicPartition> standbyTopicPartitions();

    /**
     * Names of the state stores assigned to standby tasks of the Streams client.
     *
     * @return names of the state stores assigned to standby tasks
     */
    Set<String> standbyStateStoreNames();

    /**
     * Host where the Streams client runs.
     *
     * This method is equivalent to {@code StreamsMetadata.hostInfo().host()}.
     *
     * @return the host where the Streams client runs
     */
    String host();

    /**
     * Port on which the Streams client listens.
     *
     * This method is equivalent to {@code StreamsMetadata.hostInfo().port()}.
     *
     * @return the port on which the Streams client listens
     */
    int port();

    /**
     * Compares the specified object with this StreamsMetadata. Returns {@code true} if and only if the specified object is
     * also a StreamsMetadata and both {@code hostInfo()} are equal, and {@code stateStoreNames()}, {@code topicPartitions()},
     * {@code standbyStateStoreNames()}, and {@code standbyTopicPartitions()} contain the same elements.
     *
     * @return {@code true} if this object is the same as the obj argument; {@code false} otherwise.
     */
    boolean equals(Object o);

    /**
     * Returns the hash code value for this StreamsMetadata. The hash code is defined to be the result of the following calculation:
     * <pre>
     * {@code
     * Objects.hash(hostInfo(), stateStoreNames(), topicPartitions(), standbyStateStoreNames(), standbyTopicPartitions());
     * }
     * </pre>
     *
     * @return a hash code value for this object.
     */
    int hashCode();

}
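A sketch of how an implementation would satisfy the hashCode contract spelled out above; the class context is hypothetical, as the commit ships its own internal implementations of these interfaces:

import java.util.Objects;

// Inside a hypothetical class implementing StreamsMetadata:
@Override
public int hashCode() {
    return Objects.hash(hostInfo(), stateStoreNames(), topicPartitions(),
                        standbyStateStoreNames(), standbyTopicPartitions());
}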
@@ -0,0 +1,87 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;

import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Metadata of a task.
 */
public interface TaskMetadata {

    /**
     * Task ID of the task.
     *
     * @return task ID consisting of subtopology and partition ID
     */
    TaskId taskId();

    /**
     * Source topic partitions of the task.
     *
     * @return source topic partitions
     */
    Set<TopicPartition> topicPartitions();

    /**
     * Offsets of the source topic partitions committed so far by the task.
     *
     * @return map from source topic partitions to committed offsets
     */
    Map<TopicPartition, Long> committedOffsets();

    /**
     * End offsets of the source topic partitions of the task.
     *
     * @return map from source topic partitions to end offsets
     */
    Map<TopicPartition, Long> endOffsets();

    /**
     * Time when the task started idling. If the task is not currently idling, empty is returned.
     *
     * @return time when task idling started, empty {@code Optional} if the task is currently not idling
     */
    Optional<Long> timeCurrentIdlingStarted();

    /**
     * Compares the specified object with this TaskMetadata. Returns {@code true} if and only if the specified object is
     * also a TaskMetadata and both {@code taskId()} and {@code topicPartitions()} are equal.
     *
     * @return {@code true} if this object is the same as the obj argument; {@code false} otherwise.
     */
    boolean equals(final Object o);

    /**
     * Returns the hash code value for this TaskMetadata. The hash code is defined to be the result of the following calculation:
     * <pre>
     * {@code
     * Objects.hash(taskId(), topicPartitions());
     * }
     * </pre>
     *
     * @return a hash code value for this object.
     */
    int hashCode();

}
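Since committedOffsets() and endOffsets() expose matching per-partition maps, consumer lag per task can be derived directly; a small illustrative helper, not part of the commit:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.TaskMetadata;

static Map<TopicPartition, Long> taskLag(final TaskMetadata task) {
    final Map<TopicPartition, Long> lag = new HashMap<>();
    // lag = end offset minus committed offset, per source partition
    task.endOffsets().forEach((partition, end) ->
        lag.put(partition, end - task.committedOffsets().getOrDefault(partition, 0L)));
    return lag;
}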
@@ -0,0 +1,112 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

import java.util.Set;

/**
 * Metadata of a stream thread.
 */
public interface ThreadMetadata {

    /**
     * State of the stream thread.
     *
     * @return the state
     */
    String threadState();

    /**
     * Name of the stream thread.
     *
     * @return the name
     */
    String threadName();

    /**
     * Metadata of the active tasks assigned to the stream thread.
     *
     * @return metadata of the active tasks
     */
    Set<TaskMetadata> activeTasks();

    /**
     * Metadata of the standby tasks assigned to the stream thread.
     *
     * @return metadata of the standby tasks
     */
    Set<TaskMetadata> standbyTasks();

    /**
     * Client ID of the Kafka consumer used by the stream thread.
     *
     * @return client ID of the Kafka consumer
     */
    String consumerClientId();

    /**
     * Client ID of the restore Kafka consumer used by the stream thread.
     *
     * @return client ID of the restore Kafka consumer
     */
    String restoreConsumerClientId();

    /**
     * Client IDs of the Kafka producers used by the stream thread.
     *
     * @return client IDs of the Kafka producers
     */
    Set<String> producerClientIds();

    /**
     * Client ID of the admin client used by the stream thread.
     *
     * @return client ID of the admin client
     */
    String adminClientId();

    /**
     * Compares the specified object with this ThreadMetadata. Returns {@code true} if and only if the specified object is
     * also a ThreadMetadata and both {@code threadName()} are equal, {@code threadState()} are equal, {@code activeTasks()}
     * contain the same elements, {@code standbyTasks()} contain the same elements, {@code consumerClientId()} are equal,
     * {@code restoreConsumerClientId()} are equal, {@code producerClientIds()} contain the same elements, and
     * {@code adminClientId()} are equal.
     *
     * @return {@code true} if this object is the same as the obj argument; {@code false} otherwise.
     */
    boolean equals(Object o);

    /**
     * Returns the hash code value for this ThreadMetadata. The hash code is defined to be the result of the following calculation:
     * <pre>
     * {@code
     * Objects.hash(
     *     threadName(),
     *     threadState(),
     *     activeTasks(),
     *     standbyTasks(),
     *     consumerClientId(),
     *     restoreConsumerClientId(),
     *     producerClientIds(),
     *     adminClientId());
     * }
     * </pre>
     *
     * @return a hash code value for this object.
     */
    int hashCode();
}
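An illustrative monitoring helper built purely from the accessors above, not part of the commit:

import org.apache.kafka.streams.ThreadMetadata;

static void logThread(final ThreadMetadata thread) {
    // One summary line per stream thread, suitable for periodic health logging.
    System.out.printf("%s [%s]: %d active / %d standby tasks, consumer=%s, admin=%s%n",
        thread.threadName(), thread.threadState(),
        thread.activeTasks().size(), thread.standbyTasks().size(),
        thread.consumerClientId(), thread.adminClientId());
}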
@@ -21,7 +21,7 @@ import org.apache.kafka.streams.KafkaStreams;
/**
* Indicates that the specific state store being queried via
* {@link org.apache.kafka.streams.StoreQueryParameters} used a partitioning that is not assigned to this instance.
* You can use {@link KafkaStreams#allMetadata()} to discover the correct instance that hosts the requested partition.
* You can use {@link KafkaStreams#metadataForAllStreamsClients()} to discover the correct instance that hosts the requested partition.
*/
public class InvalidStateStorePartitionException extends InvalidStateStoreException {
@@ -90,7 +90,7 @@ public interface CogroupedKStream<K, VOut> {
* K key = "some-key";
* ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to query
* the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by

@@ -140,7 +140,7 @@ public interface CogroupedKStream<K, VOut> {
* K key = "some-key";
* ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to query
* the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by

@@ -191,7 +191,7 @@ public interface CogroupedKStream<K, VOut> {
* K key = "some-key";
* ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to query
* the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore} -- regardless of what

@@ -244,7 +244,7 @@ public interface CogroupedKStream<K, VOut> {
* K key = "some-key";
* ValueAndTimestamp<VOut> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to query
* the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore} -- regardless of what
@@ -124,7 +124,7 @@ public interface KGroupedStream<K, V> {
* K key = "some-word";
* ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* <p>

@@ -170,7 +170,7 @@ public interface KGroupedStream<K, V> {
* K key = "some-word";
* ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* <p>

@@ -274,7 +274,7 @@ public interface KGroupedStream<K, V> {
* K key = "some-key";
* ValueAndTimestamp<V> reduceForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* <p>

@@ -338,7 +338,7 @@ public interface KGroupedStream<K, V> {
* K key = "some-key";
* ValueAndTimestamp<V> reduceForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* <p>

@@ -443,7 +443,7 @@ public interface KGroupedStream<K, V> {
* K key = "some-key";
* ValueAndTimestamp<VR> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* <p>

@@ -502,7 +502,7 @@ public interface KGroupedStream<K, V> {
* K key = "some-key";
* ValueAndTimestamp<VR> aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* <p>
@@ -63,7 +63,7 @@ public interface KGroupedTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* <p>

@@ -106,7 +106,7 @@ public interface KGroupedTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* <p>

@@ -234,7 +234,7 @@ public interface KGroupedTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.

@@ -307,7 +307,7 @@ public interface KGroupedTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.

@@ -445,7 +445,7 @@ public interface KGroupedTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.

@@ -529,7 +529,7 @@ public interface KGroupedTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -138,7 +138,7 @@ public interface KTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>

@@ -177,7 +177,7 @@ public interface KTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>

@@ -263,7 +263,7 @@ public interface KTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>

@@ -301,7 +301,7 @@ public interface KTable<K, V> {
* K key = "some-word";
* ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>

@@ -466,7 +466,7 @@ public interface KTable<K, V> {
* <p>
* To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>

@@ -512,7 +512,7 @@ public interface KTable<K, V> {
* <p>
* To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>

@@ -560,7 +560,7 @@ public interface KTable<K, V> {
* <p>
* To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>

@@ -607,7 +607,7 @@ public interface KTable<K, V> {
* <p>
* To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
* {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>
@@ -181,7 +181,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
* long toTime = ...;
* WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.

@@ -238,7 +238,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -138,7 +138,7 @@ public interface SessionWindowedKStream<K, V> {
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.

@@ -184,7 +184,7 @@ public interface SessionWindowedKStream<K, V> {
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.

@@ -342,7 +342,7 @@ public interface SessionWindowedKStream<K, V> {
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.

@@ -403,7 +403,7 @@ public interface SessionWindowedKStream<K, V> {
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.

@@ -561,7 +561,7 @@ public interface SessionWindowedKStream<K, V> {
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.

@@ -621,7 +621,7 @@ public interface SessionWindowedKStream<K, V> {
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@@ -171,7 +171,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* long toTime = ...;
* WindowStoreIterator<ValueAndTimestamp<V>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what

@@ -228,7 +228,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* long toTime = ...;
* WindowStoreIterator<ValueAndTimestamp<V>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what
@@ -141,7 +141,7 @@ public interface TimeWindowedKStream<K, V> {
* long toTime = ...;
* WindowStoreIterator<ValueAndTimestamp<Long>> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what

@@ -190,7 +190,7 @@ public interface TimeWindowedKStream<K, V> {
* long toTime = ...;
* WindowStoreIterator<ValueAndTimestamp<Long>> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what

@@ -341,7 +341,7 @@ public interface TimeWindowedKStream<K, V> {
* long toTime = ...;
* WindowStoreIterator<ValueAndTimestamp<VR>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what

@@ -402,7 +402,7 @@ public interface TimeWindowedKStream<K, V> {
* long toTime = ...;
* WindowStoreIterator<ValueAndTimestamp<VR>> aggregateStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what

@@ -562,7 +562,7 @@ public interface TimeWindowedKStream<K, V> {
* long toTime = ...;
* WindowStoreIterator<ValueAndTimestamp<V>> reduceStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what

@@ -625,7 +625,7 @@ public interface TimeWindowedKStream<K, V> {
* long toTime = ...;
* WindowStoreIterator<ValueAndTimestamp<V>> reduceStore = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#metadataForAllStreamsClients()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore} -- regardless of what
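The localWindowStore lookups referenced throughout these Javadocs are obtained via interactive queries; a sketch under assumed names ("counts-store", the key, and the time range are placeholders; the store/fetch APIs predate this commit):

import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStoreIterator;

final ReadOnlyWindowStore<String, ValueAndTimestamp<Long>> localWindowStore = streams.store(
    StoreQueryParameters.fromNameAndType("counts-store", QueryableStoreTypes.timestampedWindowStore()));
final Instant now = Instant.now();
// Fetch the last hour of windows for a local key; the iterator must be closed.
try (final WindowStoreIterator<ValueAndTimestamp<Long>> iterator =
         localWindowStore.fetch("some-key", now.minus(Duration.ofHours(1)), now)) {
    iterator.forEachRemaining(entry ->
        System.out.println("window@" + entry.key + " -> " + entry.value.value()));
}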
@@ -27,10 +27,12 @@ import java.util.Set;

/**
 * Represents the state of a single task running within a {@link KafkaStreams} application.
 * @deprecated since 3.0, use {@link org.apache.kafka.streams.TaskMetadata} instead.
 */
@Deprecated
public class TaskMetadata {

    private final TaskId taskId;
    private final String taskId;

    private final Set<TopicPartition> topicPartitions;

@@ -40,24 +42,11 @@ public class TaskMetadata {

    private final Optional<Long> timeCurrentIdlingStarted;

    /**
     * @deprecated since 3.0, not intended for public use
     */
    @Deprecated
    public TaskMetadata(final String taskId,
                        final Set<TopicPartition> topicPartitions,
                        final Map<TopicPartition, Long> committedOffsets,
                        final Map<TopicPartition, Long> endOffsets,
                        final Optional<Long> timeCurrentIdlingStarted) {
        this(TaskId.parse(taskId), topicPartitions, committedOffsets, endOffsets, timeCurrentIdlingStarted);
    }

    // For internal use -- not a public API
    public TaskMetadata(final TaskId taskId,
                        final Set<TopicPartition> topicPartitions,
                        final Map<TopicPartition, Long> committedOffsets,
                        final Map<TopicPartition, Long> endOffsets,
                        final Optional<Long> timeCurrentIdlingStarted) {
        this.taskId = taskId;
        this.topicPartitions = Collections.unmodifiableSet(topicPartitions);
        this.committedOffsets = Collections.unmodifiableMap(committedOffsets);

@@ -68,17 +57,8 @@ public class TaskMetadata {
    /**
     * @return the basic task metadata such as subtopology and partition id
     */
    public TaskId getTaskId() {
        return taskId;
    }

    /**
     * @return the basic task metadata such as subtopology and partition id
     * @deprecated please use {@link #getTaskId()} instead.
     */
    @Deprecated
    public String taskId() {
        return taskId.toString();
        return taskId;
    }

    public Set<TopicPartition> topicPartitions() {
@@ -24,7 +24,9 @@ import java.util.Set;

/**
 * Represents the state of a single thread running within a {@link KafkaStreams} application.
 * @deprecated since 3.0 use {@link org.apache.kafka.streams.ThreadMetadata} instead
 */
@Deprecated
public class ThreadMetadata {

    private final String threadName;

@@ -51,8 +53,8 @@ public class ThreadMetadata {
                          final String restoreConsumerClientId,
                          final Set<String> producerClientIds,
                          final String adminClientId,
                          final Set<TaskMetadata> activeTasks,
                          final Set<TaskMetadata> standbyTasks) {
                          final Set<org.apache.kafka.streams.processor.TaskMetadata> activeTasks,
                          final Set<org.apache.kafka.streams.processor.TaskMetadata> standbyTasks) {
        this.mainConsumerClientId = mainConsumerClientId;
        this.restoreConsumerClientId = restoreConsumerClientId;
        this.producerClientIds = producerClientIds;

@@ -71,11 +73,11 @@ public class ThreadMetadata {
        return threadName;
    }

    public Set<TaskMetadata> activeTasks() {
    public Set<org.apache.kafka.streams.processor.TaskMetadata> activeTasks() {
        return activeTasks;
    }

    public Set<TaskMetadata> standbyTasks() {
    public Set<org.apache.kafka.streams.processor.TaskMetadata> standbyTasks() {
        return standbyTasks;
    }

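The corresponding caller-side migration, sketched under the assumption of a KafkaStreams instance `streams` (both types are named ThreadMetadata, so fully-qualified names disambiguate during the transition):

    // Deprecated: returns o.a.k.s.processor.ThreadMetadata
    // final Set<org.apache.kafka.streams.processor.ThreadMetadata> old = streams.localThreadsMetadata();

    // Replacement: returns the new o.a.k.s.ThreadMetadata interface
    final Set<org.apache.kafka.streams.ThreadMetadata> threads = streams.metadataForLocalThreads();
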
@@ -34,14 +34,14 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

@@ -1118,7 +1118,7 @@ public class StreamThread extends Thread {
    // package-private for testing only
    StreamThread updateThreadMetadata(final String adminClientId) {

        threadMetadata = new ThreadMetadata(
        threadMetadata = new ThreadMetadataImpl(
            getName(),
            state().name(),
            getConsumerClientId(getName()),

@@ -1135,7 +1135,7 @@ public class StreamThread extends Thread {
                                      final Map<TaskId, Task> standbyTasks) {
        final Set<TaskMetadata> activeTasksMetadata = new HashSet<>();
        for (final Map.Entry<TaskId, Task> task : activeTasks.entrySet()) {
            activeTasksMetadata.add(new TaskMetadata(
            activeTasksMetadata.add(new TaskMetadataImpl(
                task.getValue().id(),
                task.getValue().inputPartitions(),
                task.getValue().committedOffsets(),

@@ -1145,7 +1145,7 @@ public class StreamThread extends Thread {
        }
        final Set<TaskMetadata> standbyTasksMetadata = new HashSet<>();
        for (final Map.Entry<TaskId, Task> task : standbyTasks.entrySet()) {
            standbyTasksMetadata.add(new TaskMetadata(
            standbyTasksMetadata.add(new TaskMetadataImpl(
                task.getValue().id(),
                task.getValue().inputPartitions(),
                task.getValue().committedOffsets(),

@@ -1155,7 +1155,7 @@ public class StreamThread extends Thread {
        }

        final String adminClientId = threadMetadata.adminClientId();
        threadMetadata = new ThreadMetadata(
        threadMetadata = new ThreadMetadataImpl(
            getName(),
            state().name(),
            getConsumerClientId(getName()),

@@ -27,7 +27,8 @@ import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;

import java.util.ArrayList;
import java.util.Collection;

@@ -92,7 +93,7 @@ public class StreamsMetadataState {
    public Collection<StreamsMetadata> getAllMetadata() {
        return Collections.unmodifiableList(allMetadata);
    }

    /**
     * Find all of the {@link StreamsMetadata}s for a given storeName
     *

@@ -229,11 +230,12 @@ public class StreamsMetadataState {
                         final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
        if (activePartitionHostMap.isEmpty() && standbyPartitionHostMap.isEmpty()) {
            allMetadata = Collections.emptyList();
            localMetadata.set(new StreamsMetadata(thisHost,
                                                  Collections.emptySet(),
                                                  Collections.emptySet(),
                                                  Collections.emptySet(),
                                                  Collections.emptySet()
            localMetadata.set(new StreamsMetadataImpl(
                thisHost,
                Collections.emptySet(),
                Collections.emptySet(),
                Collections.emptySet(),
                Collections.emptySet()
            ));
            return;
        }

@@ -258,11 +260,12 @@ public class StreamsMetadataState {
            standbyStoresOnHost.addAll(getStoresOnHost(storeToSourceTopics, standbyPartitionsOnHost));
        }

        final StreamsMetadata metadata = new StreamsMetadata(hostInfo,
                                                             activeStoresOnHost,
                                                             activePartitionsOnHost,
                                                             standbyStoresOnHost,
                                                             standbyPartitionsOnHost);
        final StreamsMetadata metadata = new StreamsMetadataImpl(
            hostInfo,
            activeStoresOnHost,
            activePartitionsOnHost,
            standbyStoresOnHost,
            standbyPartitionsOnHost);
        rebuiltMetadata.add(metadata);
        if (hostInfo.equals(thisHost)) {
            localMetadata.set(metadata);

@@ -0,0 +1,106 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.processor.TaskId;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

public class TaskMetadataImpl implements TaskMetadata {

    private final TaskId taskId;

    private final Set<TopicPartition> topicPartitions;

    private final Map<TopicPartition, Long> committedOffsets;

    private final Map<TopicPartition, Long> endOffsets;

    private final Optional<Long> timeCurrentIdlingStarted;

    public TaskMetadataImpl(final TaskId taskId,
                            final Set<TopicPartition> topicPartitions,
                            final Map<TopicPartition, Long> committedOffsets,
                            final Map<TopicPartition, Long> endOffsets,
                            final Optional<Long> timeCurrentIdlingStarted) {
        this.taskId = taskId;
        this.topicPartitions = Collections.unmodifiableSet(topicPartitions);
        this.committedOffsets = Collections.unmodifiableMap(committedOffsets);
        this.endOffsets = Collections.unmodifiableMap(endOffsets);
        this.timeCurrentIdlingStarted = timeCurrentIdlingStarted;
    }

    @Override
    public TaskId taskId() {
        return taskId;
    }

    @Override
    public Set<TopicPartition> topicPartitions() {
        return topicPartitions;
    }

    @Override
    public Map<TopicPartition, Long> committedOffsets() {
        return committedOffsets;
    }

    @Override
    public Map<TopicPartition, Long> endOffsets() {
        return endOffsets;
    }

    @Override
    public Optional<Long> timeCurrentIdlingStarted() {
        return timeCurrentIdlingStarted;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final TaskMetadataImpl that = (TaskMetadataImpl) o;
        return Objects.equals(taskId, that.taskId) &&
               Objects.equals(topicPartitions, that.topicPartitions);
    }

    @Override
    public int hashCode() {
        return Objects.hash(taskId, topicPartitions);
    }

    @Override
    public String toString() {
        return "TaskMetadata{" +
               "taskId=" + taskId +
               ", topicPartitions=" + topicPartitions +
               ", committedOffsets=" + committedOffsets +
               ", endOffsets=" + endOffsets +
               ", timeCurrentIdlingStarted=" + timeCurrentIdlingStarted +
               '}';
    }
}

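Note that equals/hashCode above intentionally use only taskId and topicPartitions, so two snapshots of the same task taken at different offsets compare equal. A sketch of that contract (values illustrative):

    final TaskMetadata first = new TaskMetadataImpl(
        new TaskId(0, 0), Collections.singleton(new TopicPartition("topic", 0)),
        Collections.singletonMap(new TopicPartition("topic", 0), 5L),   // committed offsets
        Collections.singletonMap(new TopicPartition("topic", 0), 10L),  // end offsets
        Optional.empty());
    final TaskMetadata second = new TaskMetadataImpl(
        new TaskId(0, 0), Collections.singleton(new TopicPartition("topic", 0)),
        Collections.singletonMap(new TopicPartition("topic", 0), 7L),   // offsets moved on
        Collections.singletonMap(new TopicPartition("topic", 0), 12L),
        Optional.empty());
    // the offset maps are ignored: both snapshots identify the same task
    assert first.equals(second);
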
@@ -0,0 +1,147 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.ThreadMetadata;

import java.util.Collections;
import java.util.Objects;
import java.util.Set;

/**
 * Represents the state of a single thread running within a {@link KafkaStreams} application.
 */
public class ThreadMetadataImpl implements ThreadMetadata {

    private final String threadName;

    private final String threadState;

    private final Set<TaskMetadata> activeTasks;

    private final Set<TaskMetadata> standbyTasks;

    private final String mainConsumerClientId;

    private final String restoreConsumerClientId;

    private final Set<String> producerClientIds;

    // the admin client should be shared among all threads, so the client id should be the same;
    // we keep it at the thread-level for user's convenience and possible extensions in the future
    private final String adminClientId;

    public ThreadMetadataImpl(final String threadName,
                              final String threadState,
                              final String mainConsumerClientId,
                              final String restoreConsumerClientId,
                              final Set<String> producerClientIds,
                              final String adminClientId,
                              final Set<TaskMetadata> activeTasks,
                              final Set<TaskMetadata> standbyTasks) {
        this.mainConsumerClientId = mainConsumerClientId;
        this.restoreConsumerClientId = restoreConsumerClientId;
        this.producerClientIds = Collections.unmodifiableSet(producerClientIds);
        this.adminClientId = adminClientId;
        this.threadName = threadName;
        this.threadState = threadState;
        this.activeTasks = Collections.unmodifiableSet(activeTasks);
        this.standbyTasks = Collections.unmodifiableSet(standbyTasks);
    }

    public String threadState() {
        return threadState;
    }

    public String threadName() {
        return threadName;
    }

    public Set<TaskMetadata> activeTasks() {
        return activeTasks;
    }

    public Set<TaskMetadata> standbyTasks() {
        return standbyTasks;
    }

    public String consumerClientId() {
        return mainConsumerClientId;
    }

    public String restoreConsumerClientId() {
        return restoreConsumerClientId;
    }

    public Set<String> producerClientIds() {
        return producerClientIds;
    }

    public String adminClientId() {
        return adminClientId;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final ThreadMetadataImpl that = (ThreadMetadataImpl) o;
        return Objects.equals(threadName, that.threadName) &&
               Objects.equals(threadState, that.threadState) &&
               Objects.equals(activeTasks, that.activeTasks) &&
               Objects.equals(standbyTasks, that.standbyTasks) &&
               mainConsumerClientId.equals(that.mainConsumerClientId) &&
               restoreConsumerClientId.equals(that.restoreConsumerClientId) &&
               Objects.equals(producerClientIds, that.producerClientIds) &&
               adminClientId.equals(that.adminClientId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(
            threadName,
            threadState,
            activeTasks,
            standbyTasks,
            mainConsumerClientId,
            restoreConsumerClientId,
            producerClientIds,
            adminClientId);
    }

    @Override
    public String toString() {
        return "ThreadMetadata{" +
               "threadName=" + threadName +
               ", threadState=" + threadState +
               ", activeTasks=" + activeTasks +
               ", standbyTasks=" + standbyTasks +
               ", consumerClientId=" + mainConsumerClientId +
               ", restoreConsumerClientId=" + restoreConsumerClientId +
               ", producerClientIds=" + producerClientIds +
               ", adminClientId=" + adminClientId +
               '}';
    }
}

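A hedged monitoring sketch built on the new interfaces, reporting an approximate per-partition lag for every active task on the local client (a KafkaStreams instance `streams` is assumed; output sink is illustrative):

    for (final ThreadMetadata thread : streams.metadataForLocalThreads()) {
        for (final TaskMetadata task : thread.activeTasks()) {
            for (final TopicPartition tp : task.topicPartitions()) {
                final long committed = task.committedOffsets().getOrDefault(tp, -1L);
                final long end = task.endOffsets().getOrDefault(tp, -1L);
                // lag is only meaningful when both offsets are known
                System.out.printf("%s %s %s lag=%d%n",
                    thread.threadName(), task.taskId(), tp,
                    (committed >= 0 && end >= 0) ? end - committed : -1L);
            }
        }
    }
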
@@ -27,8 +27,8 @@ import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
/**
 * Represents a user defined endpoint in a {@link org.apache.kafka.streams.KafkaStreams} application.
 * Instances of this class can be obtained by calling one of:
 * {@link KafkaStreams#allMetadata()}
 * {@link KafkaStreams#allMetadataForStore(String)}
 * {@link KafkaStreams#metadataForAllStreamsClients()}
 * {@link KafkaStreams#streamsMetadataForStore(String)}
 *
 * The HostInfo is constructed during Partition Assignment
 * see {@link StreamsPartitionAssignor}

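The HostInfo that these methods expose comes from the application.server setting of each instance. A minimal configuration sketch (application id, bootstrap servers, and endpoint are illustrative):

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // illustrative
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative
    // the endpoint other instances will see via StreamsMetadata#hostInfo()
    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "host1:8080");
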
@@ -29,7 +29,9 @@ import java.util.Set;
 * APIs and services to connect to other instances, the Set of state stores available on
 * the instance and the Set of {@link TopicPartition}s available on the instance.
 * NOTE: This is a point in time view. It may change when rebalances happen.
 * @deprecated since 3.0.0 use {@link org.apache.kafka.streams.StreamsMetadata}
 */
@Deprecated
public class StreamsMetadata {
    /**
     * Sentinel to indicate that the StreamsMetadata is currently unavailable. This can occur during rebalance

@@ -0,0 +1,164 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.state.HostInfo;

import java.util.Collections;
import java.util.Objects;
import java.util.Set;

/**
 * Represents the state of an instance (process) in a {@link KafkaStreams} application.
 * It contains the user supplied {@link HostInfo} that can be used by developers to build
 * APIs and services to connect to other instances, the Set of state stores available on
 * the instance and the Set of {@link TopicPartition}s available on the instance.
 * NOTE: This is a point in time view. It may change when rebalances happen.
 */
public class StreamsMetadataImpl implements StreamsMetadata {
    /**
     * Sentinel to indicate that the StreamsMetadata is currently unavailable. This can occur during rebalance
     * operations.
     */
    public final static StreamsMetadataImpl NOT_AVAILABLE = new StreamsMetadataImpl(
        HostInfo.unavailable(),
        Collections.emptySet(),
        Collections.emptySet(),
        Collections.emptySet(),
        Collections.emptySet());

    private final HostInfo hostInfo;

    private final Set<String> stateStoreNames;

    private final Set<TopicPartition> topicPartitions;

    private final Set<String> standbyStateStoreNames;

    private final Set<TopicPartition> standbyTopicPartitions;

    public StreamsMetadataImpl(final HostInfo hostInfo,
                               final Set<String> stateStoreNames,
                               final Set<TopicPartition> topicPartitions,
                               final Set<String> standbyStateStoreNames,
                               final Set<TopicPartition> standbyTopicPartitions) {

        this.hostInfo = hostInfo;
        this.stateStoreNames = Collections.unmodifiableSet(stateStoreNames);
        this.topicPartitions = Collections.unmodifiableSet(topicPartitions);
        this.standbyTopicPartitions = Collections.unmodifiableSet(standbyTopicPartitions);
        this.standbyStateStoreNames = Collections.unmodifiableSet(standbyStateStoreNames);
    }

    /**
     * The value of {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
     * instance, which is typically host/port
     *
     * @return {@link HostInfo} corresponding to the streams instance
     */
    @Override
    public HostInfo hostInfo() {
        return hostInfo;
    }

    /**
     * State stores owned by the instance as an active replica
     *
     * @return set of active state store names
     */
    @Override
    public Set<String> stateStoreNames() {
        return stateStoreNames;
    }

    /**
     * Topic partitions consumed by the instance as an active replica
     *
     * @return set of active topic partitions
     */
    @Override
    public Set<TopicPartition> topicPartitions() {
        return topicPartitions;
    }

    /**
     * (Source) Topic partitions for which the instance acts as standby.
     *
     * @return set of standby topic partitions
     */
    @Override
    public Set<TopicPartition> standbyTopicPartitions() {
        return standbyTopicPartitions;
    }

    /**
     * State stores owned by the instance as a standby replica
     *
     * @return set of standby state store names
     */
    @Override
    public Set<String> standbyStateStoreNames() {
        return standbyStateStoreNames;
    }

    @Override
    public String host() {
        return hostInfo.host();
    }

    @SuppressWarnings("unused")
    @Override
    public int port() {
        return hostInfo.port();
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        final StreamsMetadataImpl that = (StreamsMetadataImpl) o;
        return Objects.equals(hostInfo, that.hostInfo)
            && Objects.equals(stateStoreNames, that.stateStoreNames)
            && Objects.equals(topicPartitions, that.topicPartitions)
            && Objects.equals(standbyStateStoreNames, that.standbyStateStoreNames)
            && Objects.equals(standbyTopicPartitions, that.standbyTopicPartitions);
    }

    @Override
    public int hashCode() {
        return Objects.hash(hostInfo, stateStoreNames, topicPartitions, standbyStateStoreNames, standbyTopicPartitions);
    }

    @Override
    public String toString() {
        return "StreamsMetadata {" +
            "hostInfo=" + hostInfo +
            ", stateStoreNames=" + stateStoreNames +
            ", topicPartitions=" + topicPartitions +
            ", standbyStateStoreNames=" + standbyStateStoreNames +
            ", standbyTopicPartitions=" + standbyTopicPartitions +
            '}';
    }
}

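A hedged usage sketch for the new metadata view: find every client that hosts a replica of a given store ("word-counts" is an assumed store name, `streams` an assumed KafkaStreams instance):

    final Collection<StreamsMetadata> candidates = streams.streamsMetadataForStore("word-counts");
    for (final StreamsMetadata metadata : candidates) {
        // each entry describes one instance: its endpoint plus active and standby stores
        System.out.println(metadata.host() + ":" + metadata.port()
            + " active stores=" + metadata.stateStoreNames()
            + " standby stores=" + metadata.standbyStateStoreNames());
    }
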
@@ -42,7 +42,6 @@ import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

@@ -52,6 +51,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.ThreadMetadataImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;

@@ -140,8 +140,6 @@ public class KafkaStreamsTest {
    private GlobalStreamThread globalStreamThread;
    @Mock
    private Metrics metrics;
    @Mock
    private ThreadMetadata threadMetadata;

    private StateListenerStub streamsStateListener;
    private Capture<List<MetricsReporter>> metricsReportersCapture;

@@ -329,7 +327,7 @@ public class KafkaStreamsTest {
            return null;
        }).anyTimes();
        EasyMock.expect(thread.getGroupInstanceID()).andStubReturn(Optional.empty());
        EasyMock.expect(thread.threadMetadata()).andReturn(new ThreadMetadata(
        EasyMock.expect(thread.threadMetadata()).andReturn(new ThreadMetadataImpl(
            "processId-StreamThread-" + threadId,
            "DEAD",
            "",

@@ -500,7 +498,7 @@ public class KafkaStreamsTest {
            streams.threads.get(i).join();
        }
        waitForCondition(
            () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")),
            () -> streams.metadataForLocalThreads().stream().allMatch(t -> t.threadState().equals("DEAD")),
            "Streams never stopped"
        );
        streams.close();

@@ -619,7 +617,7 @@ public class KafkaStreamsTest {
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();
            streamThreadOne.shutdown();
            final Set<ThreadMetadata> threads = streams.localThreadsMetadata();
            final Set<ThreadMetadata> threads = streams.metadataForLocalThreads();
            assertThat(threads.size(), equalTo(1));
            assertThat(threads, hasItem(streamThreadTwo.threadMetadata()));
        }

@@ -756,24 +754,24 @@ public class KafkaStreamsTest {
    @Test
    public void shouldNotGetAllTasksWhenNotRunning() throws InterruptedException {
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            assertThrows(StreamsNotStartedException.class, streams::allMetadata);
            assertThrows(StreamsNotStartedException.class, streams::metadataForAllStreamsClients);
            streams.start();
            waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
            streams.close();
            waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
            assertThrows(IllegalStateException.class, streams::allMetadata);
            assertThrows(IllegalStateException.class, streams::metadataForAllStreamsClients);
        }
    }

    @Test
    public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws InterruptedException {
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            assertThrows(StreamsNotStartedException.class, () -> streams.allMetadataForStore("store"));
            assertThrows(StreamsNotStartedException.class, () -> streams.streamsMetadataForStore("store"));
            streams.start();
            waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
            streams.close();
            waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
            assertThrows(IllegalStateException.class, () -> streams.allMetadataForStore("store"));
            assertThrows(IllegalStateException.class, () -> streams.streamsMetadataForStore("store"));
        }
    }

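The lifecycle contract these tests pin down, as a hedged sketch (topology and props assumed):

    final KafkaStreams streams = new KafkaStreams(topology, props);
    // before start(): metadata queries throw StreamsNotStartedException
    // streams.metadataForAllStreamsClients();  // would throw here
    streams.start();
    streams.metadataForAllStreamsClients();     // fine while RUNNING
    streams.close();
    // after close(): the same calls throw IllegalStateException
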
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;

@@ -30,7 +31,6 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;

@@ -164,22 +164,22 @@ public class AdjustStreamThreadCountTest {
            addStreamStateChangeListener(kafkaStreams);
            startStreamsAndWaitForRunning(kafkaStreams);

            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
            final int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
            assertThat(kafkaStreams.metadataForLocalThreads().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));

            stateTransitionHistory.clear();
            final Optional<String> name = kafkaStreams.addStreamThread();

            assertThat(name, not(Optional.empty()));
            TestUtils.waitForCondition(
                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
                () -> kafkaStreams.metadataForLocalThreads().stream().sequential()
                    .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
                "Wait for the thread to be added"
            );
            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
            assertThat(kafkaStreams.metadataForLocalThreads().size(), equalTo(oldThreadCount + 1));
            assertThat(
                kafkaStreams
                    .localThreadsMetadata()
                    .metadataForLocalThreads()
                    .stream()
                    .map(t -> t.threadName().split("-StreamThread-")[1])
                    .sorted().toArray(),

@@ -196,10 +196,10 @@ public class AdjustStreamThreadCountTest {
            addStreamStateChangeListener(kafkaStreams);
            startStreamsAndWaitForRunning(kafkaStreams);

            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
            final int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
            stateTransitionHistory.clear();
            assertThat(kafkaStreams.removeStreamThread().get().split("-")[0], equalTo(appId));
            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
            assertThat(kafkaStreams.metadataForLocalThreads().size(), equalTo(oldThreadCount - 1));

            waitForTransitionFromRebalancingToRunning();
        }

@@ -212,10 +212,10 @@ public class AdjustStreamThreadCountTest {
            addStreamStateChangeListener(kafkaStreams);
            startStreamsAndWaitForRunning(kafkaStreams);

            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
            final int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
            stateTransitionHistory.clear();
            assertThat(kafkaStreams.removeStreamThread().get().split("-")[0], equalTo(appId));
            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
            assertThat(kafkaStreams.metadataForLocalThreads().size(), equalTo(oldThreadCount - 1));

            waitForTransitionFromRebalancingToRunning();
        }

@@ -236,7 +236,7 @@ public class AdjustStreamThreadCountTest {
            addStreamStateChangeListener(kafkaStreams);
            startStreamsAndWaitForRunning(kafkaStreams);

            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
            final int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
            stateTransitionHistory.clear();

            final CountDownLatch latch = new CountDownLatch(2);

@@ -245,7 +245,7 @@ public class AdjustStreamThreadCountTest {
            two.start();
            one.start();
            latch.await(30, TimeUnit.SECONDS);
            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount));
            assertThat(kafkaStreams.metadataForLocalThreads().size(), equalTo(oldThreadCount));

            waitForTransitionFromRebalancingToRunning();
        }

@@ -267,11 +267,11 @@ public class AdjustStreamThreadCountTest {
            addStreamStateChangeListener(kafkaStreams);
            startStreamsAndWaitForRunning(kafkaStreams);

            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
            int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
            stateTransitionHistory.clear();

            assertThat(
                kafkaStreams.localThreadsMetadata()
                kafkaStreams.metadataForLocalThreads()
                    .stream()
                    .map(t -> t.threadName().split("-StreamThread-")[1])
                    .sorted()

@@ -284,16 +284,16 @@ public class AdjustStreamThreadCountTest {
            assertThat("New thread has index 3", "3".equals(name.get().split("-StreamThread-")[1]));
            TestUtils.waitForCondition(
                () -> kafkaStreams
                    .localThreadsMetadata()
                    .metadataForLocalThreads()
                    .stream().sequential()
                    .map(ThreadMetadata::threadName)
                    .anyMatch(t -> t.equals(name.get())),
                "Stream thread has not been added"
            );
            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
            assertThat(kafkaStreams.metadataForLocalThreads().size(), equalTo(oldThreadCount + 1));
            assertThat(
                kafkaStreams
                    .localThreadsMetadata()
                    .metadataForLocalThreads()
                    .stream()
                    .map(t -> t.threadName().split("-StreamThread-")[1])
                    .sorted()

@@ -302,13 +302,13 @@ public class AdjustStreamThreadCountTest {
            );
            waitForTransitionFromRebalancingToRunning();

            oldThreadCount = kafkaStreams.localThreadsMetadata().size();
            oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
            stateTransitionHistory.clear();

            final Optional<String> removedThread = kafkaStreams.removeStreamThread();

            assertThat(removedThread, not(Optional.empty()));
            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
            assertThat(kafkaStreams.metadataForLocalThreads().size(), equalTo(oldThreadCount - 1));
            waitForTransitionFromRebalancingToRunning();

            stateTransitionHistory.clear();

@@ -317,14 +317,14 @@ public class AdjustStreamThreadCountTest {

            assertThat(name2, not(Optional.empty()));
            TestUtils.waitForCondition(
                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
                () -> kafkaStreams.metadataForLocalThreads().stream().sequential()
                    .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name2.orElse(""))),
                "Wait for the thread to be added"
            );
            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount));
            assertThat(kafkaStreams.metadataForLocalThreads().size(), equalTo(oldThreadCount));
            assertThat(
                kafkaStreams
                    .localThreadsMetadata()
                    .metadataForLocalThreads()
                    .stream()
                    .map(t -> t.threadName().split("-StreamThread-")[1])
                    .sorted()

@@ -342,7 +342,7 @@ public class AdjustStreamThreadCountTest {
        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
            addStreamStateChangeListener(kafkaStreams);
            startStreamsAndWaitForRunning(kafkaStreams);
            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
            final int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
            final int threadCount = 5;
            final int loop = 3;
            final AtomicReference<Throwable> lastException = new AtomicReference<>();

@@ -353,7 +353,7 @@ public class AdjustStreamThreadCountTest {
                    for (int i = 0; i < loop + 1; i++) {
                        if (!kafkaStreams.addStreamThread().isPresent())
                            throw new RuntimeException("failed to create stream thread");
                        kafkaStreams.localThreadsMetadata();
                        kafkaStreams.metadataForLocalThreads();
                        if (!kafkaStreams.removeStreamThread().isPresent())
                            throw new RuntimeException("failed to delete a stream thread");
                    }

@@ -365,7 +365,7 @@ public class AdjustStreamThreadCountTest {
            executor.shutdown();
            assertTrue(executor.awaitTermination(60, TimeUnit.SECONDS));
            assertNull(lastException.get());
            assertEquals(oldThreadCount, kafkaStreams.localThreadsMetadata().size());
            assertEquals(oldThreadCount, kafkaStreams.metadataForLocalThreads().size());
        }
    }

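A hedged sketch of the elastic-scaling API these tests exercise (a running KafkaStreams instance `streams` is assumed; output sink illustrative):

    final Optional<String> added = streams.addStreamThread();
    added.ifPresent(name -> System.out.println("started " + name));
    System.out.println("threads now: " + streams.metadataForLocalThreads().size());

    final Optional<String> removed = streams.removeStreamThread();
    removed.ifPresent(name -> System.out.println("stopped " + name));
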
@@ -696,13 +696,13 @@ public class EosIntegrationTest {
        // the assignment is. We only really care that the remaining instance only sees one host
        // that owns both partitions.
        waitForCondition(
            () -> stallingInstance.allMetadata().size() == 2
                && remainingInstance.allMetadata().size() == 1
                && remainingInstance.allMetadata().iterator().next().topicPartitions().size() == 2,
            () -> stallingInstance.metadataForAllStreamsClients().size() == 2
                && remainingInstance.metadataForAllStreamsClients().size() == 1
                && remainingInstance.metadataForAllStreamsClients().iterator().next().topicPartitions().size() == 2,
            MAX_WAIT_TIME_MS,
            () -> "Should have rebalanced.\n" +
                "Streams1[" + streams1.allMetadata() + "]\n" +
                "Streams2[" + streams2.allMetadata() + "]");
                "Streams1[" + streams1.metadataForAllStreamsClients() + "]\n" +
                "Streams2[" + streams2.metadataForAllStreamsClients() + "]");

        // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
        //

@@ -729,14 +729,14 @@ public class EosIntegrationTest {
        // It doesn't really matter what the assignment is, but we might as well also assert that they
        // both see both partitions assigned exactly once
        waitForCondition(
            () -> streams1.allMetadata().size() == 2
                && streams2.allMetadata().size() == 2
                && streams1.allMetadata().stream().mapToLong(meta -> meta.topicPartitions().size()).sum() == 2
                && streams2.allMetadata().stream().mapToLong(meta -> meta.topicPartitions().size()).sum() == 2,
            () -> streams1.metadataForAllStreamsClients().size() == 2
                && streams2.metadataForAllStreamsClients().size() == 2
                && streams1.metadataForAllStreamsClients().stream().mapToLong(meta -> meta.topicPartitions().size()).sum() == 2
                && streams2.metadataForAllStreamsClients().stream().mapToLong(meta -> meta.topicPartitions().size()).sum() == 2,
            MAX_WAIT_TIME_MS,
            () -> "Should have rebalanced.\n" +
                "Streams1[" + streams1.allMetadata() + "]\n" +
                "Streams2[" + streams2.allMetadata() + "]");
                "Streams1[" + streams1.metadataForAllStreamsClients() + "]\n" +
                "Streams2[" + streams2.metadataForAllStreamsClients() + "]");

        writeInputData(dataAfterSecondRebalance);

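The same rebalance check, distilled into a hedged sketch (TestUtils.waitForCondition from the Kafka test utilities; two running instances streams1/streams2 and a 30s timeout assumed):

    TestUtils.waitForCondition(
        () -> streams1.metadataForAllStreamsClients().size() == 2
            && streams2.metadataForAllStreamsClients().size() == 2,
        30_000,
        "both clients should see two instances after the rebalance");
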
@@ -27,11 +27,11 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;

@@ -214,13 +214,13 @@ public class KTableKTableForeignKeyJoinDistributedTest {
    private void setStateListenersForVerification(final Predicate<ThreadMetadata> taskCondition) {
        client1.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING &&
                client1.localThreadsMetadata().stream().allMatch(taskCondition)) {
                client1.metadataForLocalThreads().stream().allMatch(taskCondition)) {
                client1IsOk = true;
            }
        });
        client2.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING &&
                client2.localThreadsMetadata().stream().allMatch(taskCondition)) {
                client2.metadataForLocalThreads().stream().allMatch(taskCondition)) {
                client2IsOk = true;
            }
        });

@@ -22,13 +22,13 @@ import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

@@ -167,14 +167,14 @@ public class StandbyTaskCreationIntegrationTest {
    private void setStateListenersForVerification(final Predicate<ThreadMetadata> taskCondition) {
        client1.setStateListener((newState, oldState) -> {
            if (newState == State.RUNNING &&
                client1.localThreadsMetadata().stream().allMatch(taskCondition)) {
                client1.metadataForLocalThreads().stream().allMatch(taskCondition)) {

                client1IsOk = true;
            }
        });
        client2.setStateListener((newState, oldState) -> {
            if (newState == State.RUNNING &&
                client2.localThreadsMetadata().stream().allMatch(taskCondition)) {
                client2.metadataForLocalThreads().stream().allMatch(taskCondition)) {

                client2IsOk = true;
            }

@@ -370,8 +370,8 @@ public class StoreQueryIntegrationTest {

        startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));

        assertTrue(kafkaStreams1.localThreadsMetadata().size() > 1);
        assertTrue(kafkaStreams2.localThreadsMetadata().size() > 1);
        assertTrue(kafkaStreams1.metadataForLocalThreads().size() > 1);
        assertTrue(kafkaStreams2.metadataForLocalThreads().size() > 1);

        produceValueRange(key, 0, batch1NumMessages);

@@ -24,10 +24,10 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;

@@ -158,7 +158,7 @@ public class TaskMetadataIntegrationTest {
    }

    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
        final List<TaskMetadata> taskMetadataList = kafkaStreams.localThreadsMetadata().stream().flatMap(t -> t.activeTasks().stream()).collect(Collectors.toList());
        final List<TaskMetadata> taskMetadataList = kafkaStreams.metadataForLocalThreads().stream().flatMap(t -> t.activeTasks().stream()).collect(Collectors.toList());
        assertThat("only one task", taskMetadataList.size() == 1);
        return taskMetadataList.get(0);
    }

@@ -52,6 +52,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;

@@ -63,8 +64,6 @@ import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

@@ -1115,6 +1114,7 @@ public class StreamThreadTest {
        expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
        EasyMock.replay(consumerGroupMetadata);
        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
        expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
        taskManager.shutdown(true);
        EasyMock.expectLastCall();
        EasyMock.replay(taskManager, consumer);

@@ -1153,7 +1153,7 @@ public class StreamThreadTest {
    @Test
    public void shouldNotReturnDataAfterTaskMigrated() {
        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);

        expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
        final InternalTopologyBuilder internalTopologyBuilder = EasyMock.createNiceMock(InternalTopologyBuilder.class);

        expect(internalTopologyBuilder.sourceTopicCollection()).andReturn(Collections.singletonList(topic1)).times(2);

@@ -1221,6 +1221,7 @@ public class StreamThreadTest {
        expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
        EasyMock.replay(consumerGroupMetadata);
        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
        expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
        taskManager.shutdown(true);
        EasyMock.expectLastCall();
        EasyMock.replay(taskManager, consumer);

@@ -1258,6 +1259,7 @@ public class StreamThreadTest {
        expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
        EasyMock.replay(consumerGroupMetadata);
        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
        expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
        taskManager.shutdown(true);
        EasyMock.expectLastCall();
        EasyMock.replay(taskManager, consumer);

@@ -1641,7 +1643,7 @@ public class StreamThreadTest {

        final ThreadMetadata metadata = thread.threadMetadata();
        assertEquals(StreamThread.State.RUNNING.name(), metadata.threadState());
        assertTrue(metadata.activeTasks().contains(new TaskMetadata(task1, Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty())));
        assertTrue(metadata.activeTasks().contains(new TaskMetadataImpl(task1, Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty())));
        assertTrue(metadata.standbyTasks().isEmpty());

        assertTrue("#threadState() was: " + metadata.threadState() + "; expected either RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED",

@@ -1694,7 +1696,7 @@ public class StreamThreadTest {

        final ThreadMetadata threadMetadata = thread.threadMetadata();
        assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState());
        assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadata(task1, Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty())));
        assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadataImpl(task1, Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty())));
        assertTrue(threadMetadata.activeTasks().isEmpty());

        thread.taskManager().shutdown(true);

@@ -1974,12 +1976,6 @@ public class StreamThreadTest {
        assertEquals(StreamThread.State.RUNNING.name(), metadata.threadState());
    }

    private void assertThreadMetadataHasEmptyTasksWithState(final ThreadMetadata metadata, final StreamThread.State state) {
        assertEquals(state.name(), metadata.threadState());
        assertTrue(metadata.activeTasks().isEmpty());
        assertTrue(metadata.standbyTasks().isEmpty());
    }

    @Test
    public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() throws Exception {
        internalStreamsBuilder.stream(Collections.singleton("topic"), consumed)

@@ -2163,6 +2159,7 @@ public class StreamThreadTest {
        final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);

        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
        expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
        consumer.assign(assignedPartitions);
        consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));

@@ -2210,6 +2207,7 @@ public class StreamThreadTest {
        final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);

        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
        expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
        consumer.assign(assignedPartitions);
        consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));

@@ -2256,7 +2254,7 @@ public class StreamThreadTest {
    @SuppressWarnings("unchecked")
    public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath() {
        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);

        expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
        consumer.subscribe((Collection<String>) anyObject(), anyObject());

@@ -2319,6 +2317,7 @@ public class StreamThreadTest {
    @SuppressWarnings("unchecked")
    public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHandler() {
        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
        expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);

@@ -2386,6 +2385,7 @@ public class StreamThreadTest {
    @SuppressWarnings("unchecked")
    public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath() {
        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
        expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);

@@ -2718,6 +2718,7 @@ public class StreamThreadTest {
        expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
        EasyMock.replay(consumer, consumerGroupMetadata);
        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
        expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
        final StreamsMetricsImpl streamsMetrics =
            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
        final StreamThread thread = new StreamThread(

@@ -24,13 +24,14 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
import org.junit.Before;
import org.junit.Test;

@@ -139,17 +140,17 @@ public class StreamsMetadataStateTest {

    @Test
    public void shouldGetAllStreamInstances() {
        final StreamsMetadata one = new StreamsMetadata(hostOne,
        final StreamsMetadata one = new StreamsMetadataImpl(hostOne,
            mkSet(globalTable, "table-one", "table-two", "merged-table"),
            mkSet(topic1P0, topic2P1, topic4P0),
            mkSet("table-one", "table-two", "merged-table"),
            mkSet(topic2P0, topic1P1));
        final StreamsMetadata two = new StreamsMetadata(hostTwo,
        final StreamsMetadata two = new StreamsMetadataImpl(hostTwo,
            mkSet(globalTable, "table-two", "table-one", "merged-table"),
            mkSet(topic2P0, topic1P1),
            mkSet("table-three"),
            mkSet(topic3P0));
        final StreamsMetadata three = new StreamsMetadata(hostThree,
        final StreamsMetadata three = new StreamsMetadataImpl(hostThree,
            mkSet(globalTable, "table-three"),
            Collections.singleton(topic3P0),
            mkSet("table-one", "table-two", "merged-table"),

@@ -173,7 +174,7 @@ public class StreamsMetadataStateTest {
        metadataState.onChange(hostToActivePartitions, Collections.emptyMap(),
            cluster.withPartitions(Collections.singletonMap(tp5, new PartitionInfo("topic-five", 1, null, null, null))));

        final StreamsMetadata expected = new StreamsMetadata(hostFour, Collections.singleton(globalTable),
        final StreamsMetadata expected = new StreamsMetadataImpl(hostFour, Collections.singleton(globalTable),
            Collections.singleton(tp5), Collections.emptySet(), Collections.emptySet());
        final Collection<StreamsMetadata> actual = metadataState.getAllMetadata();
        assertTrue("expected " + actual + " to contain " + expected, actual.contains(expected));

@@ -181,12 +182,12 @@ public class StreamsMetadataStateTest {

    @Test
    public void shouldGetInstancesForStoreName() {
        final StreamsMetadata one = new StreamsMetadata(hostOne,
        final StreamsMetadata one = new StreamsMetadataImpl(hostOne,
            mkSet(globalTable, "table-one", "table-two", "merged-table"),
            mkSet(topic1P0, topic2P1, topic4P0),
            mkSet("table-one", "table-two", "merged-table"),
            mkSet(topic2P0, topic1P1));
        final StreamsMetadata two = new StreamsMetadata(hostTwo,
        final StreamsMetadata two = new StreamsMetadataImpl(hostTwo,
            mkSet(globalTable, "table-two", "table-one", "merged-table"),
            mkSet(topic2P0, topic1P1),
            mkSet("table-three"),

@@ -0,0 +1,156 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.processor.TaskId;
import org.junit.Before;
import org.junit.Test;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

public class TaskMetadataImplTest {

    public static final TaskId TASK_ID = new TaskId(1, 2);
    public static final TopicPartition TP_0 = new TopicPartition("t", 0);
    public static final TopicPartition TP_1 = new TopicPartition("t", 1);
    public static final Set<TopicPartition> TOPIC_PARTITIONS = mkSet(TP_0, TP_1);
    public static final Map<TopicPartition, Long> COMMITTED_OFFSETS = mkMap(mkEntry(TP_0, 1L), mkEntry(TP_1, 2L));
    public static final Map<TopicPartition, Long> END_OFFSETS = mkMap(mkEntry(TP_0, 1L), mkEntry(TP_1, 3L));
    public static final Optional<Long> TIME_CURRENT_IDLING_STARTED = Optional.of(3L);

    private TaskMetadata taskMetadata;

    @Before
    public void setUp() {
        taskMetadata = new TaskMetadataImpl(
            TASK_ID,
            TOPIC_PARTITIONS,
            COMMITTED_OFFSETS,
            END_OFFSETS,
            TIME_CURRENT_IDLING_STARTED);
    }

    @Test
    public void shouldNotAllowModificationOfInternalStateViaGetters() {
        assertThat(isUnmodifiable(taskMetadata.topicPartitions()), is(true));
        assertThat(isUnmodifiable(taskMetadata.committedOffsets()), is(true));
        assertThat(isUnmodifiable(taskMetadata.endOffsets()), is(true));
    }

    @Test
    public void shouldBeEqualsIfSameObject() {
        final TaskMetadataImpl same = new TaskMetadataImpl(
            TASK_ID,
            TOPIC_PARTITIONS,
            COMMITTED_OFFSETS,
            END_OFFSETS,
            TIME_CURRENT_IDLING_STARTED);
        assertThat(taskMetadata, equalTo(same));
        assertThat(taskMetadata.hashCode(), equalTo(same.hashCode()));
    }

    @Test
    public void shouldBeEqualsIfOnlyDifferInCommittedOffsets() {
        final TaskMetadataImpl stillSameDifferCommittedOffsets = new TaskMetadataImpl(
            TASK_ID,
            TOPIC_PARTITIONS,
            mkMap(mkEntry(TP_0, 1000000L), mkEntry(TP_1, 2L)),
            END_OFFSETS,
            TIME_CURRENT_IDLING_STARTED);
        assertThat(taskMetadata, equalTo(stillSameDifferCommittedOffsets));
        assertThat(taskMetadata.hashCode(), equalTo(stillSameDifferCommittedOffsets.hashCode()));
    }

    @Test
    public void shouldBeEqualsIfOnlyDifferInEndOffsets() {
        final TaskMetadataImpl stillSameDifferEndOffsets = new TaskMetadataImpl(
            TASK_ID,
            TOPIC_PARTITIONS,
            COMMITTED_OFFSETS,
            mkMap(mkEntry(TP_0, 1000000L), mkEntry(TP_1, 2L)),
            TIME_CURRENT_IDLING_STARTED);
        assertThat(taskMetadata, equalTo(stillSameDifferEndOffsets));
        assertThat(taskMetadata.hashCode(), equalTo(stillSameDifferEndOffsets.hashCode()));
    }

    @Test
    public void shouldBeEqualsIfOnlyDifferInIdlingTime() {
        final TaskMetadataImpl stillSameDifferIdlingTime = new TaskMetadataImpl(
            TASK_ID,
            TOPIC_PARTITIONS,
            COMMITTED_OFFSETS,
            END_OFFSETS,
            Optional.empty());
        assertThat(taskMetadata, equalTo(stillSameDifferIdlingTime));
        assertThat(taskMetadata.hashCode(), equalTo(stillSameDifferIdlingTime.hashCode()));
    }

    @Test
    public void shouldNotBeEqualsIfDifferInTaskID() {
        final TaskMetadataImpl differTaskId = new TaskMetadataImpl(
            new TaskId(1, 10000),
            TOPIC_PARTITIONS,
            COMMITTED_OFFSETS,
            END_OFFSETS,
            TIME_CURRENT_IDLING_STARTED);
        assertThat(taskMetadata, not(equalTo(differTaskId)));
        assertThat(taskMetadata.hashCode(), not(equalTo(differTaskId.hashCode())));
    }

    @Test
    public void shouldNotBeEqualsIfDifferInTopicPartitions() {
        final TaskMetadataImpl differTopicPartitions = new TaskMetadataImpl(
            TASK_ID,
            mkSet(TP_0),
            COMMITTED_OFFSETS,
            END_OFFSETS,
            TIME_CURRENT_IDLING_STARTED);
        assertThat(taskMetadata, not(equalTo(differTopicPartitions)));
        assertThat(taskMetadata.hashCode(), not(equalTo(differTopicPartitions.hashCode())));
    }

    private static boolean isUnmodifiable(final Collection<?> collection) {
        try {
            collection.clear();
            return false;
        } catch (final UnsupportedOperationException e) {
            return true;
        }
    }

    private static boolean isUnmodifiable(final Map<?, ?> map) {
        try {
            map.clear();
            return false;
        } catch (final UnsupportedOperationException e) {
            return true;
        }
    }
}
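Taken together, these tests pin down the equality contract of TaskMetadataImpl: only the task id and the assigned topic partitions participate in equals and hashCode, while committed offsets, end offsets, and the idle-start timestamp are deliberately ignored, so metadata for the same task compares equal regardless of its progress. A minimal sketch of an equals/hashCode pair that would satisfy this contract follows; it is an illustration of the contract, not the actual implementation, and it assumes fields named taskId and topicPartitions plus java.util.Objects.

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TaskMetadataImpl)) {
            return false;
        }
        final TaskMetadataImpl that = (TaskMetadataImpl) o;
        // Progress fields (offsets, idle time) are intentionally excluded.
        return Objects.equals(taskId, that.taskId)
            && Objects.equals(topicPartitions, that.topicPartitions);
    }

    @Override
    public int hashCode() {
        // Must hash exactly the fields compared in equals.
        return Objects.hash(taskId, topicPartitions);
    }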
@@ -0,0 +1,244 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.processor.TaskId;
import org.junit.Before;
import org.junit.Test;

import java.util.Collection;
import java.util.Optional;
import java.util.Set;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

public class ThreadMetadataImplTest {

    public static final String THREAD_NAME = "thread name";
    public static final String THREAD_STATE = "thread state";
    public static final String MAIN_CONSUMER_CLIENT_ID = "main Consumer ClientID";
    public static final String RESTORE_CONSUMER_CLIENT_ID = "restore Consumer ClientID";
    public static final String CLIENT_ID_1 = "client Id 1";
    public static final String CLIENT_ID_2 = "client Id 2";
    public static final Set<String> PRODUCER_CLIENT_IDS = mkSet(CLIENT_ID_1, CLIENT_ID_2);
    public static final TaskId TASK_ID_0 = new TaskId(1, 2);
    public static final TaskId TASK_ID_1 = new TaskId(1, 1);
    public static final TopicPartition TP_0_0 = new TopicPartition("t", 0);
    public static final TopicPartition TP_1_0 = new TopicPartition("t", 1);
    public static final TopicPartition TP_0_1 = new TopicPartition("t", 2);
    public static final TopicPartition TP_1_1 = new TopicPartition("t", 3);
    public static final TaskMetadata TM_0 = new TaskMetadataImpl(
        TASK_ID_0,
        mkSet(TP_0_0, TP_1_0),
        mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
        mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
        Optional.of(3L));
    public static final TaskMetadata TM_1 = new TaskMetadataImpl(
        TASK_ID_1,
        mkSet(TP_0_1, TP_1_1),
        mkMap(mkEntry(TP_0_1, 1L), mkEntry(TP_1_1, 2L)),
        mkMap(mkEntry(TP_0_1, 1L), mkEntry(TP_1_1, 2L)),
        Optional.of(3L));
    public static final Set<TaskMetadata> STANDBY_TASKS = mkSet(TM_0, TM_1);
    public static final Set<TaskMetadata> ACTIVE_TASKS = mkSet(TM_1);
    public static final String ADMIN_CLIENT_ID = "admin ClientID";

    private ThreadMetadata threadMetadata;

    @Before
    public void setUp() {
        threadMetadata = new ThreadMetadataImpl(
            THREAD_NAME,
            THREAD_STATE,
            MAIN_CONSUMER_CLIENT_ID,
            RESTORE_CONSUMER_CLIENT_ID,
            PRODUCER_CLIENT_IDS,
            ADMIN_CLIENT_ID,
            ACTIVE_TASKS,
            STANDBY_TASKS
        );
    }

    @Test
    public void shouldNotAllowModificationOfInternalStateViaGetters() {
        assertThat(isUnmodifiable(threadMetadata.producerClientIds()), is(true));
        assertThat(isUnmodifiable(threadMetadata.activeTasks()), is(true));
        assertThat(isUnmodifiable(threadMetadata.standbyTasks()), is(true));
    }

    @Test
    public void shouldBeEqualIfSameObject() {
        final ThreadMetadata same = new ThreadMetadataImpl(
            THREAD_NAME,
            THREAD_STATE,
            MAIN_CONSUMER_CLIENT_ID,
            RESTORE_CONSUMER_CLIENT_ID,
            PRODUCER_CLIENT_IDS,
            ADMIN_CLIENT_ID,
            ACTIVE_TASKS,
            STANDBY_TASKS
        );
        assertThat(threadMetadata, equalTo(same));
        assertThat(threadMetadata.hashCode(), equalTo(same.hashCode()));
    }

    @Test
    public void shouldNotBeEqualIfDifferInThreadName() {
        final ThreadMetadata differThreadName = new ThreadMetadataImpl(
            "different",
            THREAD_STATE,
            MAIN_CONSUMER_CLIENT_ID,
            RESTORE_CONSUMER_CLIENT_ID,
            PRODUCER_CLIENT_IDS,
            ADMIN_CLIENT_ID,
            ACTIVE_TASKS,
            STANDBY_TASKS
        );
        assertThat(threadMetadata, not(equalTo(differThreadName)));
        assertThat(threadMetadata.hashCode(), not(equalTo(differThreadName.hashCode())));
    }

    @Test
    public void shouldNotBeEqualIfDifferInThreadState() {
        final ThreadMetadata differThreadState = new ThreadMetadataImpl(
            THREAD_NAME,
            "different",
            MAIN_CONSUMER_CLIENT_ID,
            RESTORE_CONSUMER_CLIENT_ID,
            PRODUCER_CLIENT_IDS,
            ADMIN_CLIENT_ID,
            ACTIVE_TASKS,
            STANDBY_TASKS
        );
        assertThat(threadMetadata, not(equalTo(differThreadState)));
        assertThat(threadMetadata.hashCode(), not(equalTo(differThreadState.hashCode())));
    }

    @Test
    public void shouldNotBeEqualIfDifferInClientId() {
        final ThreadMetadata differMainConsumerClientId = new ThreadMetadataImpl(
            THREAD_NAME,
            THREAD_STATE,
            "different",
            RESTORE_CONSUMER_CLIENT_ID,
            PRODUCER_CLIENT_IDS,
            ADMIN_CLIENT_ID,
            ACTIVE_TASKS,
            STANDBY_TASKS
        );
        assertThat(threadMetadata, not(equalTo(differMainConsumerClientId)));
        assertThat(threadMetadata.hashCode(), not(equalTo(differMainConsumerClientId.hashCode())));
    }

    @Test
    public void shouldNotBeEqualIfDifferInConsumerClientId() {
        final ThreadMetadata differRestoreConsumerClientId = new ThreadMetadataImpl(
            THREAD_NAME,
            THREAD_STATE,
            MAIN_CONSUMER_CLIENT_ID,
            "different",
            PRODUCER_CLIENT_IDS,
            ADMIN_CLIENT_ID,
            ACTIVE_TASKS,
            STANDBY_TASKS
        );
        assertThat(threadMetadata, not(equalTo(differRestoreConsumerClientId)));
        assertThat(threadMetadata.hashCode(), not(equalTo(differRestoreConsumerClientId.hashCode())));
    }

    @Test
    public void shouldNotBeEqualIfDifferInProducerClientIds() {
        final ThreadMetadata differProducerClientIds = new ThreadMetadataImpl(
            THREAD_NAME,
            THREAD_STATE,
            MAIN_CONSUMER_CLIENT_ID,
            RESTORE_CONSUMER_CLIENT_ID,
            mkSet(CLIENT_ID_1),
            ADMIN_CLIENT_ID,
            ACTIVE_TASKS,
            STANDBY_TASKS
        );
        assertThat(threadMetadata, not(equalTo(differProducerClientIds)));
        assertThat(threadMetadata.hashCode(), not(equalTo(differProducerClientIds.hashCode())));
    }

    @Test
    public void shouldNotBeEqualIfDifferInAdminClientId() {
        final ThreadMetadata differAdminClientId = new ThreadMetadataImpl(
            THREAD_NAME,
            THREAD_STATE,
            MAIN_CONSUMER_CLIENT_ID,
            RESTORE_CONSUMER_CLIENT_ID,
            PRODUCER_CLIENT_IDS,
            "different",
            ACTIVE_TASKS,
            STANDBY_TASKS
        );
        assertThat(threadMetadata, not(equalTo(differAdminClientId)));
        assertThat(threadMetadata.hashCode(), not(equalTo(differAdminClientId.hashCode())));
    }

    @Test
    public void shouldNotBeEqualIfDifferInActiveTasks() {
        final ThreadMetadata differActiveTasks = new ThreadMetadataImpl(
            THREAD_NAME,
            THREAD_STATE,
            MAIN_CONSUMER_CLIENT_ID,
            RESTORE_CONSUMER_CLIENT_ID,
            PRODUCER_CLIENT_IDS,
            ADMIN_CLIENT_ID,
            mkSet(TM_0),
            STANDBY_TASKS
        );
        assertThat(threadMetadata, not(equalTo(differActiveTasks)));
        assertThat(threadMetadata.hashCode(), not(equalTo(differActiveTasks.hashCode())));
    }

    @Test
    public void shouldNotBeEqualIfDifferInStandByTasks() {
        final ThreadMetadata differStandByTasks = new ThreadMetadataImpl(
            THREAD_NAME,
            THREAD_STATE,
            MAIN_CONSUMER_CLIENT_ID,
            RESTORE_CONSUMER_CLIENT_ID,
            PRODUCER_CLIENT_IDS,
            ADMIN_CLIENT_ID,
            ACTIVE_TASKS,
            mkSet(TM_0)
        );
        assertThat(threadMetadata, not(equalTo(differStandByTasks)));
        assertThat(threadMetadata.hashCode(), not(equalTo(differStandByTasks.hashCode())));
    }

    private static boolean isUnmodifiable(final Collection<?> collection) {
        try {
            collection.clear();
            return false;
        } catch (final UnsupportedOperationException e) {
            return true;
        }
    }
}
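ThreadMetadataImplTest, by contrast, requires every constructor argument of ThreadMetadataImpl to participate in equals and hashCode. Its unmodifiability tests also imply that the implementation hands out read-only views of its collections; a plausible way to satisfy that is the copy-then-wrap idiom sketched below (an assumption about the pattern, not a quote from the actual constructor; the class name is illustrative).

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class DefensiveCopyExample {
    private final Set<String> producerClientIds;

    DefensiveCopyExample(final Set<String> producerClientIds) {
        // Copy then wrap: callers can neither mutate the returned view nor
        // reach this object's state through the set they passed in.
        this.producerClientIds = Collections.unmodifiableSet(new HashSet<>(producerClientIds));
    }

    Set<String> producerClientIds() {
        return producerClientIds; // clear() on this view throws UnsupportedOperationException
    }
}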
@@ -18,39 +18,121 @@
 package org.apache.kafka.streams.state;

 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
 import org.junit.Before;
 import org.junit.Test;

 import java.util.Collection;
 import java.util.Set;

 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.junit.Assert.assertTrue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;

 public class StreamsMetadataTest {

     private static final HostInfo HOST_INFO = new HostInfo("local", 12);
+    public static final Set<String> STATE_STORE_NAMES = mkSet("store1", "store2");
     private static final TopicPartition TP_0 = new TopicPartition("t", 0);
     private static final TopicPartition TP_1 = new TopicPartition("t", 1);
+    public static final Set<TopicPartition> TOPIC_PARTITIONS = mkSet(TP_0, TP_1);
+    public static final Set<String> STAND_BY_STORE_NAMES = mkSet("store2");
+    public static final Set<TopicPartition> STANDBY_TOPIC_PARTITIONS = mkSet(TP_1);

     private StreamsMetadata streamsMetadata;

     @Before
     public void setUp() {
-        streamsMetadata = new StreamsMetadata(
+        streamsMetadata = new StreamsMetadataImpl(
             HOST_INFO,
-            mkSet("store1", "store2"),
-            mkSet(TP_0, TP_1),
-            mkSet("store2"),
-            mkSet(TP_1)
+            STATE_STORE_NAMES,
+            TOPIC_PARTITIONS,
+            STAND_BY_STORE_NAMES,
+            STANDBY_TOPIC_PARTITIONS
         );
     }

     @Test
     public void shouldNotAllowModificationOfInternalStateViaGetters() {
-        assertTrue(isUnmodifiable(streamsMetadata.stateStoreNames()));
-        assertTrue(isUnmodifiable(streamsMetadata.topicPartitions()));
-        assertTrue(isUnmodifiable(streamsMetadata.standbyTopicPartitions()));
-        assertTrue(isUnmodifiable(streamsMetadata.standbyStateStoreNames()));
+        assertThat(isUnmodifiable(streamsMetadata.stateStoreNames()), is(true));
+        assertThat(isUnmodifiable(streamsMetadata.topicPartitions()), is(true));
+        assertThat(isUnmodifiable(streamsMetadata.standbyTopicPartitions()), is(true));
+        assertThat(isUnmodifiable(streamsMetadata.standbyStateStoreNames()), is(true));
     }

+    @Test
+    public void shouldBeEqualsIfSameObject() {
+        final StreamsMetadata same = new StreamsMetadataImpl(
+            HOST_INFO,
+            STATE_STORE_NAMES,
+            TOPIC_PARTITIONS,
+            STAND_BY_STORE_NAMES,
+            STANDBY_TOPIC_PARTITIONS);
+        assertThat(streamsMetadata, equalTo(same));
+        assertThat(streamsMetadata.hashCode(), equalTo(same.hashCode()));
+    }
+
+    @Test
+    public void shouldNotBeEqualIfDifferInHostInfo() {
+        final StreamsMetadata differHostInfo = new StreamsMetadataImpl(
+            new HostInfo("different", 122),
+            STATE_STORE_NAMES,
+            TOPIC_PARTITIONS,
+            STAND_BY_STORE_NAMES,
+            STANDBY_TOPIC_PARTITIONS);
+        assertThat(streamsMetadata, not(equalTo(differHostInfo)));
+        assertThat(streamsMetadata.hashCode(), not(equalTo(differHostInfo.hashCode())));
+    }
+
+    @Test
+    public void shouldNotBeEqualIfDifferStateStoreNames() {
+        final StreamsMetadata differStateStoreNames = new StreamsMetadataImpl(
+            HOST_INFO,
+            mkSet("store1"),
+            TOPIC_PARTITIONS,
+            STAND_BY_STORE_NAMES,
+            STANDBY_TOPIC_PARTITIONS);
+        assertThat(streamsMetadata, not(equalTo(differStateStoreNames)));
+        assertThat(streamsMetadata.hashCode(), not(equalTo(differStateStoreNames.hashCode())));
+    }
+
+    @Test
+    public void shouldNotBeEqualIfDifferInTopicPartitions() {
+        final StreamsMetadata differTopicPartitions = new StreamsMetadataImpl(
+            HOST_INFO,
+            STATE_STORE_NAMES,
+            mkSet(TP_0),
+            STAND_BY_STORE_NAMES,
+            STANDBY_TOPIC_PARTITIONS);
+        assertThat(streamsMetadata, not(equalTo(differTopicPartitions)));
+        assertThat(streamsMetadata.hashCode(), not(equalTo(differTopicPartitions.hashCode())));
+    }
+
+    @Test
+    public void shouldNotBeEqualIfDifferInStandByStores() {
+        final StreamsMetadata differStandByStores = new StreamsMetadataImpl(
+            HOST_INFO,
+            STATE_STORE_NAMES,
+            TOPIC_PARTITIONS,
+            mkSet("store1"),
+            STANDBY_TOPIC_PARTITIONS);
+        assertThat(streamsMetadata, not(equalTo(differStandByStores)));
+        assertThat(streamsMetadata.hashCode(), not(equalTo(differStandByStores.hashCode())));
+    }
+
+    @Test
+    public void shouldNotBeEqualIfDifferInStandByTopicPartitions() {
+        final StreamsMetadata differStandByTopicPartitions = new StreamsMetadataImpl(
+            HOST_INFO,
+            STATE_STORE_NAMES,
+            TOPIC_PARTITIONS,
+            STAND_BY_STORE_NAMES,
+            mkSet(TP_0));
+        assertThat(streamsMetadata, not(equalTo(differStandByTopicPartitions)));
+        assertThat(streamsMetadata.hashCode(), not(equalTo(differStandByTopicPartitions.hashCode())));
+    }
+
     private static boolean isUnmodifiable(final Collection<?> collection) {
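A side note on the assertion change above: it is behavior-preserving, but assertThat(..., is(true)) fails with a message that includes the mismatched value, whereas a bare assertTrue reports only a context-free AssertionError, so the hamcrest form makes a broken unmodifiability guarantee easier to diagnose.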
@@ -26,13 +26,13 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.ThreadMetadata;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -137,7 +137,7 @@ public class StreamsStandByReplicaTest {

         streams.setStateListener((newState, oldState) -> {
             if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
-                final Set<ThreadMetadata> threadMetadata = streams.localThreadsMetadata();
+                final Set<ThreadMetadata> threadMetadata = streams.metadataForLocalThreads();
                 for (final ThreadMetadata threadMetadatum : threadMetadata) {
                     System.out.println(
                         "ACTIVE_TASKS:" + threadMetadatum.activeTasks().size()
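The same one-line migration applies to any caller of the deprecated KafkaStreams#localThreadsMetadata. A minimal usage sketch of the replacement API follows; the class name, topic names, application id, and bootstrap address are placeholders for an application's own values.

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.ThreadMetadata;

public final class ThreadMetadataProbe {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // placeholder topology

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "thread-metadata-probe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

        try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
            streams.start();
            // metadataForLocalThreads() returns the new org.apache.kafka.streams.ThreadMetadata.
            for (final ThreadMetadata thread : streams.metadataForLocalThreads()) {
                System.out.println(thread.threadName()
                    + " active=" + thread.activeTasks().size()
                    + " standby=" + thread.standbyTasks().size());
            }
        }
    }
}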
@@ -24,9 +24,9 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
 import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.processor.TaskMetadata;
-import org.apache.kafka.streams.processor.ThreadMetadata;

 import java.util.ArrayList;
 import java.util.Arrays;
@@ -82,7 +82,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
         streams.setStateListener((newState, oldState) -> {
             if (newState == State.RUNNING && oldState == State.REBALANCING) {
                 System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
-                final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
+                final Set<ThreadMetadata> allThreadMetadata = streams.metadataForLocalThreads();
                 final StringBuilder taskReportBuilder = new StringBuilder();
                 final List<String> activeTasks = new ArrayList<>();
                 final List<String> standbyTasks = new ArrayList<>();
@@ -110,7 +110,7 @@ public class StreamsUpgradeToCooperativeRebalanceTest {

         Exit.addShutdownHook("streams-shutdown-hook", () -> {
             streams.close();
-            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
             System.out.flush();
         });
     }
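The printf replacement above is output-equivalent to the println(String.format(...)) it removes: %n expands to the platform line separator, preserving the trailing newline that println appended.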