mirror of https://github.com/apache/kafka.git
KAFKA-15045: (KIP-924 pt. 2) Implement ApplicationState and KafkaStreamsState (#15920)
This PR implements read-only container classes for ApplicationState and KafkaStreamsState, and initializes them within StreamsPartitionAssignor#assign. New internal methods were also added to ClientState to pass this data through to the KafkaStreamsState. One test was added to check the lag sorting within the KafkaStreamsState implementation, as a counterpart to the existing test for the ClientState class.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
commit 0c5e8d3966
parent f0291ac74b
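The commit message above describes read-only views that a pluggable task assignor (KIP-924) can query. As a minimal illustrative sketch only — the class name LagAwareSelectionSketch, the helper pickMostCaughtUpClient, and the selection policy are hypothetical and not part of this commit; only the ApplicationState and KafkaStreamsState accessors it calls appear in the diff below — this is how such a consumer might pick the most caught-up client for a stateful task:

// Hypothetical sketch, not part of this commit: shows how the read-only
// ApplicationState / KafkaStreamsState surface could be queried by a custom assignor.
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ApplicationState;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
import org.apache.kafka.streams.processor.assignment.ProcessId;

public final class LagAwareSelectionSketch {

    /**
     * For a single stateful task, pick the client (KafkaStreamsState) with the smallest
     * reported total lag. The policy here is illustrative; only the accessors used
     * (kafkaStreamsStates, statefulTasksToLagSums, lagFor, processId) come from the diff below.
     */
    public static Optional<ProcessId> pickMostCaughtUpClient(final ApplicationState applicationState,
                                                             final TaskId task) {
        // Passing true requests lag information, so lagFor() is supported on each state.
        final Map<ProcessId, KafkaStreamsState> states = applicationState.kafkaStreamsStates(true);
        return states.values().stream()
            // only consider clients whose lag map actually contains this task
            .filter(state -> state.statefulTasksToLagSums().containsKey(task))
            .min(Comparator.comparingLong((KafkaStreamsState state) -> state.lagFor(task)))
            .map(KafkaStreamsState::processId);
    }

    private LagAwareSelectionSketch() {
    }
}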
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.processor.assignment;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.apache.kafka.streams.errors.TaskAssignmentException;

/**
@@ -142,4 +142,18 @@ public class AssignmentConfigs {
        }
        return value;
    }

    @Override
    public String toString() {
        return "AssignmentConfigs{" +
            "\n acceptableRecoveryLag=" + acceptableRecoveryLag +
            "\n maxWarmupReplicas=" + maxWarmupReplicas +
            "\n numStandbyReplicas=" + numStandbyReplicas +
            "\n probingRebalanceIntervalMs=" + probingRebalanceIntervalMs +
            "\n rackAwareAssignmentTags=" + rackAwareAssignmentTags +
            "\n rackAwareTrafficCost=" + rackAwareTrafficCost +
            "\n rackAwareNonOverlapCost=" + rackAwareNonOverlapCost +
            "\n rackAwareAssignmentStrategy=" + rackAwareAssignmentStrategy +
            "\n}";
    }
}
@@ -16,7 +16,7 @@
 */
package org.apache.kafka.streams.processor.assignment;

-import org.apache.kafka.common.protocol.types.Field.UUID;
+import java.util.UUID;

/** A simple wrapper around UUID that abstracts a Process ID */
public class ProcessId {
@@ -34,4 +34,9 @@ public class ProcessId {
    public UUID id() {
        return id;
    }

    @Override
    public String toString() {
        return "ProcessId{id=" + id + "}";
    }
}
@@ -37,6 +37,8 @@ import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ApplicationState;
import org.apache.kafka.streams.processor.internals.assignment.ApplicationStateImpl;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
@@ -124,7 +126,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
        }
    }

-    private static class ClientMetadata {
+    public static class ClientMetadata {

        private final HostInfo hostInfo;
        private final ClientState state;
@@ -152,6 +154,14 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
            state.addPreviousTasksAndOffsetSums(consumerId, taskOffsetSums);
        }

        public ClientState state() {
            return state;
        }

        public HostInfo hostInfo() {
            return hostInfo;
        }

        @Override
        public String toString() {
            return "ClientMetadata{" +
@@ -431,7 +441,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
        // ---------------- Step Four ---------------- //

        // compute the assignment of tasks to threads within each client and build the final group assignment

        final Map<String, Assignment> assignment = computeNewAssignment(
            statefulTasks,
            clientMetadataMap,
@@ -459,6 +468,31 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
        }
    }

    /**
     *
     * @param clientMetadataMap the map of process id to client metadata used to build an immutable
     *                          {@code ApplicationState}
     * @param statefulTasks     the set of {@code TaskId} that correspond to all the stateful
     *                          tasks that need to be reassigned.
     * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
     *         assignments.
     */
    private ApplicationState buildApplicationState(final Map<UUID, ClientMetadata> clientMetadataMap,
                                                   final Set<TaskId> statefulTasks) {
        final Set<TaskId> statelessTasks = new HashSet<>();
        for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientMetadataMap.entrySet()) {
            final ClientState clientState = clientEntry.getValue().state;
            statelessTasks.addAll(clientState.statelessActiveTasks());
        }

        return new ApplicationStateImpl(
            assignmentConfigs.toPublicAssignmentConfigs(),
            statefulTasks,
            statelessTasks,
            clientMetadataMap
        );
    }

    /**
     * Verify the subscription versions are within the expected bounds and check for version probing.
     *
@@ -0,0 +1,103 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import static java.util.Collections.unmodifiableSet;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.ClientMetadata;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ApplicationState;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;

public class ApplicationStateImpl implements ApplicationState {

    private final AssignmentConfigs assignmentConfigs;
    private final Set<TaskId> statelessTasks;
    private final Set<TaskId> statefulTasks;
    private final Set<TaskId> allTasks;
    private final Map<UUID, ClientMetadata> clientStates;

    public ApplicationStateImpl(final AssignmentConfigs assignmentConfigs,
                                final Set<TaskId> statefulTasks,
                                final Set<TaskId> statelessTasks,
                                final Map<UUID, ClientMetadata> clientStates) {
        this.assignmentConfigs = assignmentConfigs;
        this.statefulTasks = unmodifiableSet(statefulTasks);
        this.statelessTasks = unmodifiableSet(statelessTasks);
        this.allTasks = unmodifiableSet(computeAllTasks(statelessTasks, statefulTasks));
        this.clientStates = clientStates;
    }

    @Override
    public Map<ProcessId, KafkaStreamsState> kafkaStreamsStates(final boolean computeTaskLags) {
        final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = new HashMap<>();
        for (final Map.Entry<UUID, StreamsPartitionAssignor.ClientMetadata> clientEntry : clientStates.entrySet()) {
            final ClientMetadata metadata = clientEntry.getValue();
            final ClientState clientState = metadata.state();
            final ProcessId processId = new ProcessId(clientEntry.getKey());
            final Map<TaskId, Long> taskLagTotals = computeTaskLags ? clientState.taskLagTotals() : null;
            final KafkaStreamsState kafkaStreamsState = new KafkaStreamsStateImpl(
                processId,
                clientState.capacity(),
                clientState.clientTags(),
                clientState.previousActiveTasks(),
                clientState.previousStandbyTasks(),
                clientState.taskIdsByPreviousConsumer(),
                Optional.ofNullable(metadata.hostInfo()),
                Optional.ofNullable(taskLagTotals)
            );
            kafkaStreamsStates.put(processId, kafkaStreamsState);
        }

        return kafkaStreamsStates;
    }

    @Override
    public AssignmentConfigs assignmentConfigs() {
        return assignmentConfigs;
    }

    @Override
    public Set<TaskId> allTasks() {
        return allTasks;
    }

    @Override
    public Set<TaskId> statefulTasks() {
        return statefulTasks;
    }

    @Override
    public Set<TaskId> statelessTasks() {
        return statelessTasks;
    }

    private static Set<TaskId> computeAllTasks(final Set<TaskId> statelessTasks, final Set<TaskId> statefulTasks) {
        final Set<TaskId> union = new HashSet<>(statefulTasks);
        union.addAll(statelessTasks);
        return union;
    }
}
@@ -340,5 +340,18 @@ public final class AssignorConfiguration {
                "\n rackAwareAssignmentTags=" + rackAwareAssignmentTags +
                "\n}";
        }

        public org.apache.kafka.streams.processor.assignment.AssignmentConfigs toPublicAssignmentConfigs() {
            return new org.apache.kafka.streams.processor.assignment.AssignmentConfigs(
                acceptableRecoveryLag,
                maxWarmupReplicas,
                numStandbyReplicas,
                probingRebalanceIntervalMs,
                rackAwareAssignmentTags,
                rackAwareAssignmentTrafficCost,
                rackAwareAssignmentNonOverlapCost,
                rackAwareAssignmentStrategy
            );
        }
    }
}
@@ -16,6 +16,7 @@
 */
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.SortedMap;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
@@ -456,6 +457,22 @@ public class ClientState {
        return consumerToPreviousStatefulTaskIds.keySet().toString();
    }

    public Map<TaskId, Long> taskLagTotals() {
        return taskLagTotals;
    }

    public SortedSet<TaskId> previousActiveTasks() {
        return new TreeSet<>(previousActiveTasks.taskIds());
    }

    public SortedSet<TaskId> previousStandbyTasks() {
        return new TreeSet<>(previousStandbyTasks.taskIds());
    }

    public SortedMap<String, Set<TaskId>> taskIdsByPreviousConsumer() {
        return new TreeMap<>(consumerToPreviousStatefulTaskIds);
    }

    public String currentAssignment() {
        return "[activeTasks: (" + assignedActiveTasks.taskIds() +
            ") standbyTasks: (" + assignedStandbyTasks.taskIds() + ")]";
@@ -0,0 +1,156 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import static java.util.Collections.unmodifiableMap;
import static java.util.Collections.unmodifiableSortedMap;
import static java.util.Collections.unmodifiableSortedSet;
import static java.util.Comparator.comparingLong;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaStreamsStateImpl implements KafkaStreamsState {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsStateImpl.class);

    private final ProcessId processId;
    private final int numProcessingThreads;
    private final Map<String, String> clientTags;
    private final SortedSet<TaskId> previousActiveTasks;
    private final SortedSet<TaskId> previousStandbyTasks;
    private final SortedMap<String, Set<TaskId>> taskIdsByConsumer;
    private final Optional<HostInfo> hostInfo;
    private final Optional<Map<TaskId, Long>> taskLagTotals; // contains lag for all stateful tasks in the app topology

    public KafkaStreamsStateImpl(final ProcessId processId,
                                 final int numProcessingThreads,
                                 final Map<String, String> clientTags,
                                 final SortedSet<TaskId> previousActiveTasks,
                                 final SortedSet<TaskId> previousStandbyTasks,
                                 final SortedMap<String, Set<TaskId>> taskIdsByConsumer,
                                 final Optional<HostInfo> hostInfo,
                                 final Optional<Map<TaskId, Long>> taskLagTotals) {
        this.processId = processId;
        this.numProcessingThreads = numProcessingThreads;
        this.clientTags = unmodifiableMap(clientTags);
        this.previousActiveTasks = unmodifiableSortedSet(previousActiveTasks);
        this.previousStandbyTasks = unmodifiableSortedSet(previousStandbyTasks);
        this.taskIdsByConsumer = unmodifiableSortedMap(taskIdsByConsumer);
        this.hostInfo = hostInfo;
        this.taskLagTotals = taskLagTotals;
    }

    @Override
    public ProcessId processId() {
        return processId;
    }

    @Override
    public int numProcessingThreads() {
        return numProcessingThreads;
    }

    @Override
    public SortedSet<String> consumerClientIds() {
        return new TreeSet<>(taskIdsByConsumer.keySet());
    }

    @Override
    public SortedSet<TaskId> previousActiveTasks() {
        return previousActiveTasks;
    }

    @Override
    public SortedSet<TaskId> previousStandbyTasks() {
        return previousStandbyTasks;
    }

    @Override
    public long lagFor(final TaskId task) {
        if (!taskLagTotals.isPresent()) {
            LOG.error("lagFor was called on a KafkaStreamsState {} that does not support lag computations.", processId);
            throw new UnsupportedOperationException("Lag computation was not requested for KafkaStreamsState with process " + processId);
        }

        final Long totalLag = taskLagTotals.get().get(task);
        if (totalLag == null) {
            LOG.error("Task lag lookup failed: {} not in {}", task,
                Arrays.toString(taskLagTotals.get().keySet().toArray()));
            throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
        }
        return totalLag;
    }

    @Override
    public SortedSet<TaskId> prevTasksByLag(final String consumerClientId) {
        if (!taskLagTotals.isPresent()) {
            LOG.error("prevTasksByLag was called on a KafkaStreamsState {} that does not support lag computations.", processId);
            throw new UnsupportedOperationException("Lag computation was not requested for KafkaStreamsState with process " + processId);
        }

        final SortedSet<TaskId> prevTasksByLag =
            new TreeSet<>(comparingLong(this::lagFor).thenComparing(TaskId::compareTo));
        final Set<TaskId> prevOwnedStatefulTasks = taskIdsByConsumer.containsKey(consumerClientId)
            ? taskIdsByConsumer.get(consumerClientId) : new HashSet<>();
        for (final TaskId task : prevOwnedStatefulTasks) {
            if (taskLagTotals.get().containsKey(task)) {
                prevTasksByLag.add(task);
            } else {
                LOG.debug(
                    "Skipping previous task {} since it's not part of the current assignment",
                    task
                );
            }
        }
        return prevTasksByLag;
    }

    @Override
    public Map<TaskId, Long> statefulTasksToLagSums() {
        if (!taskLagTotals.isPresent()) {
            LOG.error("statefulTasksToLagSums was called on a KafkaStreamsState {} that does not support lag computations.", processId);
            throw new UnsupportedOperationException("Lag computation was not requested for KafkaStreamsState with process " + processId);
        }

        return taskLagTotals.get().keySet()
            .stream()
            .collect(Collectors.toMap(taskId -> taskId, this::lagFor));
    }

    @Override
    public Optional<HostInfo> hostInfo() {
        return hostInfo;
    }

    @Override
    public Map<String, String> clientTags() {
        return clientTags;
    }
}
@@ -0,0 +1,89 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.common.utils.Utils.mkSortedSet;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NAMED_TASK_T0_0_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NAMED_TASK_T0_0_1;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NAMED_TASK_T0_1_0;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;

import java.util.Arrays;
import java.util.Optional;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.junit.Test;

public class KafkaStreamsStateTest {
    @Test
    public void shouldCorrectlyReturnTasksByLag() {
        final KafkaStreamsState state = new KafkaStreamsStateImpl(
            new ProcessId(UUID.randomUUID()),
            10,
            mkMap(),
            mkSortedSet(NAMED_TASK_T0_0_0, NAMED_TASK_T0_0_1),
            mkSortedSet(),
            new TreeMap<>(mkMap(
                mkEntry("c1", mkSet(NAMED_TASK_T0_0_0, NAMED_TASK_T0_0_1))
            )),
            Optional.empty(),
            Optional.of(
                mkMap(
                    mkEntry(NAMED_TASK_T0_0_0, 2000L),
                    mkEntry(NAMED_TASK_T0_0_1, 1000L)
                )
            )
        );

        assertThrows(IllegalStateException.class, () -> state.lagFor(NAMED_TASK_T0_1_0));
        assertThat(state.lagFor(NAMED_TASK_T0_0_0), equalTo(2000L));
        assertThat(state.lagFor(NAMED_TASK_T0_0_1), equalTo(1000L));

        assertThat(state.prevTasksByLag("c0"), equalTo(new TreeSet<>()));
        assertThat(state.prevTasksByLag("c1"), equalTo(new TreeSet<>(
            Arrays.asList(NAMED_TASK_T0_0_1, NAMED_TASK_T0_0_0)
        )));
    }

    @Test
    public void shouldThrowExceptionOnLagOperationsIfLagsWereNotComputed() {
        final KafkaStreamsState state = new KafkaStreamsStateImpl(
            new ProcessId(UUID.randomUUID()),
            10,
            mkMap(),
            mkSortedSet(NAMED_TASK_T0_0_0, NAMED_TASK_T0_0_1),
            mkSortedSet(),
            new TreeMap<>(mkMap(
                mkEntry("c1", mkSet(NAMED_TASK_T0_0_0, NAMED_TASK_T0_0_1))
            )),
            Optional.empty(),
            Optional.empty()
        );

        assertThrows(UnsupportedOperationException.class, () -> state.lagFor(NAMED_TASK_T0_0_0));
        assertThrows(UnsupportedOperationException.class, () -> state.prevTasksByLag("c1"));
        assertThrows(UnsupportedOperationException.class, state::statefulTasksToLagSums);
    }
}