From d6b6952d486894ddf1099c1ad42911bbaccb1533 Mon Sep 17 00:00:00 2001
From: Bruno Cadonna
Date: Tue, 18 Feb 2025 13:45:01 +0100
Subject: [PATCH] KAFKA-18736: Add Streams group heartbeat request manager (1/N) (#18870)

This commit adds the Streams group heartbeat request manager to the async
consumer. The Streams group heartbeat request manager is responsible for
sending heartbeat requests and for processing their responses.

This commit implements:
- sending the full heartbeat request (independent of any state)
- processing a successful response

Reviewers: Bill Bejeck, Lucas Brutschy
---
 .../StreamsGroupHeartbeatRequestManager.java  | 385 ++++++++++++++++++
 .../internals/StreamsRebalanceData.java       |  95 ++++-
 ...reamsGroupHeartbeatRequestManagerTest.java | 313 ++++++++++++++
 .../internals/StreamsRebalanceDataTest.java   | 130 +++++-
 .../StreamsRebalanceEventsProcessorTest.java  |  27 +-
 5 files changed, 936 insertions(+), 14 deletions(-)
 create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
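For orientation, here is a minimal sketch (not part of the patch) of how the new manager is meant to be driven, using only the constructor and poll() introduced below. The collaborators (config, coordinator request manager, membership manager, rebalance data) are assumed to be wired up by the async consumer, and the sketch assumes it lives in the org.apache.kafka.clients.consumer.internals package:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.common.utils.Time;

    class HeartbeatWiringSketch {
        static NetworkClientDelegate.PollResult pollOnce(final ConsumerConfig config,
                                                         final CoordinatorRequestManager coordinatorRequestManager,
                                                         final StreamsMembershipManager membershipManager,
                                                         final StreamsRebalanceData streamsRebalanceData) {
            StreamsGroupHeartbeatRequestManager manager = new StreamsGroupHeartbeatRequestManager(
                new LogContext("[heartbeat] "),
                Time.SYSTEM,
                config,
                coordinatorRequestManager,
                membershipManager,
                new Metrics(),
                streamsRebalanceData
            );
            // Each poll builds a full heartbeat request and reports how long the
            // background thread may wait before polling again.
            return manager.poll(Time.SYSTEM.milliseconds());
        }
    }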
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
new file mode 100644
index 00000000000..6d8b657af18
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class StreamsGroupHeartbeatRequestManager implements RequestManager {
+
+    static class HeartbeatState {
+
+        private final StreamsMembershipManager membershipManager;
+        private final int rebalanceTimeoutMs;
+        private final StreamsRebalanceData streamsRebalanceData;
+
+        public HeartbeatState(final StreamsRebalanceData streamsRebalanceData,
+                              final StreamsMembershipManager membershipManager,
+                              final int rebalanceTimeoutMs) {
+            this.membershipManager = membershipManager;
+            this.streamsRebalanceData = streamsRebalanceData;
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        }
+
+        public StreamsGroupHeartbeatRequestData buildRequestData() {
+            StreamsGroupHeartbeatRequestData data = new StreamsGroupHeartbeatRequestData();
+            data.setGroupId(membershipManager.groupId());
+            data.setMemberId(membershipManager.memberId());
+            data.setMemberEpoch(membershipManager.memberEpoch());
+            membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
+            StreamsGroupHeartbeatRequestData.Topology topology = new StreamsGroupHeartbeatRequestData.Topology();
+            topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies()));
+            topology.setEpoch(streamsRebalanceData.topologyEpoch());
+            data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
+            data.setTopology(topology);
+            data.setProcessId(streamsRebalanceData.processId().toString());
+            streamsRebalanceData.endpoint().ifPresent(userEndpoint -> {
+                data.setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint()
+                    .setHost(userEndpoint.host())
+                    .setPort(userEndpoint.port())
+                );
+            });
+            data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream()
+                .map(entry -> new StreamsGroupHeartbeatRequestData.KeyValue()
+                    .setKey(entry.getKey())
+                    .setValue(entry.getValue())
+                )
+                .collect(Collectors.toList()));
+            data.setShutdownApplication(streamsRebalanceData.shutdownRequested());
+            StreamsRebalanceData.Assignment reconciledAssignment = streamsRebalanceData.reconciledAssignment();
+            data.setActiveTasks(convertTaskIdCollection(reconciledAssignment.activeTasks()));
+            data.setStandbyTasks(convertTaskIdCollection(reconciledAssignment.standbyTasks()));
+            data.setWarmupTasks(convertTaskIdCollection(reconciledAssignment.warmupTasks()));
+            return data;
+        }
+
+        private static List<StreamsGroupHeartbeatRequestData.TaskIds> convertTaskIdCollection(final Set<StreamsRebalanceData.TaskId> tasks) {
+            return tasks.stream()
+                .collect(
+                    Collectors.groupingBy(StreamsRebalanceData.TaskId::subtopologyId,
+                        Collectors.mapping(StreamsRebalanceData.TaskId::partitionId, Collectors.toList()))
+                )
+                .entrySet()
+                .stream()
+                .map(entry -> {
+                    StreamsGroupHeartbeatRequestData.TaskIds ids = new StreamsGroupHeartbeatRequestData.TaskIds();
+                    ids.setSubtopologyId(entry.getKey());
+                    ids.setPartitions(entry.getValue());
+                    return ids;
+                })
+                .collect(Collectors.toList());
+        }
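A quick worked example of the grouping idiom in convertTaskIdCollection above, using plain JDK pairs in place of StreamsRebalanceData.TaskId (which couples a subtopology ID with a partition ID); this snippet is self-contained and runnable:

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    class TaskGroupingSketch {
        public static void main(String[] args) {
            // Tasks 0_0, 0_1 and 1_2 expressed as (subtopologyId, partitionId) pairs.
            Map<String, List<Integer>> grouped = Stream.of(
                    Map.entry("0", 0), Map.entry("0", 1), Map.entry("1", 2))
                .collect(Collectors.groupingBy(Map.Entry::getKey,
                    Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
            System.out.println(grouped); // {0=[0, 1], 1=[2]} -> one TaskIds entry per subtopology
        }
    }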
+
+        private static List<StreamsGroupHeartbeatRequestData.Subtopology> getTopologyFromStreams(final Map<String, StreamsRebalanceData.Subtopology> subtopologies) {
+            final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologiesForRequest = new ArrayList<>(subtopologies.size());
+            for (final Map.Entry<String, StreamsRebalanceData.Subtopology> subtopology : subtopologies.entrySet()) {
+                subtopologiesForRequest.add(getSubtopologyFromStreams(subtopology.getKey(), subtopology.getValue()));
+            }
+            subtopologiesForRequest.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.Subtopology::subtopologyId));
+            return subtopologiesForRequest;
+        }
+
+        private static StreamsGroupHeartbeatRequestData.Subtopology getSubtopologyFromStreams(final String subtopologyName,
+                                                                                              final StreamsRebalanceData.Subtopology subtopology) {
+            final StreamsGroupHeartbeatRequestData.Subtopology subtopologyData = new StreamsGroupHeartbeatRequestData.Subtopology();
+            subtopologyData.setSubtopologyId(subtopologyName);
+            ArrayList<String> sortedSourceTopics = new ArrayList<>(subtopology.sourceTopics());
+            Collections.sort(sortedSourceTopics);
+            subtopologyData.setSourceTopics(sortedSourceTopics);
+            ArrayList<String> sortedSinkTopics = new ArrayList<>(subtopology.repartitionSinkTopics());
+            Collections.sort(sortedSinkTopics);
+            subtopologyData.setRepartitionSinkTopics(sortedSinkTopics);
+            subtopologyData.setRepartitionSourceTopics(getRepartitionTopicsInfoFromStreams(subtopology));
+            subtopologyData.setStateChangelogTopics(getChangelogTopicsInfoFromStreams(subtopology));
+            subtopologyData.setCopartitionGroups(
+                getCopartitionGroupsFromStreams(subtopology.copartitionGroups(), subtopologyData));
+            return subtopologyData;
+        }
+
+        private static List<StreamsGroupHeartbeatRequestData.CopartitionGroup> getCopartitionGroupsFromStreams(final Collection<Set<String>> copartitionGroups,
+                                                                                                               final StreamsGroupHeartbeatRequestData.Subtopology subtopologyData) {
+            final Map<String, Short> sourceTopicsMap =
+                IntStream.range(0, subtopologyData.sourceTopics().size())
+                    .boxed()
+                    .collect(Collectors.toMap(subtopologyData.sourceTopics()::get, Integer::shortValue));
+
+            final Map<String, Short> repartitionSourceTopics =
+                IntStream.range(0, subtopologyData.repartitionSourceTopics().size())
+                    .boxed()
+                    .collect(
+                        Collectors.toMap(x -> subtopologyData.repartitionSourceTopics().get(x).name(),
+                            Integer::shortValue));
+
+            return copartitionGroups.stream()
+                .map(x -> getCopartitionGroupFromStreams(x, sourceTopicsMap, repartitionSourceTopics))
+                .collect(Collectors.toList());
+        }
+
+        private static StreamsGroupHeartbeatRequestData.CopartitionGroup getCopartitionGroupFromStreams(final Set<String> topicNames,
+                                                                                                        final Map<String, Short> sourceTopicsMap,
+                                                                                                        final Map<String, Short> repartitionSourceTopics) {
+            StreamsGroupHeartbeatRequestData.CopartitionGroup copartitionGroup = new StreamsGroupHeartbeatRequestData.CopartitionGroup();
+
+            topicNames.forEach(topicName -> {
+                if (sourceTopicsMap.containsKey(topicName)) {
+                    copartitionGroup.sourceTopics().add(sourceTopicsMap.get(topicName));
+                } else if (repartitionSourceTopics.containsKey(topicName)) {
+                    copartitionGroup.repartitionSourceTopics()
+                        .add(repartitionSourceTopics.get(topicName));
+                } else {
+                    throw new IllegalStateException(
+                        "Source topic not found in subtopology: " + topicName);
+                }
+            });
+
+            return copartitionGroup;
+        }
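Note that getCopartitionGroupFromStreams encodes each copartition group not by topic name but by the topic's index (as a short) into the sorted sourceTopics and repartitionSourceTopics lists built above; for example, a group {sourceTopic2, repartitionSourceTopic1} becomes sourceTopics=[1], repartitionSourceTopics=[0], which is exactly what the test later in this patch asserts. A self-contained sketch of the index-map construction:

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    class CopartitionIndexSketch {
        public static void main(String[] args) {
            // Topic names are replaced by their index in the sorted source-topic list.
            List<String> sortedSourceTopics = List.of("sourceTopic1", "sourceTopic2");
            Map<String, Short> index = IntStream.range(0, sortedSourceTopics.size())
                .boxed()
                .collect(Collectors.toMap(sortedSourceTopics::get, Integer::shortValue));
            System.out.println(index); // {sourceTopic1=0, sourceTopic2=1}
        }
    }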
+
+        private static List<StreamsGroupHeartbeatRequestData.TopicInfo> getRepartitionTopicsInfoFromStreams(final StreamsRebalanceData.Subtopology subtopologyDataFromStreams) {
+            final List<StreamsGroupHeartbeatRequestData.TopicInfo> repartitionTopicsInfo = new ArrayList<>();
+            for (final Map.Entry<String, StreamsRebalanceData.TopicInfo> repartitionTopic : subtopologyDataFromStreams.repartitionSourceTopics().entrySet()) {
+                final StreamsGroupHeartbeatRequestData.TopicInfo repartitionTopicInfo = new StreamsGroupHeartbeatRequestData.TopicInfo();
+                repartitionTopicInfo.setName(repartitionTopic.getKey());
+                repartitionTopic.getValue().numPartitions().ifPresent(repartitionTopicInfo::setPartitions);
+                repartitionTopic.getValue().replicationFactor().ifPresent(repartitionTopicInfo::setReplicationFactor);
+                repartitionTopic.getValue().topicConfigs().forEach((k, v) ->
+                    repartitionTopicInfo.topicConfigs().add(new StreamsGroupHeartbeatRequestData.KeyValue().setKey(k).setValue(v))
+                );
+                repartitionTopicsInfo.add(repartitionTopicInfo);
+            }
+            repartitionTopicsInfo.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.TopicInfo::name));
+            return repartitionTopicsInfo;
+        }
+
+        private static List<StreamsGroupHeartbeatRequestData.TopicInfo> getChangelogTopicsInfoFromStreams(final StreamsRebalanceData.Subtopology subtopologyDataFromStreams) {
+            final List<StreamsGroupHeartbeatRequestData.TopicInfo> changelogTopicsInfo = new ArrayList<>();
+            for (final Map.Entry<String, StreamsRebalanceData.TopicInfo> changelogTopic : subtopologyDataFromStreams.stateChangelogTopics().entrySet()) {
+                final StreamsGroupHeartbeatRequestData.TopicInfo changelogTopicInfo = new StreamsGroupHeartbeatRequestData.TopicInfo();
+                changelogTopicInfo.setName(changelogTopic.getKey());
+                changelogTopic.getValue().replicationFactor().ifPresent(changelogTopicInfo::setReplicationFactor);
+                changelogTopic.getValue().topicConfigs().forEach((k, v) ->
+                    changelogTopicInfo.topicConfigs().add(new StreamsGroupHeartbeatRequestData.KeyValue().setKey(k).setValue(v))
+                );
+                changelogTopicsInfo.add(changelogTopicInfo);
+            }
+            changelogTopicsInfo.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.TopicInfo::name));
+            return changelogTopicsInfo;
+        }
+    }
+
+    /**
+     * Represents the state of a heartbeat request, including logic for timing, retries, and exponential backoff. The object extends
+     * {@link RequestState} to enable exponential backoff and duplicated request handling. It holds two fields: the heartbeat timer
+     * and the heartbeat interval.
+     */
+    static class HeartbeatRequestState extends RequestState {
+
+        /**
+         * The heartbeat timer tracks the time since the last heartbeat was sent.
+         */
+        private final Timer heartbeatTimer;
+
+        /**
+         * The heartbeat interval, which is acquired/updated through the heartbeat request.
+         */
+        private long heartbeatIntervalMs;
+
+        public HeartbeatRequestState(final LogContext logContext,
+                                     final Time time,
+                                     final long heartbeatIntervalMs,
+                                     final long retryBackoffMs,
+                                     final long retryBackoffMaxMs,
+                                     final double jitter) {
+            super(
+                logContext,
+                StreamsGroupHeartbeatRequestManager.HeartbeatRequestState.class.getName(),
+                retryBackoffMs,
+                2,
+                retryBackoffMaxMs,
+                jitter
+            );
+            this.heartbeatIntervalMs = heartbeatIntervalMs;
+            this.heartbeatTimer = time.timer(heartbeatIntervalMs);
+        }
+
+        private void update(final long currentTimeMs) {
+            this.heartbeatTimer.update(currentTimeMs);
+        }
+
+        public void resetTimer() {
+            this.heartbeatTimer.reset(heartbeatIntervalMs);
+        }
+
+        @Override
+        public boolean canSendRequest(final long currentTimeMs) {
+            update(currentTimeMs);
+            return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
+        }
+
+        private void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) {
+            if (this.heartbeatIntervalMs == heartbeatIntervalMs) {
+                // no need to update the timer if the interval hasn't changed
+                return;
+            }
+            this.heartbeatIntervalMs = heartbeatIntervalMs;
+            this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
+        }
+    }
+
+    private final Logger logger;
+
+    private final int maxPollIntervalMs;
+
+    private final CoordinatorRequestManager coordinatorRequestManager;
+
+    private final HeartbeatRequestState heartbeatRequestState;
+
+    private final HeartbeatState heartbeatState;
+
+    private final StreamsMembershipManager membershipManager;
+
+    private final HeartbeatMetricsManager metricsManager;
+
+    private StreamsRebalanceData streamsRebalanceData;
+
+    public StreamsGroupHeartbeatRequestManager(final LogContext logContext,
+                                               final Time time,
+                                               final ConsumerConfig config,
+                                               final CoordinatorRequestManager coordinatorRequestManager,
+                                               final StreamsMembershipManager membershipManager,
+                                               final Metrics metrics,
+                                               final StreamsRebalanceData streamsRebalanceData) {
+        this.logger = logContext.logger(getClass());
+        this.coordinatorRequestManager = Objects.requireNonNull(
+            coordinatorRequestManager,
+            "Coordinator request manager cannot be null"
+        );
+        this.membershipManager = Objects.requireNonNull(
+            membershipManager,
+            "Streams membership manager cannot be null"
+        );
+        this.metricsManager = new HeartbeatMetricsManager(
+            Objects.requireNonNull(metrics, "Metrics cannot be null")
+        );
+        this.streamsRebalanceData = Objects.requireNonNull(streamsRebalanceData, "Streams rebalance data cannot be null");
+        this.maxPollIntervalMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+        long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+        this.heartbeatState = new HeartbeatState(streamsRebalanceData, membershipManager, maxPollIntervalMs);
+        this.heartbeatRequestState = new HeartbeatRequestState(
+            logContext,
+            time,
+            0,
+            retryBackoffMs,
+            retryBackoffMaxMs,
+            maxPollIntervalMs
+        );
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+        return new NetworkClientDelegate.PollResult(
+            heartbeatRequestState.heartbeatIntervalMs,
+            Collections.singletonList(makeHeartbeatRequest(currentTimeMs))
+        );
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs) {
+        NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
+            new StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
+            coordinatorRequestManager.coordinator()
+        );
+        request.whenComplete((response, exception) -> {
+            long completionTimeMs = request.handler().completionTimeMs();
+            if (response != null) {
+                metricsManager.recordRequestLatency(response.requestLatencyMs());
+                onResponse((StreamsGroupHeartbeatResponse) response.responseBody(), completionTimeMs);
+            }
+        });
+        heartbeatRequestState.onSendAttempt(currentTimeMs);
+        membershipManager.onHeartbeatRequestGenerated();
+        metricsManager.recordHeartbeatSentMs(currentTimeMs);
+        return request;
+    }
+
+    private void onResponse(final StreamsGroupHeartbeatResponse response, long currentTimeMs) {
+        if (Errors.forCode(response.data().errorCode()) == Errors.NONE) {
+            onSuccessResponse(response, currentTimeMs);
+        }
+    }
+
+    private void onSuccessResponse(final StreamsGroupHeartbeatResponse response, final long currentTimeMs) {
+        final StreamsGroupHeartbeatResponseData data = response.data();
+
+        heartbeatRequestState.updateHeartbeatIntervalMs(data.heartbeatIntervalMs());
+        heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
+        heartbeatRequestState.resetTimer();
+
+        if (data.partitionsByUserEndpoint() != null) {
+            streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data));
+        }
+
+        List<StreamsGroupHeartbeatResponseData.Status> statuses = data.status();
+
+        if (statuses != null && !statuses.isEmpty()) {
+            String statusDetails = statuses.stream()
+                .map(status -> "(" + status.statusCode() + ") " + status.statusDetail())
+                .collect(Collectors.joining(", "));
+            logger.warn("Membership is in the following statuses: {}.", statusDetails);
+        }
+
+        membershipManager.onHeartbeatSuccess(response);
+    }
+
+    private static Map<StreamsRebalanceData.HostInfo, List<TopicPartition>> convertHostInfoMap(final StreamsGroupHeartbeatResponseData data) {
+        Map<StreamsRebalanceData.HostInfo, List<TopicPartition>> partitionsByHost = new HashMap<>();
+        data.partitionsByUserEndpoint().forEach(endpoint -> {
+            List<TopicPartition> topicPartitions = endpoint.partitions().stream()
+                .flatMap(partition ->
+                    partition.partitions().stream().map(partitionId -> new TopicPartition(partition.topic(), partitionId)))
+                .collect(Collectors.toList());
+            StreamsGroupHeartbeatResponseData.Endpoint userEndpoint = endpoint.userEndpoint();
+            partitionsByHost.put(new StreamsRebalanceData.HostInfo(userEndpoint.host(), userEndpoint.port()), topicPartitions);
+        });
+        return partitionsByHost;
+    }
+}
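convertHostInfoMap flattens each endpoint's per-topic partition lists into TopicPartition objects keyed by HostInfo. A small sketch of the response shape it consumes, built with the generated response classes and the same fluent setters the test below uses:

    import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;

    import java.util.List;

    class EndpointFlatteningSketch {
        public static void main(String[] args) {
            // One endpoint hosting partitions 0 and 1 of "topic" ...
            StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions =
                new StreamsGroupHeartbeatResponseData.EndpointToPartitions()
                    .setUserEndpoint(new StreamsGroupHeartbeatResponseData.Endpoint()
                        .setHost("localhost").setPort(8080))
                    .setPartitions(List.of(new StreamsGroupHeartbeatResponseData.TopicPartition()
                        .setTopic("topic").setPartitions(List.of(0, 1))));
            // ... becomes HostInfo("localhost", 8080) -> [topic-0, topic-1] after conversion.
            System.out.println(endpointToPartitions);
        }
    }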
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
index a8670eeb1b1..43e804d84bd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
@@ -16,13 +16,18 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.TopicPartition;
+
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -77,6 +82,46 @@ public class StreamsRebalanceData {
         }
     }
 
+    public static class HostInfo {
+
+        private final String host;
+        private final int port;
+
+        public HostInfo(final String host, final int port) {
+            this.host = Objects.requireNonNull(host);
+            this.port = port;
+        }
+
+        public String host() {
+            return host;
+        }
+
+        public int port() {
+            return port;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) return false;
+            HostInfo hostInfo = (HostInfo) o;
+            return port == hostInfo.port && Objects.equals(host, hostInfo.host);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(host, port);
+        }
+
+        @Override
+        public String toString() {
+            return "HostInfo{" +
+                "host='" + host + '\'' +
+                ", port=" + port +
+                '}';
+        }
+
+    }
+
     public static class Assignment {
 
         public static final Assignment EMPTY = new Assignment();
@@ -244,18 +289,50 @@ public class StreamsRebalanceData {
         }
     }
 
+    private final UUID processId;
+
+    private final Optional<HostInfo> endpoint;
+
+    private final Map<String, String> clientTags;
+
     private final Map<String, Subtopology> subtopologies;
 
     private final AtomicReference<Assignment> reconciledAssignment = new AtomicReference<>(Assignment.EMPTY);
 
-    public StreamsRebalanceData(Map<String, Subtopology> subtopologies) {
+    private final AtomicReference<Map<HostInfo, List<TopicPartition>>> partitionsByHost = new AtomicReference<>(Collections.emptyMap());
+
+    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+
+    public StreamsRebalanceData(final UUID processId,
+                                final Optional<HostInfo> endpoint,
+                                final Map<String, Subtopology> subtopologies,
+                                final Map<String, String> clientTags) {
+        this.processId = Objects.requireNonNull(processId, "Process ID cannot be null");
+        this.endpoint = Objects.requireNonNull(endpoint, "Endpoint cannot be null");
         this.subtopologies = Map.copyOf(Objects.requireNonNull(subtopologies, "Subtopologies cannot be null"));
+        this.clientTags = Map.copyOf(Objects.requireNonNull(clientTags, "Client tags cannot be null"));
+    }
+
+    public UUID processId() {
+        return processId;
+    }
+
+    public Optional<HostInfo> endpoint() {
+        return endpoint;
+    }
+
+    public Map<String, String> clientTags() {
+        return clientTags;
     }
 
     public Map<String, Subtopology> subtopologies() {
         return subtopologies;
     }
 
+    public int topologyEpoch() {
+        return 0;
+    }
+
     public void setReconciledAssignment(final Assignment assignment) {
         reconciledAssignment.set(assignment);
     }
@@ -263,4 +340,20 @@ public class StreamsRebalanceData {
     public Assignment reconciledAssignment() {
         return reconciledAssignment.get();
     }
+
+    public void setPartitionsByHost(final Map<HostInfo, List<TopicPartition>> partitionsByHost) {
+        this.partitionsByHost.set(partitionsByHost);
+    }
+
+    public Map<HostInfo, List<TopicPartition>> partitionsByHost() {
+        return partitionsByHost.get();
+    }
+
+    public void requestShutdown() {
+        shutdownRequested.set(true);
+    }
+
+    public boolean shutdownRequested() {
+        return shutdownRequested.get();
+    }
 }
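A minimal usage sketch of the surface added to StreamsRebalanceData above; the client-tag values are made up for illustration:

    import java.util.Map;
    import java.util.Optional;
    import java.util.UUID;

    class RebalanceDataSketch {
        public static void main(String[] args) {
            StreamsRebalanceData data = new StreamsRebalanceData(
                UUID.randomUUID(),
                Optional.of(new StreamsRebalanceData.HostInfo("localhost", 8080)),
                Map.of(),                        // subtopologies (empty here)
                Map.of("rack", "eu-central-1a")  // client tags (hypothetical values)
            );
            data.requestShutdown(); // surfaces as ShutdownApplication in the next heartbeat
            System.out.println(data.shutdownRequested()); // true
            System.out.println(data.topologyEpoch());     // 0 (hard-coded in this patch)
        }
    }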
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
new file mode 100644
index 00000000000..f0c0f6f207f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class StreamsGroupHeartbeatRequestManagerTest {
+
+    private static final LogContext LOG_CONTEXT = new LogContext("test");
+    private static final int RECEIVED_HEARTBEAT_INTERVAL_MS = 1200;
+    private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000;
+    private static final String GROUP_ID = "group-id";
+    private static final String MEMBER_ID = "member-id";
+    private static final int MEMBER_EPOCH = 1;
+    private static final String INSTANCE_ID = "instance-id";
+    private static final UUID PROCESS_ID = UUID.randomUUID();
+    private static final StreamsRebalanceData.HostInfo ENDPOINT = new StreamsRebalanceData.HostInfo("localhost", 8080);
+    private static final String SOURCE_TOPIC_1 = "sourceTopic1";
+    private static final String SOURCE_TOPIC_2 = "sourceTopic2";
+    private static final Set<String> SOURCE_TOPICS = Set.of(SOURCE_TOPIC_1, SOURCE_TOPIC_2);
+    private static final String REPARTITION_SINK_TOPIC_1 = "repartitionSinkTopic1";
+    private static final String REPARTITION_SINK_TOPIC_2 = "repartitionSinkTopic2";
+    private static final String REPARTITION_SINK_TOPIC_3 = "repartitionSinkTopic3";
+    private static final Set<String> REPARTITION_SINK_TOPICS = Set.of(
+        REPARTITION_SINK_TOPIC_1,
+        REPARTITION_SINK_TOPIC_2,
+        REPARTITION_SINK_TOPIC_3
+    );
+    private static final String REPARTITION_SOURCE_TOPIC_1 = "repartitionSourceTopic1";
+    private static final String REPARTITION_SOURCE_TOPIC_2 = "repartitionSourceTopic2";
+    private static final Map<String, StreamsRebalanceData.TopicInfo> REPARTITION_SOURCE_TOPICS = Map.of(
+        REPARTITION_SOURCE_TOPIC_1, new StreamsRebalanceData.TopicInfo(Optional.of(2), Optional.of((short) 1), Map.of("config1", "value1")),
+        REPARTITION_SOURCE_TOPIC_2, new StreamsRebalanceData.TopicInfo(Optional.of(3), Optional.of((short) 3), Collections.emptyMap())
+    );
+    private static final String CHANGELOG_TOPIC_1 = "changelogTopic1";
+    private static final String CHANGELOG_TOPIC_2 = "changelogTopic2";
+    private static final String CHANGELOG_TOPIC_3 = "changelogTopic3";
+    private static final Map<String, StreamsRebalanceData.TopicInfo> CHANGELOG_TOPICS = Map.of(
+        CHANGELOG_TOPIC_1, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 1), Map.of()),
+        CHANGELOG_TOPIC_2, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 2), Map.of()),
+        CHANGELOG_TOPIC_3, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 3), Map.of("config2", "value2"))
+    );
+    private static final Collection<Set<String>> COPARTITION_GROUP = Set.of(
+        Set.of(SOURCE_TOPIC_1, REPARTITION_SOURCE_TOPIC_2),
+        Set.of(SOURCE_TOPIC_2, REPARTITION_SOURCE_TOPIC_1)
+    );
+    private static final String SUBTOPOLOGY_NAME_1 = "subtopology1";
+    private static final StreamsRebalanceData.Subtopology SUBTOPOLOGY_1 = new StreamsRebalanceData.Subtopology(
+        SOURCE_TOPICS,
+        REPARTITION_SINK_TOPICS,
+        REPARTITION_SOURCE_TOPICS,
+        CHANGELOG_TOPICS,
+        COPARTITION_GROUP
+    );
+    private static final Map<String, StreamsRebalanceData.Subtopology> SUBTOPOLOGIES =
+        Map.of(SUBTOPOLOGY_NAME_1, SUBTOPOLOGY_1);
+    private static final Map<String, String> CLIENT_TAGS = Map.of("clientTag1", "value2");
+    private static final List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> ENDPOINT_TO_PARTITIONS =
+        List.of(
+            new StreamsGroupHeartbeatResponseData.EndpointToPartitions()
+                .setUserEndpoint(new StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(8080))
+                .setPartitions(List.of(
+                    new StreamsGroupHeartbeatResponseData.TopicPartition().setTopic("topic").setPartitions(List.of(0)))
+                )
+        );
+
+    private final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
+        PROCESS_ID,
+        Optional.of(ENDPOINT),
+        SUBTOPOLOGIES,
+        CLIENT_TAGS
+    );
+
+    private final Time time = new MockTime();
+
+    private final ConsumerConfig config = config();
+
+    @Mock
+    private CoordinatorRequestManager coordinatorRequestManager;
+
+    @Mock
+    private StreamsMembershipManager membershipManager;
+
+    private final Metrics metrics = new Metrics(time);
+
+    private final Node coordinatorNode = new Node(1, "localhost", 9092);
+
+    private static ConsumerConfig config() {
+        Properties prop = new Properties();
+        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS));
+        return new ConsumerConfig(prop);
+    }
+
+    @Test
+    public void testConstructWithNullCoordinatorRequestManager() {
+        final Exception exception = assertThrows(NullPointerException.class, () -> new StreamsGroupHeartbeatRequestManager(
+            new LogContext("test"),
+            time,
+            config,
+            null,
+            membershipManager,
+            metrics,
+            streamsRebalanceData
+        ));
+        assertEquals("Coordinator request manager cannot be null", exception.getMessage());
+    }
+
+    @Test
+    public void testConstructWithNullMembershipManager() {
+        final Exception exception = assertThrows(NullPointerException.class, () -> new StreamsGroupHeartbeatRequestManager(
+            new LogContext("test"),
+            time,
+            config,
+            coordinatorRequestManager,
+            null,
+            metrics,
+            streamsRebalanceData
+        ));
+        assertEquals("Streams membership manager cannot be null", exception.getMessage());
+    }
+
+    @Test
+    public void testConstructWithNullMetrics() {
+        final Exception exception = assertThrows(NullPointerException.class, () -> new StreamsGroupHeartbeatRequestManager(
+            new LogContext("test"),
+            time,
+            config,
+            coordinatorRequestManager,
+            membershipManager,
+            null,
+            streamsRebalanceData
+        ));
+        assertEquals("Metrics cannot be null", exception.getMessage());
+    }
+
+    @Test
+    public void testConstructWithNullStreamsRebalanceData() {
+        final Exception exception = assertThrows(NullPointerException.class, () -> new StreamsGroupHeartbeatRequestManager(
+            new LogContext("test"),
+            time,
+            config,
+            coordinatorRequestManager,
+            membershipManager,
+            metrics,
+            null
+        ));
+        assertEquals("Streams rebalance data cannot be null", exception.getMessage());
+    }
+
+    @Test
+    public void testSendingFullHeartbeatRequest() {
+        final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = new StreamsGroupHeartbeatRequestManager(
+            LOG_CONTEXT,
+            time,
+            config,
+            coordinatorRequestManager,
+            membershipManager,
+            metrics,
+            streamsRebalanceData
+        );
+
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+        when(membershipManager.groupId()).thenReturn(GROUP_ID);
+        when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+        when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+        when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
+
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(0, result.timeUntilNextPollMs);
+        assertEquals(1, result.unsentRequests.size());
+        assertEquals(Optional.of(coordinatorNode), result.unsentRequests.get(0).node());
+        NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
+        StreamsGroupHeartbeatRequest streamsRequest = (StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
+        assertEquals(GROUP_ID, streamsRequest.data().groupId());
+        assertEquals(MEMBER_ID, streamsRequest.data().memberId());
+        assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch());
+        assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
+        assertEquals(PROCESS_ID.toString(), streamsRequest.data().processId());
+        assertEquals(ENDPOINT.host(), streamsRequest.data().userEndpoint().host());
+        assertEquals(ENDPOINT.port(), streamsRequest.data().userEndpoint().port());
+        assertEquals(1, streamsRequest.data().clientTags().size());
+        assertEquals("clientTag1", streamsRequest.data().clientTags().get(0).key());
+        assertEquals("value2", streamsRequest.data().clientTags().get(0).value());
+        assertEquals(streamsRebalanceData.topologyEpoch(), streamsRequest.data().topology().epoch());
+        assertNotNull(streamsRequest.data().topology());
+        final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies = streamsRequest.data().topology().subtopologies();
+        assertEquals(1, subtopologies.size());
+        final StreamsGroupHeartbeatRequestData.Subtopology subtopology = subtopologies.get(0);
+        assertEquals(SUBTOPOLOGY_NAME_1, subtopology.subtopologyId());
+        assertEquals(Arrays.asList("sourceTopic1", "sourceTopic2"), subtopology.sourceTopics());
+        assertEquals(Arrays.asList("repartitionSinkTopic1", "repartitionSinkTopic2", "repartitionSinkTopic3"), subtopology.repartitionSinkTopics());
+        assertEquals(REPARTITION_SOURCE_TOPICS.size(), subtopology.repartitionSourceTopics().size());
+        subtopology.repartitionSourceTopics().forEach(topicInfo -> {
+            final StreamsRebalanceData.TopicInfo repartitionTopic = REPARTITION_SOURCE_TOPICS.get(topicInfo.name());
+            assertEquals(repartitionTopic.numPartitions().get(), topicInfo.partitions());
+            assertEquals(repartitionTopic.replicationFactor().get(), topicInfo.replicationFactor());
+            assertEquals(repartitionTopic.topicConfigs().size(), topicInfo.topicConfigs().size());
+        });
+        assertEquals(CHANGELOG_TOPICS.size(), subtopology.stateChangelogTopics().size());
+        subtopology.stateChangelogTopics().forEach(topicInfo -> {
+            assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name()));
+            assertEquals(0, topicInfo.partitions());
+            final StreamsRebalanceData.TopicInfo changelogTopic = CHANGELOG_TOPICS.get(topicInfo.name());
+            assertEquals(changelogTopic.replicationFactor().get(), topicInfo.replicationFactor());
+            assertEquals(changelogTopic.topicConfigs().size(), topicInfo.topicConfigs().size());
+        });
+        assertEquals(2, subtopology.copartitionGroups().size());
+        final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData1 =
+            new StreamsGroupHeartbeatRequestData.CopartitionGroup()
+                .setRepartitionSourceTopics(Collections.singletonList((short) 0))
+                .setSourceTopics(Collections.singletonList((short) 1));
+        final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData2 =
+            new StreamsGroupHeartbeatRequestData.CopartitionGroup()
+                .setRepartitionSourceTopics(Collections.singletonList((short) 1))
+                .setSourceTopics(Collections.singletonList((short) 0));
+        assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData1));
+        assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData2));
+        verify(membershipManager).onHeartbeatRequestGenerated();
+        time.sleep(2000);
+        assertEquals(
+            2.0,
+            metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", "consumer-coordinator-metrics")).metricValue()
+        );
+        final ClientResponse response = buildClientResponse();
+        networkRequest.future().complete(response);
+        verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) response.responseBody());
+        final List<TopicPartition> topicPartitions = streamsRebalanceData.partitionsByHost()
+            .get(new StreamsRebalanceData.HostInfo(
+                ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(),
+                ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port())
+            );
+        assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), topicPartitions.get(0).topic());
+        assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0), topicPartitions.get(0).partition());
+        assertEquals(
+            1.0,
+            metrics.metric(metrics.metricName("heartbeat-total", "consumer-coordinator-metrics")).metricValue()
+        );
+        NetworkClientDelegate.PollResult nextResult = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(RECEIVED_HEARTBEAT_INTERVAL_MS, nextResult.timeUntilNextPollMs);
+    }
+
+    private ClientResponse buildClientResponse() {
+        return new ClientResponse(
+            new RequestHeader(ApiKeys.STREAMS_GROUP_HEARTBEAT, (short) 1, "", 1),
+            null,
+            "-1",
+            time.milliseconds(),
+            time.milliseconds(),
+            false,
+            null,
+            null,
+            new StreamsGroupHeartbeatResponse(
+                new StreamsGroupHeartbeatResponseData()
+                    .setPartitionsByUserEndpoint(ENDPOINT_TO_PARTITIONS)
+                    .setHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS)
+            )
+        );
+    }
+}
\ No newline at end of file
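The final assertion above relies on the heartbeat timer: after a successful response the timer is reset to the broker-provided interval, and canSendRequest() stays false until it expires. A sketch of the underlying Timer semantics, using MockTime (Kafka's test clock, which already exists outside this patch):

    import org.apache.kafka.common.utils.MockTime;
    import org.apache.kafka.common.utils.Timer;

    class HeartbeatTimerSketch {
        public static void main(String[] args) {
            MockTime time = new MockTime();
            Timer heartbeatTimer = time.timer(1200L); // interval received in the response
            time.sleep(1199L);
            heartbeatTimer.update(time.milliseconds());
            System.out.println(heartbeatTimer.isExpired()); // false -> no heartbeat yet
            time.sleep(1L);
            heartbeatTimer.update(time.milliseconds());
            System.out.println(heartbeatTimer.isExpired()); // true  -> a heartbeat may be sent
        }
    }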
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
index 8a67c580f06..3caa298ee16 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
@@ -26,9 +25,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -272,8 +273,17 @@
     }
 
     @Test
-    public void streamsRebalanceDataShouldNotHaveModifiableSubtopologies() {
-        final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(new HashMap<>());
+    public void streamsRebalanceDataShouldNotHaveModifiableSubtopologiesAndClientTags() {
+        final UUID processId = UUID.randomUUID();
+        final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
+        final Map<String, StreamsRebalanceData.Subtopology> subtopologies = new HashMap<>();
+        final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
+        final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
+            processId,
+            endpoint,
+            subtopologies,
+            clientTags
+        );
 
         assertThrows(
             UnsupportedOperationException.class,
             () -> streamsRebalanceData.subtopologies().put("subtopology1", new StreamsRebalanceData.Subtopology(
                 Set.of(),
                 Set.of(),
                 Map.of(),
                 Map.of(),
@@ -285,21 +295,129 @@ public class StreamsRebalanceDataTest {
                 List.of()
             ))
         );
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> streamsRebalanceData.clientTags().put("clientTag1", "clientTagValue2")
+        );
     }
 
     @Test
-    public void streamsRebalanceDataShouldNotAcceptNulls() {
+    public void streamsRebalanceDataShouldNotAcceptNullProcessId() {
+        final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
+        final Map<String, StreamsRebalanceData.Subtopology> subtopologies = new HashMap<>();
+        final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
+
         final Exception exception = assertThrows(
             NullPointerException.class,
-            () -> new StreamsRebalanceData(null)
+            () -> new StreamsRebalanceData(
+                null,
+                endpoint,
+                subtopologies,
+                clientTags
+            )
+        );
+        assertEquals("Process ID cannot be null", exception.getMessage());
+    }
+
+    @Test
+    public void streamsRebalanceDataShouldNotAcceptNullHostInfo() {
+        final UUID processId = UUID.randomUUID();
+        final Map<String, StreamsRebalanceData.Subtopology> subtopologies = new HashMap<>();
+        final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
+
+        final Exception exception = assertThrows(
+            NullPointerException.class,
+            () -> new StreamsRebalanceData(
+                processId,
+                null,
+                subtopologies,
+                clientTags
+            )
+        );
+        assertEquals("Endpoint cannot be null", exception.getMessage());
+    }
+
+    @Test
+    public void streamsRebalanceDataShouldNotAcceptNullSubtopologies() {
+        final UUID processId = UUID.randomUUID();
+        final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
+        final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
+
+        final Exception exception = assertThrows(
+            NullPointerException.class,
+            () -> new StreamsRebalanceData(
+                processId,
+                endpoint,
+                null,
+                clientTags
+            )
         );
         assertEquals("Subtopologies cannot be null", exception.getMessage());
     }
 
+    @Test
+    public void streamsRebalanceDataShouldNotAcceptNullClientTags() {
+        final UUID processId = UUID.randomUUID();
+        final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
+        final Map<String, StreamsRebalanceData.Subtopology> subtopologies = new HashMap<>();
+
+        final Exception exception = assertThrows(
+            NullPointerException.class,
+            () -> new StreamsRebalanceData(
+                processId,
+                endpoint,
+                subtopologies,
+                null
+            )
+        );
+        assertEquals("Client tags cannot be null", exception.getMessage());
+    }
+
     @Test
     public void streamsRebalanceDataShouldBeConstructedWithEmptyAssignment() {
-        final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(new HashMap<>());
+        final UUID processId = UUID.randomUUID();
+        final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
+        final Map<String, StreamsRebalanceData.Subtopology> subtopologies = new HashMap<>();
+        final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
+        final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
+            processId,
+            endpoint,
+            subtopologies,
+            clientTags
+        );
 
         assertEquals(StreamsRebalanceData.Assignment.EMPTY, streamsRebalanceData.reconciledAssignment());
     }
+
+    @Test
+    public void streamsRebalanceDataShouldBeConstructedWithEmptyPartitionsByHost() {
+        final UUID processId = UUID.randomUUID();
+        final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
+        final Map<String, StreamsRebalanceData.Subtopology> subtopologies = new HashMap<>();
+        final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
+        final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
+            processId,
+            endpoint,
+            subtopologies,
+            clientTags
+        );
+
+        assertTrue(streamsRebalanceData.partitionsByHost().isEmpty());
+    }
+
+    @Test
+    public void streamsRebalanceDataShouldBeConstructedWithShutDownRequestedSetFalse() {
+        final UUID processId = UUID.randomUUID();
+        final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
+        final Map<String, StreamsRebalanceData.Subtopology> subtopologies = new HashMap<>();
+        final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
+        final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
+            processId,
+            endpoint,
+            subtopologies,
+            clientTags
+        );
+
+        assertFalse(streamsRebalanceData.shutdownRequested());
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceEventsProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceEventsProcessorTest.java
index f30aa2718b1..6c7b164826b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceEventsProcessorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceEventsProcessorTest.java
@@ -22,15 +22,18 @@ import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent;
 import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackCompletedEvent;
 import org.apache.kafka.common.KafkaException;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -52,9 +55,24 @@ public class StreamsRebalanceEventsProcessorTest {
 
     @Mock
     private ApplicationEventHandler applicationEventHandler;
 
+    private StreamsRebalanceData rebalanceData;
+
+    @BeforeEach
+    public void setup() {
+        final UUID processId = UUID.randomUUID();
+        final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
+        final Map<String, StreamsRebalanceData.Subtopology> subtopologies = new HashMap<>();
+        final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
+        rebalanceData = new StreamsRebalanceData(
+            processId,
+            endpoint,
+            subtopologies,
+            clientTags
+        );
+    }
+
     @Test
     public void shouldInvokeOnTasksAssignedCallback() {
-        final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap());
         final StreamsRebalanceEventsProcessor rebalanceEventsProcessor = new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
         rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
@@ -90,7 +108,6 @@ public class StreamsRebalanceEventsProcessorTest {
 
     @Test
     public void shouldReThrowErrorFromOnTasksAssignedCallbackAndPassErrorToBackground() {
-        final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap());
         final StreamsRebalanceEventsProcessor rebalanceEventsProcessor = new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
         rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
@@ -130,7 +147,6 @@ public class StreamsRebalanceEventsProcessorTest {
 
     @Test
     public void shouldInvokeOnTasksRevokedCallback() {
-        final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap());
         final StreamsRebalanceEventsProcessor rebalanceEventsProcessor = new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
         rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
@@ -155,7 +171,6 @@ public class StreamsRebalanceEventsProcessorTest {
 
     @Test
     public void shouldReThrowErrorFromOnTasksRevokedCallbackAndPassErrorToBackground() {
-        final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap());
         final StreamsRebalanceEventsProcessor rebalanceEventsProcessor = new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
         rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
@@ -183,7 +198,6 @@ public class StreamsRebalanceEventsProcessorTest {
 
     @Test
     public void shouldInvokeOnAllTasksLostCallback() {
-        final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap());
         final StreamsRebalanceEventsProcessor rebalanceEventsProcessor = new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
         rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
@@ -223,7 +237,6 @@ public class StreamsRebalanceEventsProcessorTest {
 
     @Test
     public void shouldReThrowErrorFromOnAllTasksLostCallbackAndPassErrorToBackground() {
-        final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap());
         final StreamsRebalanceEventsProcessor rebalanceEventsProcessor = new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
         rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
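Distilled from the request-manager test earlier in this patch, the life cycle of a single heartbeat round trip; this is an illustrative sketch (not the consumer's real driver loop) and assumes it lives in the same internals package:

    import org.apache.kafka.clients.ClientResponse;
    import org.apache.kafka.common.utils.Time;

    class HeartbeatFlowSketch {
        static void oneHeartbeat(StreamsGroupHeartbeatRequestManager manager,
                                 Time time,
                                 ClientResponse response) {
            // poll() builds the full heartbeat request and notifies the membership manager.
            NetworkClientDelegate.PollResult result = manager.poll(time.milliseconds());
            NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
            // Completing the future stands in for the network layer delivering a response;
            // onResponse() then records latency, stores the partitions-by-host map in
            // StreamsRebalanceData, and forwards the response to the membership manager.
            request.future().complete(response);
        }
    }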