mirror of https://github.com/apache/kafka.git
KAFKA-18736: Add Streams group heartbeat request manager (1/N) (#18870)
This commit adds the Streams group heartbeat request manager to the async consumer. The Streams group heartbeat request manager is responsible to send heartbeat requests and to process their responses. This commit implements: - sending of full heartbeat request (independent of any state) - processing successful response Reviewers: Bill Bejeck <bill@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
parent
657154dfb8
commit
d6b6952d48
|
@ -0,0 +1,385 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.clients.consumer.internals;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.CommonClientConfigs;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
|
import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
|
||||||
|
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
|
||||||
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
|
import org.apache.kafka.common.protocol.Errors;
|
||||||
|
import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
|
||||||
|
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
|
||||||
|
import org.apache.kafka.common.utils.LogContext;
|
||||||
|
import org.apache.kafka.common.utils.Time;
|
||||||
|
import org.apache.kafka.common.utils.Timer;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
public class StreamsGroupHeartbeatRequestManager implements RequestManager {
|
||||||
|
|
||||||
|
static class HeartbeatState {
|
||||||
|
|
||||||
|
private final StreamsMembershipManager membershipManager;
|
||||||
|
private final int rebalanceTimeoutMs;
|
||||||
|
private final StreamsRebalanceData streamsRebalanceData;
|
||||||
|
|
||||||
|
public HeartbeatState(final StreamsRebalanceData streamsRebalanceData,
|
||||||
|
final StreamsMembershipManager membershipManager,
|
||||||
|
final int rebalanceTimeoutMs) {
|
||||||
|
this.membershipManager = membershipManager;
|
||||||
|
this.streamsRebalanceData = streamsRebalanceData;
|
||||||
|
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public StreamsGroupHeartbeatRequestData buildRequestData() {
|
||||||
|
StreamsGroupHeartbeatRequestData data = new StreamsGroupHeartbeatRequestData();
|
||||||
|
data.setGroupId(membershipManager.groupId());
|
||||||
|
data.setMemberId(membershipManager.memberId());
|
||||||
|
data.setMemberEpoch(membershipManager.memberEpoch());
|
||||||
|
membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
|
||||||
|
StreamsGroupHeartbeatRequestData.Topology topology = new StreamsGroupHeartbeatRequestData.Topology();
|
||||||
|
topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies()));
|
||||||
|
topology.setEpoch(streamsRebalanceData.topologyEpoch());
|
||||||
|
data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
|
||||||
|
data.setTopology(topology);
|
||||||
|
data.setProcessId(streamsRebalanceData.processId().toString());
|
||||||
|
streamsRebalanceData.endpoint().ifPresent(userEndpoint -> {
|
||||||
|
data.setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint()
|
||||||
|
.setHost(userEndpoint.host())
|
||||||
|
.setPort(userEndpoint.port())
|
||||||
|
);
|
||||||
|
});
|
||||||
|
data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream()
|
||||||
|
.map(entry -> new StreamsGroupHeartbeatRequestData.KeyValue()
|
||||||
|
.setKey(entry.getKey())
|
||||||
|
.setValue(entry.getValue())
|
||||||
|
)
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
data.setShutdownApplication(streamsRebalanceData.shutdownRequested());
|
||||||
|
StreamsRebalanceData.Assignment reconciledAssignment = streamsRebalanceData.reconciledAssignment();
|
||||||
|
data.setActiveTasks(convertTaskIdCollection(reconciledAssignment.activeTasks()));
|
||||||
|
data.setStandbyTasks(convertTaskIdCollection(reconciledAssignment.standbyTasks()));
|
||||||
|
data.setWarmupTasks(convertTaskIdCollection(reconciledAssignment.warmupTasks()));
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<StreamsGroupHeartbeatRequestData.TaskIds> convertTaskIdCollection(final Set<StreamsRebalanceData.TaskId> tasks) {
|
||||||
|
return tasks.stream()
|
||||||
|
.collect(
|
||||||
|
Collectors.groupingBy(StreamsRebalanceData.TaskId::subtopologyId,
|
||||||
|
Collectors.mapping(StreamsRebalanceData.TaskId::partitionId, Collectors.toList()))
|
||||||
|
)
|
||||||
|
.entrySet()
|
||||||
|
.stream()
|
||||||
|
.map(entry -> {
|
||||||
|
StreamsGroupHeartbeatRequestData.TaskIds ids = new StreamsGroupHeartbeatRequestData.TaskIds();
|
||||||
|
ids.setSubtopologyId(entry.getKey());
|
||||||
|
ids.setPartitions(entry.getValue());
|
||||||
|
return ids;
|
||||||
|
})
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<StreamsGroupHeartbeatRequestData.Subtopology> getTopologyFromStreams(final Map<String, StreamsRebalanceData.Subtopology> subtopologies) {
|
||||||
|
final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologiesForRequest = new ArrayList<>(subtopologies.size());
|
||||||
|
for (final Map.Entry<String, StreamsRebalanceData.Subtopology> subtopology : subtopologies.entrySet()) {
|
||||||
|
subtopologiesForRequest.add(getSubtopologyFromStreams(subtopology.getKey(), subtopology.getValue()));
|
||||||
|
}
|
||||||
|
subtopologiesForRequest.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.Subtopology::subtopologyId));
|
||||||
|
return subtopologiesForRequest;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static StreamsGroupHeartbeatRequestData.Subtopology getSubtopologyFromStreams(final String subtopologyName,
|
||||||
|
final StreamsRebalanceData.Subtopology subtopology) {
|
||||||
|
final StreamsGroupHeartbeatRequestData.Subtopology subtopologyData = new StreamsGroupHeartbeatRequestData.Subtopology();
|
||||||
|
subtopologyData.setSubtopologyId(subtopologyName);
|
||||||
|
ArrayList<String> sortedSourceTopics = new ArrayList<>(subtopology.sourceTopics());
|
||||||
|
Collections.sort(sortedSourceTopics);
|
||||||
|
subtopologyData.setSourceTopics(sortedSourceTopics);
|
||||||
|
ArrayList<String> sortedSinkTopics = new ArrayList<>(subtopology.repartitionSinkTopics());
|
||||||
|
Collections.sort(sortedSinkTopics);
|
||||||
|
subtopologyData.setRepartitionSinkTopics(sortedSinkTopics);
|
||||||
|
subtopologyData.setRepartitionSourceTopics(getRepartitionTopicsInfoFromStreams(subtopology));
|
||||||
|
subtopologyData.setStateChangelogTopics(getChangelogTopicsInfoFromStreams(subtopology));
|
||||||
|
subtopologyData.setCopartitionGroups(
|
||||||
|
getCopartitionGroupsFromStreams(subtopology.copartitionGroups(), subtopologyData));
|
||||||
|
return subtopologyData;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<StreamsGroupHeartbeatRequestData.CopartitionGroup> getCopartitionGroupsFromStreams(final Collection<Set<String>> copartitionGroups,
|
||||||
|
final StreamsGroupHeartbeatRequestData.Subtopology subtopologyData) {
|
||||||
|
final Map<String, Short> sourceTopicsMap =
|
||||||
|
IntStream.range(0, subtopologyData.sourceTopics().size())
|
||||||
|
.boxed()
|
||||||
|
.collect(Collectors.toMap(subtopologyData.sourceTopics()::get, Integer::shortValue));
|
||||||
|
|
||||||
|
final Map<String, Short> repartitionSourceTopics =
|
||||||
|
IntStream.range(0, subtopologyData.repartitionSourceTopics().size())
|
||||||
|
.boxed()
|
||||||
|
.collect(
|
||||||
|
Collectors.toMap(x -> subtopologyData.repartitionSourceTopics().get(x).name(),
|
||||||
|
Integer::shortValue));
|
||||||
|
|
||||||
|
return copartitionGroups.stream()
|
||||||
|
.map(x -> getCopartitionGroupFromStreams(x, sourceTopicsMap, repartitionSourceTopics))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static StreamsGroupHeartbeatRequestData.CopartitionGroup getCopartitionGroupFromStreams(final Set<String> topicNames,
|
||||||
|
final Map<String, Short> sourceTopicsMap,
|
||||||
|
final Map<String, Short> repartitionSourceTopics) {
|
||||||
|
StreamsGroupHeartbeatRequestData.CopartitionGroup copartitionGroup = new StreamsGroupHeartbeatRequestData.CopartitionGroup();
|
||||||
|
|
||||||
|
topicNames.forEach(topicName -> {
|
||||||
|
if (sourceTopicsMap.containsKey(topicName)) {
|
||||||
|
copartitionGroup.sourceTopics().add(sourceTopicsMap.get(topicName));
|
||||||
|
} else if (repartitionSourceTopics.containsKey(topicName)) {
|
||||||
|
copartitionGroup.repartitionSourceTopics()
|
||||||
|
.add(repartitionSourceTopics.get(topicName));
|
||||||
|
} else {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Source topic not found in subtopology: " + topicName);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return copartitionGroup;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<StreamsGroupHeartbeatRequestData.TopicInfo> getRepartitionTopicsInfoFromStreams(final StreamsRebalanceData.Subtopology subtopologyDataFromStreams) {
|
||||||
|
final List<StreamsGroupHeartbeatRequestData.TopicInfo> repartitionTopicsInfo = new ArrayList<>();
|
||||||
|
for (final Map.Entry<String, StreamsRebalanceData.TopicInfo> repartitionTopic : subtopologyDataFromStreams.repartitionSourceTopics().entrySet()) {
|
||||||
|
final StreamsGroupHeartbeatRequestData.TopicInfo repartitionTopicInfo = new StreamsGroupHeartbeatRequestData.TopicInfo();
|
||||||
|
repartitionTopicInfo.setName(repartitionTopic.getKey());
|
||||||
|
repartitionTopic.getValue().numPartitions().ifPresent(repartitionTopicInfo::setPartitions);
|
||||||
|
repartitionTopic.getValue().replicationFactor().ifPresent(repartitionTopicInfo::setReplicationFactor);
|
||||||
|
repartitionTopic.getValue().topicConfigs().forEach((k, v) ->
|
||||||
|
repartitionTopicInfo.topicConfigs().add(new StreamsGroupHeartbeatRequestData.KeyValue().setKey(k).setValue(v))
|
||||||
|
);
|
||||||
|
repartitionTopicsInfo.add(repartitionTopicInfo);
|
||||||
|
}
|
||||||
|
repartitionTopicsInfo.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.TopicInfo::name));
|
||||||
|
return repartitionTopicsInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<StreamsGroupHeartbeatRequestData.TopicInfo> getChangelogTopicsInfoFromStreams(final StreamsRebalanceData.Subtopology subtopologyDataFromStreams) {
|
||||||
|
final List<StreamsGroupHeartbeatRequestData.TopicInfo> changelogTopicsInfo = new ArrayList<>();
|
||||||
|
for (final Map.Entry<String, StreamsRebalanceData.TopicInfo> changelogTopic : subtopologyDataFromStreams.stateChangelogTopics().entrySet()) {
|
||||||
|
final StreamsGroupHeartbeatRequestData.TopicInfo changelogTopicInfo = new StreamsGroupHeartbeatRequestData.TopicInfo();
|
||||||
|
changelogTopicInfo.setName(changelogTopic.getKey());
|
||||||
|
changelogTopic.getValue().replicationFactor().ifPresent(changelogTopicInfo::setReplicationFactor);
|
||||||
|
changelogTopic.getValue().topicConfigs().forEach((k, v) ->
|
||||||
|
changelogTopicInfo.topicConfigs().add(new StreamsGroupHeartbeatRequestData.KeyValue().setKey(k).setValue(v))
|
||||||
|
);
|
||||||
|
changelogTopicsInfo.add(changelogTopicInfo);
|
||||||
|
}
|
||||||
|
changelogTopicsInfo.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.TopicInfo::name));
|
||||||
|
return changelogTopicsInfo;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents the state of a heartbeat request, including logic for timing, retries, and exponential backoff. The object extends
|
||||||
|
* {@link RequestState} to enable exponential backoff and duplicated request handling. The two fields that it holds are:
|
||||||
|
*/
|
||||||
|
static class HeartbeatRequestState extends RequestState {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The heartbeat timer tracks the time since the last heartbeat was sent
|
||||||
|
*/
|
||||||
|
private final Timer heartbeatTimer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The heartbeat interval which is acquired/updated through the heartbeat request
|
||||||
|
*/
|
||||||
|
private long heartbeatIntervalMs;
|
||||||
|
|
||||||
|
public HeartbeatRequestState(final LogContext logContext,
|
||||||
|
final Time time,
|
||||||
|
final long heartbeatIntervalMs,
|
||||||
|
final long retryBackoffMs,
|
||||||
|
final long retryBackoffMaxMs,
|
||||||
|
final double jitter) {
|
||||||
|
super(
|
||||||
|
logContext,
|
||||||
|
StreamsGroupHeartbeatRequestManager.HeartbeatRequestState.class.getName(),
|
||||||
|
retryBackoffMs,
|
||||||
|
2,
|
||||||
|
retryBackoffMaxMs,
|
||||||
|
jitter
|
||||||
|
);
|
||||||
|
this.heartbeatIntervalMs = heartbeatIntervalMs;
|
||||||
|
this.heartbeatTimer = time.timer(heartbeatIntervalMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void update(final long currentTimeMs) {
|
||||||
|
this.heartbeatTimer.update(currentTimeMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void resetTimer() {
|
||||||
|
this.heartbeatTimer.reset(heartbeatIntervalMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean canSendRequest(final long currentTimeMs) {
|
||||||
|
update(currentTimeMs);
|
||||||
|
return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) {
|
||||||
|
if (this.heartbeatIntervalMs == heartbeatIntervalMs) {
|
||||||
|
// no need to update the timer if the interval hasn't changed
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.heartbeatIntervalMs = heartbeatIntervalMs;
|
||||||
|
this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Logger logger;
|
||||||
|
|
||||||
|
private final int maxPollIntervalMs;
|
||||||
|
|
||||||
|
private final CoordinatorRequestManager coordinatorRequestManager;
|
||||||
|
|
||||||
|
private final HeartbeatRequestState heartbeatRequestState;
|
||||||
|
|
||||||
|
private final HeartbeatState heartbeatState;
|
||||||
|
|
||||||
|
private final StreamsMembershipManager membershipManager;
|
||||||
|
|
||||||
|
private final HeartbeatMetricsManager metricsManager;
|
||||||
|
|
||||||
|
private StreamsRebalanceData streamsRebalanceData;
|
||||||
|
|
||||||
|
public StreamsGroupHeartbeatRequestManager(final LogContext logContext,
|
||||||
|
final Time time,
|
||||||
|
final ConsumerConfig config,
|
||||||
|
final CoordinatorRequestManager coordinatorRequestManager,
|
||||||
|
final StreamsMembershipManager membershipManager,
|
||||||
|
final Metrics metrics,
|
||||||
|
final StreamsRebalanceData streamsRebalanceData) {
|
||||||
|
this.logger = logContext.logger(getClass());
|
||||||
|
this.coordinatorRequestManager = Objects.requireNonNull(
|
||||||
|
coordinatorRequestManager,
|
||||||
|
"Coordinator request manager cannot be null"
|
||||||
|
);
|
||||||
|
this.membershipManager = Objects.requireNonNull(
|
||||||
|
membershipManager,
|
||||||
|
"Streams membership manager cannot be null"
|
||||||
|
);
|
||||||
|
this.metricsManager = new HeartbeatMetricsManager(
|
||||||
|
Objects.requireNonNull(metrics, "Metrics cannot be null")
|
||||||
|
);
|
||||||
|
this.streamsRebalanceData = Objects.requireNonNull(streamsRebalanceData, "Streams rebalance data cannot be null");
|
||||||
|
this.maxPollIntervalMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
|
||||||
|
long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
|
||||||
|
long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
|
||||||
|
this.heartbeatState = new HeartbeatState(streamsRebalanceData, membershipManager, maxPollIntervalMs);
|
||||||
|
this.heartbeatRequestState = new HeartbeatRequestState(
|
||||||
|
logContext,
|
||||||
|
time,
|
||||||
|
0,
|
||||||
|
retryBackoffMs,
|
||||||
|
retryBackoffMaxMs,
|
||||||
|
maxPollIntervalMs
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
|
||||||
|
return new NetworkClientDelegate.PollResult(
|
||||||
|
heartbeatRequestState.heartbeatIntervalMs,
|
||||||
|
Collections.singletonList(makeHeartbeatRequest(currentTimeMs))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs) {
|
||||||
|
NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
|
||||||
|
new StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
|
||||||
|
coordinatorRequestManager.coordinator()
|
||||||
|
);
|
||||||
|
request.whenComplete((response, exception) -> {
|
||||||
|
long completionTimeMs = request.handler().completionTimeMs();
|
||||||
|
if (response != null) {
|
||||||
|
metricsManager.recordRequestLatency(response.requestLatencyMs());
|
||||||
|
onResponse((StreamsGroupHeartbeatResponse) response.responseBody(), completionTimeMs);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
heartbeatRequestState.onSendAttempt(currentTimeMs);
|
||||||
|
membershipManager.onHeartbeatRequestGenerated();
|
||||||
|
metricsManager.recordHeartbeatSentMs(currentTimeMs);
|
||||||
|
return request;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onResponse(final StreamsGroupHeartbeatResponse response, long currentTimeMs) {
|
||||||
|
if (Errors.forCode(response.data().errorCode()) == Errors.NONE) {
|
||||||
|
onSuccessResponse(response, currentTimeMs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onSuccessResponse(final StreamsGroupHeartbeatResponse response, final long currentTimeMs) {
|
||||||
|
final StreamsGroupHeartbeatResponseData data = response.data();
|
||||||
|
|
||||||
|
heartbeatRequestState.updateHeartbeatIntervalMs(data.heartbeatIntervalMs());
|
||||||
|
heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
|
||||||
|
heartbeatRequestState.resetTimer();
|
||||||
|
|
||||||
|
if (data.partitionsByUserEndpoint() != null) {
|
||||||
|
streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data));
|
||||||
|
}
|
||||||
|
|
||||||
|
List<StreamsGroupHeartbeatResponseData.Status> statuses = data.status();
|
||||||
|
|
||||||
|
if (statuses != null && !statuses.isEmpty()) {
|
||||||
|
String statusDetails = statuses.stream()
|
||||||
|
.map(status -> "(" + status.statusCode() + ") " + status.statusDetail())
|
||||||
|
.collect(Collectors.joining(", "));
|
||||||
|
logger.warn("Membership is in the following statuses: {}.", statusDetails);
|
||||||
|
}
|
||||||
|
|
||||||
|
membershipManager.onHeartbeatSuccess(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Map<StreamsRebalanceData.HostInfo, List<TopicPartition>> convertHostInfoMap(final StreamsGroupHeartbeatResponseData data) {
|
||||||
|
Map<StreamsRebalanceData.HostInfo, List<TopicPartition>> partitionsByHost = new HashMap<>();
|
||||||
|
data.partitionsByUserEndpoint().forEach(endpoint -> {
|
||||||
|
List<TopicPartition> topicPartitions = endpoint.partitions().stream()
|
||||||
|
.flatMap(partition ->
|
||||||
|
partition.partitions().stream().map(partitionId -> new TopicPartition(partition.topic(), partitionId)))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
StreamsGroupHeartbeatResponseData.Endpoint userEndpoint = endpoint.userEndpoint();
|
||||||
|
partitionsByHost.put(new StreamsRebalanceData.HostInfo(userEndpoint.host(), userEndpoint.port()), topicPartitions);
|
||||||
|
});
|
||||||
|
return partitionsByHost;
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,13 +16,18 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.clients.consumer.internals;
|
package org.apache.kafka.clients.consumer.internals;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -77,6 +82,46 @@ public class StreamsRebalanceData {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class HostInfo {
|
||||||
|
|
||||||
|
private final String host;
|
||||||
|
private final int port;
|
||||||
|
|
||||||
|
public HostInfo(final String host, final int port) {
|
||||||
|
this.host = Objects.requireNonNull(host);
|
||||||
|
this.port = port;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String host() {
|
||||||
|
return host;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int port() {
|
||||||
|
return port;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
|
HostInfo hostInfo = (HostInfo) o;
|
||||||
|
return port == hostInfo.port && Objects.equals(host, hostInfo.host);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(host, port);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "HostInfo{" +
|
||||||
|
"host='" + host + '\'' +
|
||||||
|
", port=" + port +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public static class Assignment {
|
public static class Assignment {
|
||||||
|
|
||||||
public static final Assignment EMPTY = new Assignment();
|
public static final Assignment EMPTY = new Assignment();
|
||||||
|
@ -244,18 +289,50 @@ public class StreamsRebalanceData {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final UUID processId;
|
||||||
|
|
||||||
|
private final Optional<HostInfo> endpoint;
|
||||||
|
|
||||||
|
private final Map<String, String> clientTags;
|
||||||
|
|
||||||
private final Map<String, Subtopology> subtopologies;
|
private final Map<String, Subtopology> subtopologies;
|
||||||
|
|
||||||
private final AtomicReference<Assignment> reconciledAssignment = new AtomicReference<>(Assignment.EMPTY);
|
private final AtomicReference<Assignment> reconciledAssignment = new AtomicReference<>(Assignment.EMPTY);
|
||||||
|
|
||||||
public StreamsRebalanceData(Map<String, Subtopology> subtopologies) {
|
private final AtomicReference<Map<HostInfo, List<TopicPartition>>> partitionsByHost = new AtomicReference<>(Collections.emptyMap());
|
||||||
|
|
||||||
|
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
public StreamsRebalanceData(final UUID processId,
|
||||||
|
final Optional<HostInfo> endpoint,
|
||||||
|
final Map<String, Subtopology> subtopologies,
|
||||||
|
final Map<String, String> clientTags) {
|
||||||
|
this.processId = Objects.requireNonNull(processId, "Process ID cannot be null");
|
||||||
|
this.endpoint = Objects.requireNonNull(endpoint, "Endpoint cannot be null");
|
||||||
this.subtopologies = Map.copyOf(Objects.requireNonNull(subtopologies, "Subtopologies cannot be null"));
|
this.subtopologies = Map.copyOf(Objects.requireNonNull(subtopologies, "Subtopologies cannot be null"));
|
||||||
|
this.clientTags = Map.copyOf(Objects.requireNonNull(clientTags, "Client tags cannot be null"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public UUID processId() {
|
||||||
|
return processId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Optional<HostInfo> endpoint() {
|
||||||
|
return endpoint;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, String> clientTags() {
|
||||||
|
return clientTags;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, Subtopology> subtopologies() {
|
public Map<String, Subtopology> subtopologies() {
|
||||||
return subtopologies;
|
return subtopologies;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int topologyEpoch() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
public void setReconciledAssignment(final Assignment assignment) {
|
public void setReconciledAssignment(final Assignment assignment) {
|
||||||
reconciledAssignment.set(assignment);
|
reconciledAssignment.set(assignment);
|
||||||
}
|
}
|
||||||
|
@ -263,4 +340,20 @@ public class StreamsRebalanceData {
|
||||||
public Assignment reconciledAssignment() {
|
public Assignment reconciledAssignment() {
|
||||||
return reconciledAssignment.get();
|
return reconciledAssignment.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setPartitionsByHost(final Map<HostInfo, List<TopicPartition>> partitionsByHost) {
|
||||||
|
this.partitionsByHost.set(partitionsByHost);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<HostInfo, List<TopicPartition>> partitionsByHost() {
|
||||||
|
return partitionsByHost.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void requestShutdown() {
|
||||||
|
shutdownRequested.set(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean shutdownRequested() {
|
||||||
|
return shutdownRequested.get();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,313 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.clients.consumer.internals;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.ClientResponse;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
|
import org.apache.kafka.common.Node;
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
|
||||||
|
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
|
||||||
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
|
import org.apache.kafka.common.protocol.ApiKeys;
|
||||||
|
import org.apache.kafka.common.requests.RequestHeader;
|
||||||
|
import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
|
||||||
|
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
|
||||||
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
import org.apache.kafka.common.utils.LogContext;
|
||||||
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
|
import org.apache.kafka.common.utils.Time;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
class StreamsGroupHeartbeatRequestManagerTest {
|
||||||
|
|
||||||
|
private static final LogContext LOG_CONTEXT = new LogContext("test");
|
||||||
|
private static final int RECEIVED_HEARTBEAT_INTERVAL_MS = 1200;
|
||||||
|
private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000;
|
||||||
|
private static final String GROUP_ID = "group-id";
|
||||||
|
private static final String MEMBER_ID = "member-id";
|
||||||
|
private static final int MEMBER_EPOCH = 1;
|
||||||
|
private static final String INSTANCE_ID = "instance-id";
|
||||||
|
private static final UUID PROCESS_ID = UUID.randomUUID();
|
||||||
|
private static final StreamsRebalanceData.HostInfo ENDPOINT = new StreamsRebalanceData.HostInfo("localhost", 8080);
|
||||||
|
private static final String SOURCE_TOPIC_1 = "sourceTopic1";
|
||||||
|
private static final String SOURCE_TOPIC_2 = "sourceTopic2";
|
||||||
|
private static final Set<String> SOURCE_TOPICS = Set.of(SOURCE_TOPIC_1, SOURCE_TOPIC_2);
|
||||||
|
private static final String REPARTITION_SINK_TOPIC_1 = "repartitionSinkTopic1";
|
||||||
|
private static final String REPARTITION_SINK_TOPIC_2 = "repartitionSinkTopic2";
|
||||||
|
private static final String REPARTITION_SINK_TOPIC_3 = "repartitionSinkTopic3";
|
||||||
|
private static final Set<String> REPARTITION_SINK_TOPICS = Set.of(
|
||||||
|
REPARTITION_SINK_TOPIC_1,
|
||||||
|
REPARTITION_SINK_TOPIC_2,
|
||||||
|
REPARTITION_SINK_TOPIC_3
|
||||||
|
);
|
||||||
|
private static final String REPARTITION_SOURCE_TOPIC_1 = "repartitionSourceTopic1";
|
||||||
|
private static final String REPARTITION_SOURCE_TOPIC_2 = "repartitionSourceTopic2";
|
||||||
|
private static final Map<String, StreamsRebalanceData.TopicInfo> REPARTITION_SOURCE_TOPICS = Map.of(
|
||||||
|
REPARTITION_SOURCE_TOPIC_1, new StreamsRebalanceData.TopicInfo(Optional.of(2), Optional.of((short) 1), Map.of("config1", "value1")),
|
||||||
|
REPARTITION_SOURCE_TOPIC_2, new StreamsRebalanceData.TopicInfo(Optional.of(3), Optional.of((short) 3), Collections.emptyMap())
|
||||||
|
);
|
||||||
|
private static final String CHANGELOG_TOPIC_1 = "changelogTopic1";
|
||||||
|
private static final String CHANGELOG_TOPIC_2 = "changelogTopic2";
|
||||||
|
private static final String CHANGELOG_TOPIC_3 = "changelogTopic3";
|
||||||
|
private static final Map<String, StreamsRebalanceData.TopicInfo> CHANGELOG_TOPICS = Map.of(
|
||||||
|
CHANGELOG_TOPIC_1, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 1), Map.of()),
|
||||||
|
CHANGELOG_TOPIC_2, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 2), Map.of()),
|
||||||
|
CHANGELOG_TOPIC_3, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 3), Map.of("config2", "value2"))
|
||||||
|
);
|
||||||
|
private static final Collection<Set<String>> COPARTITION_GROUP = Set.of(
|
||||||
|
Set.of(SOURCE_TOPIC_1, REPARTITION_SOURCE_TOPIC_2),
|
||||||
|
Set.of(SOURCE_TOPIC_2, REPARTITION_SOURCE_TOPIC_1)
|
||||||
|
);
|
||||||
|
private static final String SUBTOPOLOGY_NAME_1 = "subtopology1";
|
||||||
|
private static final StreamsRebalanceData.Subtopology SUBTOPOLOGY_1 = new StreamsRebalanceData.Subtopology(
|
||||||
|
SOURCE_TOPICS,
|
||||||
|
REPARTITION_SINK_TOPICS,
|
||||||
|
REPARTITION_SOURCE_TOPICS,
|
||||||
|
CHANGELOG_TOPICS,
|
||||||
|
COPARTITION_GROUP
|
||||||
|
);
|
||||||
|
private static final Map<String, StreamsRebalanceData.Subtopology> SUBTOPOLOGIES =
|
||||||
|
Map.of(SUBTOPOLOGY_NAME_1, SUBTOPOLOGY_1);
|
||||||
|
private static final Map<String, String> CLIENT_TAGS = Map.of("clientTag1", "value2");
|
||||||
|
private static final List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> ENDPOINT_TO_PARTITIONS =
|
||||||
|
List.of(
|
||||||
|
new StreamsGroupHeartbeatResponseData.EndpointToPartitions()
|
||||||
|
.setUserEndpoint(new StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(8080))
|
||||||
|
.setPartitions(List.of(
|
||||||
|
new StreamsGroupHeartbeatResponseData.TopicPartition().setTopic("topic").setPartitions(List.of(0)))
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
private final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
|
||||||
|
PROCESS_ID,
|
||||||
|
Optional.of(ENDPOINT),
|
||||||
|
SUBTOPOLOGIES,
|
||||||
|
CLIENT_TAGS
|
||||||
|
);
|
||||||
|
|
||||||
|
private final Time time = new MockTime();
|
||||||
|
|
||||||
|
private final ConsumerConfig config = config();
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private CoordinatorRequestManager coordinatorRequestManager;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private StreamsMembershipManager membershipManager;
|
||||||
|
|
||||||
|
private final Metrics metrics = new Metrics(time);
|
||||||
|
|
||||||
|
private final Node coordinatorNode = new Node(1, "localhost", 9092);
|
||||||
|
|
||||||
|
private static ConsumerConfig config() {
|
||||||
|
Properties prop = new Properties();
|
||||||
|
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
|
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
|
prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS));
|
||||||
|
return new ConsumerConfig(prop);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructWithNullCoordinatorRequestManager() {
|
||||||
|
final Exception exception = assertThrows(NullPointerException.class, () -> new StreamsGroupHeartbeatRequestManager(
|
||||||
|
new LogContext("test"),
|
||||||
|
time,
|
||||||
|
config,
|
||||||
|
null,
|
||||||
|
membershipManager,
|
||||||
|
metrics,
|
||||||
|
streamsRebalanceData
|
||||||
|
));
|
||||||
|
assertEquals("Coordinator request manager cannot be null", exception.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructWithNullMembershipManager() {
|
||||||
|
final Exception exception = assertThrows(NullPointerException.class, () -> new StreamsGroupHeartbeatRequestManager(
|
||||||
|
new LogContext("test"),
|
||||||
|
time,
|
||||||
|
config,
|
||||||
|
coordinatorRequestManager,
|
||||||
|
null,
|
||||||
|
metrics,
|
||||||
|
streamsRebalanceData
|
||||||
|
));
|
||||||
|
assertEquals("Streams membership manager cannot be null", exception.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructWithNullMetrics() {
|
||||||
|
final Exception exception = assertThrows(NullPointerException.class, () -> new StreamsGroupHeartbeatRequestManager(
|
||||||
|
new LogContext("test"),
|
||||||
|
time,
|
||||||
|
config,
|
||||||
|
coordinatorRequestManager,
|
||||||
|
membershipManager,
|
||||||
|
null,
|
||||||
|
streamsRebalanceData
|
||||||
|
));
|
||||||
|
assertEquals("Metrics cannot be null", exception.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructWithNullStreamsRebalanceData() {
|
||||||
|
final Exception exception = assertThrows(NullPointerException.class, () -> new StreamsGroupHeartbeatRequestManager(
|
||||||
|
new LogContext("test"),
|
||||||
|
time,
|
||||||
|
config,
|
||||||
|
coordinatorRequestManager,
|
||||||
|
membershipManager,
|
||||||
|
metrics,
|
||||||
|
null
|
||||||
|
));
|
||||||
|
assertEquals("Streams rebalance data cannot be null", exception.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendingFullHeartbeatRequest() {
|
||||||
|
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = new StreamsGroupHeartbeatRequestManager(
|
||||||
|
LOG_CONTEXT,
|
||||||
|
time,
|
||||||
|
config,
|
||||||
|
coordinatorRequestManager,
|
||||||
|
membershipManager,
|
||||||
|
metrics,
|
||||||
|
streamsRebalanceData
|
||||||
|
);
|
||||||
|
|
||||||
|
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
|
||||||
|
when(membershipManager.groupId()).thenReturn(GROUP_ID);
|
||||||
|
when(membershipManager.memberId()).thenReturn(MEMBER_ID);
|
||||||
|
when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
|
||||||
|
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
|
||||||
|
|
||||||
|
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
|
||||||
|
|
||||||
|
assertEquals(0, result.timeUntilNextPollMs);
|
||||||
|
assertEquals(1, result.unsentRequests.size());
|
||||||
|
assertEquals(Optional.of(coordinatorNode), result.unsentRequests.get(0).node());
|
||||||
|
NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
|
||||||
|
StreamsGroupHeartbeatRequest streamsRequest = (StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
|
||||||
|
assertEquals(GROUP_ID, streamsRequest.data().groupId());
|
||||||
|
assertEquals(MEMBER_ID, streamsRequest.data().memberId());
|
||||||
|
assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch());
|
||||||
|
assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
|
||||||
|
assertEquals(PROCESS_ID.toString(), streamsRequest.data().processId());
|
||||||
|
assertEquals(ENDPOINT.host(), streamsRequest.data().userEndpoint().host());
|
||||||
|
assertEquals(ENDPOINT.port(), streamsRequest.data().userEndpoint().port());
|
||||||
|
assertEquals(1, streamsRequest.data().clientTags().size());
|
||||||
|
assertEquals("clientTag1", streamsRequest.data().clientTags().get(0).key());
|
||||||
|
assertEquals("value2", streamsRequest.data().clientTags().get(0).value());
|
||||||
|
assertEquals(streamsRebalanceData.topologyEpoch(), streamsRequest.data().topology().epoch());
|
||||||
|
assertNotNull(streamsRequest.data().topology());
|
||||||
|
final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies = streamsRequest.data().topology().subtopologies();
|
||||||
|
assertEquals(1, subtopologies.size());
|
||||||
|
final StreamsGroupHeartbeatRequestData.Subtopology subtopology = subtopologies.get(0);
|
||||||
|
assertEquals(SUBTOPOLOGY_NAME_1, subtopology.subtopologyId());
|
||||||
|
assertEquals(Arrays.asList("sourceTopic1", "sourceTopic2"), subtopology.sourceTopics());
|
||||||
|
assertEquals(Arrays.asList("repartitionSinkTopic1", "repartitionSinkTopic2", "repartitionSinkTopic3"), subtopology.repartitionSinkTopics());
|
||||||
|
assertEquals(REPARTITION_SOURCE_TOPICS.size(), subtopology.repartitionSourceTopics().size());
|
||||||
|
subtopology.repartitionSourceTopics().forEach(topicInfo -> {
|
||||||
|
final StreamsRebalanceData.TopicInfo repartitionTopic = REPARTITION_SOURCE_TOPICS.get(topicInfo.name());
|
||||||
|
assertEquals(repartitionTopic.numPartitions().get(), topicInfo.partitions());
|
||||||
|
assertEquals(repartitionTopic.replicationFactor().get(), topicInfo.replicationFactor());
|
||||||
|
assertEquals(repartitionTopic.topicConfigs().size(), topicInfo.topicConfigs().size());
|
||||||
|
});
|
||||||
|
assertEquals(CHANGELOG_TOPICS.size(), subtopology.stateChangelogTopics().size());
|
||||||
|
subtopology.stateChangelogTopics().forEach(topicInfo -> {
|
||||||
|
assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name()));
|
||||||
|
assertEquals(0, topicInfo.partitions());
|
||||||
|
final StreamsRebalanceData.TopicInfo changelogTopic = CHANGELOG_TOPICS.get(topicInfo.name());
|
||||||
|
assertEquals(changelogTopic.replicationFactor().get(), topicInfo.replicationFactor());
|
||||||
|
assertEquals(changelogTopic.topicConfigs().size(), topicInfo.topicConfigs().size());
|
||||||
|
});
|
||||||
|
assertEquals(2, subtopology.copartitionGroups().size());
|
||||||
|
final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData1 =
|
||||||
|
new StreamsGroupHeartbeatRequestData.CopartitionGroup()
|
||||||
|
.setRepartitionSourceTopics(Collections.singletonList((short) 0))
|
||||||
|
.setSourceTopics(Collections.singletonList((short) 1));
|
||||||
|
final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData2 =
|
||||||
|
new StreamsGroupHeartbeatRequestData.CopartitionGroup()
|
||||||
|
.setRepartitionSourceTopics(Collections.singletonList((short) 1))
|
||||||
|
.setSourceTopics(Collections.singletonList((short) 0));
|
||||||
|
assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData1));
|
||||||
|
assertTrue(subtopology.copartitionGroups().contains(expectedCopartitionGroupData2));
|
||||||
|
verify(membershipManager).onHeartbeatRequestGenerated();
|
||||||
|
time.sleep(2000);
|
||||||
|
assertEquals(
|
||||||
|
2.0,
|
||||||
|
metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", "consumer-coordinator-metrics")).metricValue()
|
||||||
|
);
|
||||||
|
final ClientResponse response = buildClientResponse();
|
||||||
|
networkRequest.future().complete(response);
|
||||||
|
verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) response.responseBody());
|
||||||
|
final List<TopicPartition> topicPartitions = streamsRebalanceData.partitionsByHost()
|
||||||
|
.get(new StreamsRebalanceData.HostInfo(
|
||||||
|
ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(),
|
||||||
|
ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port())
|
||||||
|
);
|
||||||
|
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), topicPartitions.get(0).topic());
|
||||||
|
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0), topicPartitions.get(0).partition());
|
||||||
|
assertEquals(
|
||||||
|
1.0,
|
||||||
|
metrics.metric(metrics.metricName("heartbeat-total", "consumer-coordinator-metrics")).metricValue()
|
||||||
|
);
|
||||||
|
NetworkClientDelegate.PollResult nextResult = heartbeatRequestManager.poll(time.milliseconds());
|
||||||
|
assertEquals(RECEIVED_HEARTBEAT_INTERVAL_MS, nextResult.timeUntilNextPollMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ClientResponse buildClientResponse() {
|
||||||
|
return new ClientResponse(
|
||||||
|
new RequestHeader(ApiKeys.STREAMS_GROUP_HEARTBEAT, (short) 1, "", 1),
|
||||||
|
null,
|
||||||
|
"-1",
|
||||||
|
time.milliseconds(),
|
||||||
|
time.milliseconds(),
|
||||||
|
false,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
new StreamsGroupHeartbeatResponse(
|
||||||
|
new StreamsGroupHeartbeatResponseData()
|
||||||
|
.setPartitionsByUserEndpoint(ENDPOINT_TO_PARTITIONS)
|
||||||
|
.setHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.clients.consumer.internals;
|
package org.apache.kafka.clients.consumer.internals;
|
||||||
|
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -26,9 +25,11 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotSame;
|
import static org.junit.jupiter.api.Assertions.assertNotSame;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
@ -272,8 +273,17 @@ public class StreamsRebalanceDataTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void streamsRebalanceDataShouldNotHaveModifiableSubtopologies() {
|
public void streamsRebalanceDataShouldNotHaveModifiableSubtopologiesAndClientTags() {
|
||||||
final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(new HashMap<>());
|
final UUID processId = UUID.randomUUID();
|
||||||
|
final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
|
||||||
|
final Map<String, StreamsRebalanceData.Subtopology> subtopologies = new HashMap<>();
|
||||||
|
final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
|
||||||
|
final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
|
||||||
|
processId,
|
||||||
|
endpoint,
|
||||||
|
subtopologies,
|
||||||
|
clientTags
|
||||||
|
);
|
||||||
|
|
||||||
assertThrows(
|
assertThrows(
|
||||||
UnsupportedOperationException.class,
|
UnsupportedOperationException.class,
|
||||||
|
@ -285,21 +295,129 @@ public class StreamsRebalanceDataTest {
|
||||||
List.of()
|
List.of()
|
||||||
))
|
))
|
||||||
);
|
);
|
||||||
|
assertThrows(
|
||||||
|
UnsupportedOperationException.class,
|
||||||
|
() -> streamsRebalanceData.clientTags().put("clientTag1", "clientTagValue2")
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void streamsRebalanceDataShouldNotAcceptNulls() {
|
public void streamsRebalanceDataShouldNotAcceptNullProcessId() {
|
||||||
|
final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
|
||||||
|
final Map<String, StreamsRebalanceData.Subtopology> subtopologies = new HashMap<>();
|
||||||
|
final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
|
||||||
|
|
||||||
final Exception exception = assertThrows(
|
final Exception exception = assertThrows(
|
||||||
NullPointerException.class,
|
NullPointerException.class,
|
||||||
() -> new StreamsRebalanceData(null)
|
() -> new StreamsRebalanceData(
|
||||||
|
null,
|
||||||
|
endpoint,
|
||||||
|
subtopologies,
|
||||||
|
clientTags
|
||||||
|
)
|
||||||
|
);
|
||||||
|
assertEquals("Process ID cannot be null", exception.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void streamsRebalanceDataShouldNotAcceptNullHostInfo() {
|
||||||
|
final UUID processId = UUID.randomUUID();
|
||||||
|
final Map<String, StreamsRebalanceData.Subtopology> subtopologies = new HashMap<>();
|
||||||
|
final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
|
||||||
|
|
||||||
|
final Exception exception = assertThrows(
|
||||||
|
NullPointerException.class,
|
||||||
|
() -> new StreamsRebalanceData(
|
||||||
|
processId,
|
||||||
|
null,
|
||||||
|
subtopologies,
|
||||||
|
clientTags
|
||||||
|
)
|
||||||
|
);
|
||||||
|
assertEquals("Endpoint cannot be null", exception.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void streamsRebalanceDataShouldNotAcceptNullSubtopologies() {
|
||||||
|
final UUID processId = UUID.randomUUID();
|
||||||
|
final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
|
||||||
|
final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
|
||||||
|
|
||||||
|
final Exception exception = assertThrows(
|
||||||
|
NullPointerException.class,
|
||||||
|
() -> new StreamsRebalanceData(
|
||||||
|
processId,
|
||||||
|
endpoint,
|
||||||
|
null,
|
||||||
|
clientTags
|
||||||
|
)
|
||||||
);
|
);
|
||||||
assertEquals("Subtopologies cannot be null", exception.getMessage());
|
assertEquals("Subtopologies cannot be null", exception.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void streamsRebalanceDataShouldNotAcceptNullClientTags() {
|
||||||
|
final UUID processId = UUID.randomUUID();
|
||||||
|
final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
|
||||||
|
final Map<String, StreamsRebalanceData.Subtopology> subtopologies = new HashMap<>();
|
||||||
|
|
||||||
|
final Exception exception = assertThrows(
|
||||||
|
NullPointerException.class,
|
||||||
|
() -> new StreamsRebalanceData(
|
||||||
|
processId,
|
||||||
|
endpoint,
|
||||||
|
subtopologies,
|
||||||
|
null
|
||||||
|
)
|
||||||
|
);
|
||||||
|
assertEquals("Client tags cannot be null", exception.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void streamsRebalanceDataShouldBeConstructedWithEmptyAssignment() {
|
public void streamsRebalanceDataShouldBeConstructedWithEmptyAssignment() {
|
||||||
final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(new HashMap<>());
|
final UUID processId = UUID.randomUUID();
|
||||||
|
final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
|
||||||
|
final Map<String, StreamsRebalanceData.Subtopology> subtopologies = new HashMap<>();
|
||||||
|
final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
|
||||||
|
final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
|
||||||
|
processId,
|
||||||
|
endpoint,
|
||||||
|
subtopologies,
|
||||||
|
clientTags
|
||||||
|
);
|
||||||
|
|
||||||
assertEquals(StreamsRebalanceData.Assignment.EMPTY, streamsRebalanceData.reconciledAssignment());
|
assertEquals(StreamsRebalanceData.Assignment.EMPTY, streamsRebalanceData.reconciledAssignment());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void streamsRebalanceDataShouldBeConstructedWithEmptyPartitionsByHost() {
|
||||||
|
final UUID processId = UUID.randomUUID();
|
||||||
|
final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
|
||||||
|
final Map<String, StreamsRebalanceData.Subtopology> subtopologies = new HashMap<>();
|
||||||
|
final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
|
||||||
|
final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
|
||||||
|
processId,
|
||||||
|
endpoint,
|
||||||
|
subtopologies,
|
||||||
|
clientTags
|
||||||
|
);
|
||||||
|
|
||||||
|
assertTrue(streamsRebalanceData.partitionsByHost().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void streamsRebalanceDataShouldBeConstructedWithShutDownRequestedSetFalse() {
|
||||||
|
final UUID processId = UUID.randomUUID();
|
||||||
|
final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
|
||||||
|
final Map<String, StreamsRebalanceData.Subtopology> subtopologies = new HashMap<>();
|
||||||
|
final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
|
||||||
|
final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
|
||||||
|
processId,
|
||||||
|
endpoint,
|
||||||
|
subtopologies,
|
||||||
|
clientTags
|
||||||
|
);
|
||||||
|
|
||||||
|
assertFalse(streamsRebalanceData.shutdownRequested());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,15 +22,18 @@ import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssigned
|
||||||
import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackCompletedEvent;
|
import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackCompletedEvent;
|
||||||
import org.apache.kafka.common.KafkaException;
|
import org.apache.kafka.common.KafkaException;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
import org.mockito.junit.jupiter.MockitoExtension;
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
@ -52,9 +55,24 @@ public class StreamsRebalanceEventsProcessorTest {
|
||||||
@Mock
|
@Mock
|
||||||
private ApplicationEventHandler applicationEventHandler;
|
private ApplicationEventHandler applicationEventHandler;
|
||||||
|
|
||||||
|
private StreamsRebalanceData rebalanceData;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setup() {
|
||||||
|
final UUID processId = UUID.randomUUID();
|
||||||
|
final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
|
||||||
|
final Map<String, StreamsRebalanceData.Subtopology> subtopologies = new HashMap<>();
|
||||||
|
final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
|
||||||
|
rebalanceData = new StreamsRebalanceData(
|
||||||
|
processId,
|
||||||
|
endpoint,
|
||||||
|
subtopologies,
|
||||||
|
clientTags
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldInvokeOnTasksAssignedCallback() {
|
public void shouldInvokeOnTasksAssignedCallback() {
|
||||||
final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap());
|
|
||||||
final StreamsRebalanceEventsProcessor rebalanceEventsProcessor =
|
final StreamsRebalanceEventsProcessor rebalanceEventsProcessor =
|
||||||
new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
|
new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
|
||||||
rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
|
rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
|
||||||
|
@ -90,7 +108,6 @@ public class StreamsRebalanceEventsProcessorTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldReThrowErrorFromOnTasksAssignedCallbackAndPassErrorToBackground() {
|
public void shouldReThrowErrorFromOnTasksAssignedCallbackAndPassErrorToBackground() {
|
||||||
final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap());
|
|
||||||
final StreamsRebalanceEventsProcessor rebalanceEventsProcessor =
|
final StreamsRebalanceEventsProcessor rebalanceEventsProcessor =
|
||||||
new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
|
new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
|
||||||
rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
|
rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
|
||||||
|
@ -130,7 +147,6 @@ public class StreamsRebalanceEventsProcessorTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldInvokeOnTasksRevokedCallback() {
|
public void shouldInvokeOnTasksRevokedCallback() {
|
||||||
final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap());
|
|
||||||
final StreamsRebalanceEventsProcessor rebalanceEventsProcessor =
|
final StreamsRebalanceEventsProcessor rebalanceEventsProcessor =
|
||||||
new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
|
new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
|
||||||
rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
|
rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
|
||||||
|
@ -155,7 +171,6 @@ public class StreamsRebalanceEventsProcessorTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldReThrowErrorFromOnTasksRevokedCallbackAndPassErrorToBackground() {
|
public void shouldReThrowErrorFromOnTasksRevokedCallbackAndPassErrorToBackground() {
|
||||||
final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap());
|
|
||||||
final StreamsRebalanceEventsProcessor rebalanceEventsProcessor =
|
final StreamsRebalanceEventsProcessor rebalanceEventsProcessor =
|
||||||
new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
|
new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
|
||||||
rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
|
rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
|
||||||
|
@ -183,7 +198,6 @@ public class StreamsRebalanceEventsProcessorTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldInvokeOnAllTasksLostCallback() {
|
public void shouldInvokeOnAllTasksLostCallback() {
|
||||||
final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap());
|
|
||||||
final StreamsRebalanceEventsProcessor rebalanceEventsProcessor =
|
final StreamsRebalanceEventsProcessor rebalanceEventsProcessor =
|
||||||
new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
|
new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
|
||||||
rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
|
rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
|
||||||
|
@ -223,7 +237,6 @@ public class StreamsRebalanceEventsProcessorTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldReThrowErrorFromOnAllTasksLostCallbackAndPassErrorToBackground() {
|
public void shouldReThrowErrorFromOnAllTasksLostCallbackAndPassErrorToBackground() {
|
||||||
final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap());
|
|
||||||
final StreamsRebalanceEventsProcessor rebalanceEventsProcessor =
|
final StreamsRebalanceEventsProcessor rebalanceEventsProcessor =
|
||||||
new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
|
new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks);
|
||||||
rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
|
rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler);
|
||||||
|
|
Loading…
Reference in New Issue