KAFKA-17190: AssignmentsManager gets stuck retrying on deleted topics (#16672)
In MetadataVersion 3.7-IV2 and above, the broker's AssignmentsManager sends an RPC to the controller informing it about which directory we have chosen to place each new replica on. Unfortunately, the code does not check to see if the topic still exists in the MetadataImage before sending the RPC. It will also retry infinitely. Therefore, after a topic is created and deleted in rapid succession, we can get stuck including the now-defunct replica in our subsequent AssignReplicasToDirsRequests forever.

In order to prevent this problem, the AssignmentsManager should check if a topic still exists (and is still present on the broker in question) before sending the RPC. In order to prevent log spam, we should not log any error messages until several minutes have gone past without success. Finally, rather than creating a new EventQueue event for each assignment request, we should simply modify a shared data structure and schedule a deferred event to send the accumulated RPCs. This will improve efficiency.

Reviewers: Igor Soarez <i@soarez.me>, Ron Dagostino <rndgstn@gmail.com>
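As a hedged illustration (not part of this commit), the revised API introduced by the diff below is driven roughly as follows. Names such as channelManager, metadataCache, dirPaths, nodeId, topicId, and directoryId are placeholders; the wiring mirrors what BrokerServer does in the hunk further down:

    // Construct the manager with a MetadataImage supplier, so that it can drop
    // assignments whose topic has since been deleted instead of retrying forever.
    AssignmentsManager manager = new AssignmentsManager(
        Time.SYSTEM,
        channelManager,                     // a NodeToControllerChannelManager
        nodeId,
        () -> metadataCache.getImage(),     // supplies the latest MetadataImage
        dirId -> dirPaths.getOrDefault(dirId, "[unknown directory path]"));

    // Submitting an assignment only records it in a shared map and (re)schedules
    // a deferred MaybeSendAssignmentsEvent; no per-assignment event is enqueued.
    manager.onAssignment(
        new TopicIdPartition(topicId, 0),   // replica being placed
        directoryId,                        // chosen log directory
        "initial placement",                // reason, used for logging
        () -> { });                         // success callback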
This commit is contained in:
parent 98cdf97170
commit e1b2adea07
@@ -841,6 +841,7 @@ project(':server') {
  dependencies {
    implementation project(':clients')
    implementation project(':metadata')
    implementation project(':server-common')
    implementation project(':storage')
    implementation project(':group-coordinator')
@@ -80,6 +80,8 @@
  <allow pkg="org.apache.kafka.raft" />

  <subpackage name="server">
    <allow pkg="org.apache.kafka.server" />
    <allow pkg="org.apache.kafka.image" />
    <subpackage name="metrics">
      <allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
      <allow pkg="org.apache.kafka.server.telemetry" />
@@ -47,6 +47,10 @@ public class ExponentialBackoff {
            Math.log(maxInterval / (double) Math.max(initialInterval, 1)) / Math.log(multiplier) : 0;
    }

    public long initialInterval() {
        return initialInterval;
    }

    public long backoff(long attempts) {
        if (expMax == 0) {
            return initialInterval;
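For a sense of scale, a hedged worked example of this backoff with the STANDARD_BACKOFF parameters used later in this change (100ms initial interval in nanoseconds, multiplier 2, 10s cap, 2% jitter):

    ExponentialBackoff backoff = new ExponentialBackoff(
        TimeUnit.MILLISECONDS.toNanos(100), 2, TimeUnit.SECONDS.toNanos(10), 0.02);
    backoff.initialInterval();   // 100ms in nanoseconds, via the accessor added above
    backoff.backoff(1);          // roughly 200ms, plus up to 2% jitter
    backoff.backoff(2);          // roughly 400ms
    backoff.backoff(10);         // capped near the 10s maximum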
@@ -303,9 +303,9 @@ class BrokerServer(
      time,
      assignmentsChannelManager,
      config.brokerId,
      () => lifecycleManager.brokerEpoch,
      (directoryId: Uuid) => logManager.directoryPath(directoryId).asJava,
      (topicId: Uuid) => Optional.ofNullable(metadataCache.topicIdsToNames().get(topicId))
      () => metadataCache.getImage(),
      (directoryId: Uuid) => logManager.directoryPath(directoryId).
        getOrElse("[unknown directory path]")
    )
    val directoryEventHandler = new DirectoryEventHandler {
      override def handleAssignment(partition: TopicIdPartition, directoryId: Uuid, reason: String, callback: Runnable): Unit =
@@ -530,6 +530,10 @@ class KRaftMetadataCache(
    _currentImage = newImage
  }

  def getImage(): MetadataImage = {
    _currentImage
  }

  override def config(configResource: ConfigResource): Properties =
    _currentImage.configs().configProperties(configResource)
@@ -70,6 +70,14 @@ public final class ClusterImage {
        return brokers.containsKey(brokerId);
    }

    public long brokerEpoch(int brokerId) {
        BrokerRegistration brokerRegistration = broker(brokerId);
        if (brokerRegistration == null) {
            return -1L;
        }
        return brokerRegistration.epoch();
    }

    public void write(ImageWriter writer, ImageWriterOptions options) {
        for (BrokerRegistration broker : brokers.values()) {
            writer.write(broker.toRecord(options));
@@ -317,4 +317,15 @@ public class ClusterImageTest {
            build());
        assertEquals("controller registration data", lossString.get());
    }

    @Test
    public void testBrokerEpoch() {
        assertEquals(123L, IMAGE1.brokerEpoch(2));
    }

    @Test
    public void testBrokerEpochForNonExistentBroker() {
        assertEquals(-1L, IMAGE1.brokerEpoch(20));
    }
}
@@ -33,6 +33,7 @@ public interface DirectoryEventHandler {
     * Handle the assignment of a topic partition to a directory.
     * @param directoryId The directory ID
     * @param partition The topic partition
     * @param reason The reason
     * @param callback Callback to apply when the request is completed.
     */
    void handleAssignment(TopicIdPartition partition, Uuid directoryId, String reason, Runnable callback);
@@ -0,0 +1,128 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.server;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.TopicIdPartition;

import java.util.Objects;

final class Assignment {
    /**
     * The topic ID and partition index of the replica.
     */
    private final TopicIdPartition topicIdPartition;

    /**
     * The ID of the directory we are placing the replica into.
     */
    private final Uuid directoryId;

    /**
     * The time in monotonic nanoseconds when this assignment was created.
     */
    private final long submissionTimeNs;

    /**
     * The callback to invoke on success.
     */
    private final Runnable successCallback;

    Assignment(
        TopicIdPartition topicIdPartition,
        Uuid directoryId,
        long submissionTimeNs,
        Runnable successCallback
    ) {
        this.topicIdPartition = topicIdPartition;
        this.directoryId = directoryId;
        this.submissionTimeNs = submissionTimeNs;
        this.successCallback = successCallback;
    }

    TopicIdPartition topicIdPartition() {
        return topicIdPartition;
    }

    Uuid directoryId() {
        return directoryId;
    }

    long submissionTimeNs() {
        return submissionTimeNs;
    }

    Runnable successCallback() {
        return successCallback;
    }

    /**
     * Check if this Assignment is still valid to be sent.
     *
     * @param nodeId    The broker ID.
     * @param image     The metadata image.
     *
     * @return          True only if the Assignment is still valid.
     */
    boolean valid(int nodeId, MetadataImage image) {
        TopicImage topicImage = image.topics().getTopic(topicIdPartition.topicId());
        if (topicImage == null) {
            return false; // The topic has been deleted.
        }
        PartitionRegistration partition = topicImage.partitions().get(topicIdPartition.partitionId());
        if (partition == null) {
            return false; // The partition no longer exists.
        }
        // Check if this broker is still a replica.
        return Replicas.contains(partition.replicas, nodeId);
    }

    @Override
    public boolean equals(Object o) {
        if (o == null || (!(o instanceof Assignment))) return false;
        Assignment other = (Assignment) o;
        return topicIdPartition.equals(other.topicIdPartition) &&
            directoryId.equals(other.directoryId) &&
            submissionTimeNs == other.submissionTimeNs &&
            successCallback.equals(other.successCallback);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topicIdPartition,
            directoryId,
            submissionTimeNs,
            successCallback);
    }

    @Override
    public String toString() {
        StringBuilder bld = new StringBuilder();
        bld.append("Assignment");
        bld.append("(topicIdPartition=").append(topicIdPartition);
        bld.append(", directoryId=").append(directoryId);
        bld.append(", submissionTimeNs=").append(submissionTimeNs);
        bld.append(", successCallback=").append(successCallback);
        bld.append(")");
        return bld.toString();
    }
}
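In effect, Assignment.valid is the fix for the stuck-retry case described in the commit message. A hedged sketch of the check the manager now performs before each send (deletedTopicId, directoryId, nodeId, and metadataImageSupplier are placeholders):

    // If the topic was deleted, or the partition is gone, or this broker is no
    // longer a replica, valid() returns false and the assignment is dropped
    // rather than retried forever.
    Assignment assignment = new Assignment(
        new TopicIdPartition(deletedTopicId, 0), directoryId,
        time.nanoseconds(), () -> { });
    boolean stillWorthSending = assignment.valid(nodeId, metadataImageSupplier.get());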
@@ -30,442 +30,443 @@ import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class AssignmentsManager {
import static org.apache.kafka.common.requests.AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST;

    private static final Logger log = LoggerFactory.getLogger(AssignmentsManager.class);
public final class AssignmentsManager {
    static final ExponentialBackoff STANDARD_BACKOFF = new ExponentialBackoff(
        TimeUnit.MILLISECONDS.toNanos(100),
        2,
        TimeUnit.SECONDS.toNanos(10),
        0.02);

    /**
     * Assignments are dispatched to the controller this long after
     * being submitted to {@link AssignmentsManager}, if there
     * is no request in flight already.
     * The interval is reset when a new assignment is submitted.
     * If {@link AssignReplicasToDirsRequest#MAX_ASSIGNMENTS_PER_REQUEST}
     * is reached, we ignore this interval and dispatch immediately.
     * The minimum amount of time we will wait before logging individual assignment failures.
     */
    private static final long DISPATCH_INTERVAL_NS = TimeUnit.MILLISECONDS.toNanos(500);
    static final long MIN_NOISY_FAILURE_INTERVAL_NS = TimeUnit.MINUTES.toNanos(2);

    private static final long MAX_BACKOFF_INTERVAL_MS = TimeUnit.SECONDS.toNanos(10);
    /**
     * The metric reflecting the number of pending assignments.
     */
    static final MetricName QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
        metricName("QueuedReplicaToDirAssignments");

    // visible for testing.
    static final String QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME = "QueuedReplicaToDirAssignments";
    /**
     * The event at which we send assignments, if appropriate.
     */
    static final String MAYBE_SEND_ASSIGNMENTS_EVENT = "MaybeSendAssignmentsEvent";

    /**
     * The log4j object.
     */
    private final Logger log;

    /**
     * The exponential backoff strategy to use.
     */
    private final ExponentialBackoff backoff;

    /**
     * The clock object to use.
     */
    private final Time time;

    /**
     * Used to send messages to the controller.
     */
    private final NodeToControllerChannelManager channelManager;
    private final int brokerId;
    private final Supplier<Long> brokerEpochSupplier;

    /**
     * The node ID.
     */
    private final int nodeId;

    /**
     * Supplies the latest MetadataImage.
     */
    private final Supplier<MetadataImage> metadataImageSupplier;

    /**
     * Maps directory IDs to descriptions for logging purposes.
     */
    private final Function<Uuid, String> directoryIdToDescription;

    /**
     * Maps partitions to assignments that are ready to send.
     */
    private final ConcurrentHashMap<TopicIdPartition, Assignment> ready;

    /**
     * Maps partitions to assignments that are in-flight. Older entries come first.
     */
    private volatile Map<TopicIdPartition, Assignment> inflight;

    /**
     * The registry to register our metrics with.
     */
    private final MetricsRegistry metricsRegistry;

    /**
     * The number of global failures we had previously (cleared after any success).
     */
    private int previousGlobalFailures;

    /**
     * The event queue.
     */
    private final KafkaEventQueue eventQueue;
    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());

    // These variables should only be mutated from the KafkaEventQueue thread,
    // but `inflight` and `pending` are also read from a Yammer metrics gauge.
    private volatile Map<TopicIdPartition, AssignmentEvent> inflight = null;
    private volatile Map<TopicIdPartition, AssignmentEvent> pending = new HashMap<>();
    private final ExponentialBackoff resendExponentialBackoff =
        new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
    private final Function<Uuid, Optional<String>> dirIdToPath;
    private final Function<Uuid, Optional<String>> topicIdToName;
    private int failedAttempts = 0;
    static MetricName metricName(String name) {
        return KafkaYammerMetrics.getMetricName("org.apache.kafka.server", "AssignmentsManager", name);
    }

    public AssignmentsManager(Time time,
                              NodeToControllerChannelManager channelManager,
                              int brokerId,
                              Supplier<Long> brokerEpochSupplier,
                              Function<Uuid, Optional<String>> dirIdToPath,
                              Function<Uuid, Optional<String>> topicIdToName) {
    public AssignmentsManager(
        Time time,
        NodeToControllerChannelManager channelManager,
        int nodeId,
        Supplier<MetadataImage> metadataImageSupplier,
        Function<Uuid, String> directoryIdToDescription
    ) {
        this(STANDARD_BACKOFF,
            time,
            channelManager,
            nodeId,
            metadataImageSupplier,
            directoryIdToDescription,
            KafkaYammerMetrics.defaultRegistry());
    }

    AssignmentsManager(
        ExponentialBackoff backoff,
        Time time,
        NodeToControllerChannelManager channelManager,
        int nodeId,
        Supplier<MetadataImage> metadataImageSupplier,
        Function<Uuid, String> directoryIdToDescription,
        MetricsRegistry metricsRegistry
    ) {
        this.log = new LogContext("[AssignmentsManager id=" + nodeId + "] ").
            logger(AssignmentsManager.class);
        this.backoff = backoff;
        this.time = time;
        this.channelManager = channelManager;
        this.brokerId = brokerId;
        this.brokerEpochSupplier = brokerEpochSupplier;
        this.nodeId = nodeId;
        this.directoryIdToDescription = directoryIdToDescription;
        this.metadataImageSupplier = metadataImageSupplier;
        this.ready = new ConcurrentHashMap<>();
        this.inflight = Collections.emptyMap();
        this.metricsRegistry = metricsRegistry;
        this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge<Integer>() {
            @Override
            public Integer value() {
                return numPending();
            }
        });
        this.previousGlobalFailures = 0;
        this.eventQueue = new KafkaEventQueue(time,
            new LogContext("[AssignmentsManager id=" + brokerId + "]"),
            "broker-" + brokerId + "-directory-assignments-manager-",
            new ShutdownEvent());
            new LogContext("[AssignmentsManager id=" + nodeId + "]"),
            "broker-" + nodeId + "-directory-assignments-manager-",
            new ShutdownEvent());
        channelManager.start();
        this.metricsGroup.newGauge(QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME, () -> getMapSize(inflight) + getMapSize(pending));
        if (dirIdToPath == null) dirIdToPath = id -> Optional.empty();
        this.dirIdToPath = dirIdToPath;
        if (topicIdToName == null) topicIdToName = id -> Optional.empty();
        this.topicIdToName = topicIdToName;
    }

    public int numPending() {
        return ready.size() + inflight.size();
    }

    public void close() throws InterruptedException {
        try {
            eventQueue.close();
        } finally {
            metricsGroup.removeMetric(QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME);
        }
        eventQueue.close();
    }

    public void onAssignment(TopicIdPartition topicPartition, Uuid dirId, String reason) {
        onAssignment(topicPartition, dirId, reason, null);
    public void onAssignment(
        TopicIdPartition topicIdPartition,
        Uuid directoryId,
        String reason,
        Runnable successCallback
    ) {
        long nowNs = time.nanoseconds();
        Assignment assignment = new Assignment(
            topicIdPartition, directoryId, nowNs, successCallback);
        ready.put(topicIdPartition, assignment);
        if (log.isTraceEnabled()) {
            String topicDescription = Optional.ofNullable(metadataImageSupplier.get().topics().
                getTopic(assignment.topicIdPartition().topicId())).
                    map(TopicImage::name).orElse(assignment.topicIdPartition().topicId().toString());
            log.trace("Registered assignment {}: {}, moving {}-{} into {}",
                assignment,
                reason,
                topicDescription,
                topicIdPartition.partitionId(),
                directoryIdToDescription.apply(assignment.directoryId()));
        }
        rescheduleMaybeSendAssignmentsEvent(nowNs);
    }

    public void onAssignment(TopicIdPartition topicPartition, Uuid dirId, String reason, Runnable callback) {
        if (callback == null) {
            callback = () -> { };
        }
        AssignmentEvent assignment = new AssignmentEvent(time.nanoseconds(), topicPartition, dirId, reason, callback);
        if (log.isDebugEnabled()) {
            log.debug("Queued assignment {}", assignment);
        }
        eventQueue.append(assignment);
    }

    // only for testing
    void wakeup() {
        eventQueue.wakeup();
    void rescheduleMaybeSendAssignmentsEvent(long nowNs) {
        eventQueue.scheduleDeferred(MAYBE_SEND_ASSIGNMENTS_EVENT,
            new AssignmentsManagerDeadlineFunction(backoff,
                nowNs, previousGlobalFailures, !inflight.isEmpty(), ready.size()),
            new MaybeSendAssignmentsEvent());
    }

    /**
     * Base class for all the events handled by {@link AssignmentsManager}.
     * Handles shutdown.
     */
    private abstract static class Event implements EventQueue.Event {
        /**
         * Override the default behavior in
         * {@link EventQueue.Event#handleException}
         * which swallows the exception.
         */
        @Override
        public void handleException(Throwable e) {
            log.error("Unexpected error handling {}", this, e);
        }
    }

    /**
     * Handles shutdown of the {@link AssignmentsManager}.
     */
    private class ShutdownEvent extends Event {
    private class ShutdownEvent implements EventQueue.Event {
        @Override
        public void run() {
            channelManager.shutdown();
            log.info("shutting down.");
            try {
                channelManager.shutdown();
            } catch (Exception e) {
                log.error("Unexpected exception shutting down NodeToControllerChannelManager", e);
            }
            try {
                metricsRegistry.removeMetric(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
            } catch (Exception e) {
                log.error("Unexpected exception removing metrics.", e);
            }
        }
    }

    /**
     * Handles new generated assignments, to be propagated to the controller.
     * Assignment events may be handled out of order, so for any two assignment
     * events for the same topic partition, the one with the oldest timestamp is
     * disregarded.
     * An event that processes the assignments in the ready map.
     */
    private class AssignmentEvent extends Event {
        final long timestampNs;
        final TopicIdPartition partition;
        final Uuid dirId;
        final String reason;
        final List<Runnable> completionHandlers;
        AssignmentEvent(long timestampNs, TopicIdPartition partition, Uuid dirId, String reason, Runnable onComplete) {
            this.timestampNs = timestampNs;
            this.partition = Objects.requireNonNull(partition);
            this.dirId = Objects.requireNonNull(dirId);
            this.reason = reason;
            this.completionHandlers = new ArrayList<>();
            if (onComplete != null) {
                completionHandlers.add(onComplete);
            }
        }
        void merge(AssignmentEvent other) {
            if (!partition.equals(other.partition)) {
                throw new IllegalArgumentException("Cannot merge events for different partitions");
            }
            completionHandlers.addAll(other.completionHandlers);
        }
        void onComplete() {
            for (Runnable onComplete : completionHandlers) {
                onComplete.run();
            }
        }
    private class MaybeSendAssignmentsEvent implements EventQueue.Event {
        @Override
        public void run() {
            log.trace("Received assignment {}", this);
            AssignmentEvent existing = pending.getOrDefault(partition, null);
            boolean existingIsInFlight = false;
            if (existing == null && inflight != null) {
                existing = inflight.getOrDefault(partition, null);
                existingIsInFlight = true;
            try {
                maybeSendAssignments();
            } catch (Exception e) {
                log.error("Unexpected exception in MaybeSendAssignmentsEvent", e);
            }
            if (existing != null) {
                if (existing.dirId.equals(dirId)) {
                    existing.merge(this);
                    log.debug("Ignoring duplicate assignment {}", this);
                    return;
                }
                if (existing.timestampNs > timestampNs) {
                    existing.merge(this);
                    log.debug("Dropping assignment {} because it's older than existing {}", this, existing);
                    return;
                } else if (!existingIsInFlight) {
                    this.merge(existing);
                    log.debug("Dropping existing assignment {} because it's older than {}", existing, this);
                }
            }
            log.debug("Queueing new assignment {}", this);
            pending.put(partition, this);

            if (inflight == null || inflight.isEmpty()) {
                scheduleDispatch();
            }
        }
        @Override
        public String toString() {
            String partitionString = topicIdToName.apply(partition.topicId())
                .map(name -> name + ":" + partition.partitionId())
                .orElseGet(() -> "<topic name unknown id: " + partition.topicId() + " partition: " + partition.partitionId() + ">");
            String dirString = dirIdToPath.apply(dirId)
                .orElseGet(() -> "<dir path unknown id:" + dirId + ">");
            return "Assignment{" +
                "timestampNs=" + timestampNs +
                ", partition=" + partitionString +
                ", dir=" + dirString +
                ", reason='" + reason + '\'' +
                '}';
        }
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            AssignmentEvent that = (AssignmentEvent) o;
            return timestampNs == that.timestampNs
                && Objects.equals(partition, that.partition)
                && Objects.equals(dirId, that.dirId)
                && Objects.equals(reason, that.reason);
        }
        @Override
        public int hashCode() {
            return Objects.hash(timestampNs, partition, dirId, reason);
        }
    }

    /**
     * Gathers pending assignments and pushes them to the controller in a {@link AssignReplicasToDirsRequest}.
     * An event that handles the controller's response to our request.
     */
    private class DispatchEvent extends Event {
        static final String TAG = "dispatch";
        @Override
        public void run() {
            if (inflight != null) {
                throw new IllegalStateException("Bug. Should not be dispatching while there are assignments in flight");
            }
            if (pending.isEmpty()) {
                log.trace("No pending assignments, no-op dispatch");
                return;
            }
            Collection<AssignmentEvent> events = pending.values();
            pending = new HashMap<>();
            inflight = new HashMap<>();
            for (AssignmentEvent event : events) {
                if (inflight.size() < AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST) {
                    inflight.put(event.partition, event);
                } else {
                    pending.put(event.partition, event);
                }
            }
            if (!pending.isEmpty()) {
                log.warn("Too many assignments ({}) to fit in one call, sending only {} and queueing the rest",
                    AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST + pending.size(),
                    AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST);
            }
            Map<TopicIdPartition, Uuid> assignment = inflight.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().dirId));
            log.debug("Dispatching {} assignments: {}", assignment.size(), assignment);
            channelManager.sendRequest(new AssignReplicasToDirsRequest.Builder(
                buildRequestData(brokerId, brokerEpochSupplier.get(), assignment)),
                new AssignReplicasToDirsRequestCompletionHandler());
        }
    }
    private class HandleResponseEvent implements EventQueue.Event {
        private final Map<TopicIdPartition, Assignment> sent;
        private final Optional<ClientResponse> response;

    /**
     * Handles the response to a dispatched {@link AssignReplicasToDirsRequest}.
     */
    private class AssignmentResponseEvent extends Event {
        private final ClientResponse response;
        public AssignmentResponseEvent(ClientResponse response) {
        HandleResponseEvent(
            Map<TopicIdPartition, Assignment> sent,
            Optional<ClientResponse> response
        ) {
            this.sent = sent;
            this.response = response;
        }

        @Override
        public void run() {
            if (inflight == null) {
                throw new IllegalStateException("Bug. Cannot not be handling a client response if there is are no assignments in flight");
            }
            if (responseIsError(response)) {
                requeueAllAfterFailure();
            } else {
                failedAttempts = 0;
                AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse) response.responseBody()).data();

                Set<AssignmentEvent> failed = filterFailures(data, inflight);
                Set<AssignmentEvent> completed = Utils.diff(HashSet::new, new HashSet<>(inflight.values()), failed);
                for (AssignmentEvent assignmentEvent : completed) {
                    if (log.isDebugEnabled()) {
                        log.debug("Successfully propagated assignment {}", assignmentEvent);
                    }
                    assignmentEvent.onComplete();
                }

                if (!failed.isEmpty()) {
                    log.warn("Re-queueing assignments: {}", failed);
                    for (AssignmentEvent event : failed) {
                        pending.put(event.partition, event);
                    }
                }
                inflight = null;
                if (!pending.isEmpty()) {
                    scheduleDispatch();
            try {
                handleResponse(sent, response);
            } catch (Exception e) {
                log.error("Unexpected exception in HandleResponseEvent", e);
            } finally {
                if (!ready.isEmpty()) {
                    rescheduleMaybeSendAssignmentsEvent(time.nanoseconds());
                }
            }
        }
    }

    /**
     * Callback for a {@link AssignReplicasToDirsRequest}.
     * A callback object that handles the controller's response to our request.
     */
    private class AssignReplicasToDirsRequestCompletionHandler implements ControllerRequestCompletionHandler {
    private class CompletionHandler implements ControllerRequestCompletionHandler {
        private final Map<TopicIdPartition, Assignment> sent;

        CompletionHandler(Map<TopicIdPartition, Assignment> sent) {
            this.sent = sent;
        }

        @Override
        public void onTimeout() {
            log.warn("Request to controller timed out");
            appendResponseEvent(null);
            eventQueue.append(new HandleResponseEvent(sent, Optional.empty()));
        }

        @Override
        public void onComplete(ClientResponse response) {
            log.debug("Received controller response: {}", response);
            appendResponseEvent(response);
        }
        void appendResponseEvent(ClientResponse response) {
            eventQueue.prepend(new AssignmentResponseEvent(response));
            eventQueue.append(new HandleResponseEvent(sent, Optional.of(response)));
        }
    }

    private void scheduleDispatch() {
        if (pending.size() < AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST) {
            scheduleDispatch(DISPATCH_INTERVAL_NS);
        } else {
            log.debug("Too many pending assignments, dispatching immediately");
            eventQueue.enqueue(EventQueue.EventInsertionType.APPEND, DispatchEvent.TAG + "-immediate",
                new EventQueue.NoDeadlineFunction(), new DispatchEvent());
    void maybeSendAssignments() {
        int inflightSize = inflight.size();
        if (log.isTraceEnabled()) {
            log.trace("maybeSendAssignments: inflightSize = {}.", inflightSize);
        }
    }

    private void scheduleDispatch(long delayNs) {
        log.debug("Scheduling dispatch in {}ns", delayNs);
        eventQueue.enqueue(EventQueue.EventInsertionType.DEFERRED, DispatchEvent.TAG,
            new EventQueue.LatestDeadlineFunction(time.nanoseconds() + delayNs), new DispatchEvent());
    }

    private void requeueAllAfterFailure() {
        if (inflight != null) {
            log.debug("Re-queueing all in-flight assignments after failure");
            for (AssignmentEvent event : inflight.values()) {
                pending.put(event.partition, event);
        if (inflightSize > 0) {
            log.trace("maybeSendAssignments: cannot send new assignments because there are " +
                "{} still in flight.", inflightSize);
            return;
        }
        MetadataImage image = metadataImageSupplier.get();
        Map<TopicIdPartition, Assignment> newInFlight = new HashMap<>();
        int numInvalid = 0;
        for (Iterator<Assignment> iterator = ready.values().iterator();
             iterator.hasNext() && newInFlight.size() < MAX_ASSIGNMENTS_PER_REQUEST;
        ) {
            Assignment assignment = iterator.next();
            iterator.remove();
            if (assignment.valid(nodeId, image)) {
                newInFlight.put(assignment.topicIdPartition(), assignment);
            } else {
                numInvalid++;
            }
            inflight = null;
            ++failedAttempts;
            long backoffNs = TimeUnit.MILLISECONDS.toNanos(resendExponentialBackoff.backoff(failedAttempts));
            scheduleDispatch(DISPATCH_INTERVAL_NS + backoffNs);
        }
        log.info("maybeSendAssignments: sending {} assignments; invalidated {} assignments " +
            "prior to sending.", newInFlight.size(), numInvalid);
        if (!newInFlight.isEmpty()) {
            sendAssignments(image.cluster().brokerEpoch(nodeId), newInFlight);
        }
    }

    private static boolean responseIsError(ClientResponse response) {
        if (response == null) {
            log.error("Response is null");
            return true;
        }
        if (response.authenticationException() != null) {
            log.error("Failed to propagate directory assignments because authentication failed", response.authenticationException());
            return true;
        }
        if (response.versionMismatch() != null) {
            log.error("Failed to propagate directory assignments because the request version is unsupported", response.versionMismatch());
            return true;
        }
        if (response.wasDisconnected()) {
            log.error("Failed to propagate directory assignments because the connection to the controller was disconnected");
            return true;
        }
        if (response.wasTimedOut()) {
            log.error("Failed to propagate directory assignments because the request timed out");
            return true;
        }
        if (response.responseBody() == null) {
            log.error("Failed to propagate directory assignments because the Controller returned an empty response");
            return true;
        }
        if (!(response.responseBody() instanceof AssignReplicasToDirsResponse)) {
            log.error("Failed to propagate directory assignments because the Controller returned an invalid response type");
            return true;
        }
        AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse) response.responseBody()).data();
        Errors error = Errors.forCode(data.errorCode());
        if (error != Errors.NONE) {
            log.error("Failed to propagate directory assignments because the Controller returned error {}", error.name());
            return true;
        }
        return false;
    void sendAssignments(long brokerEpoch, Map<TopicIdPartition, Assignment> newInflight) {
        CompletionHandler completionHandler = new CompletionHandler(newInflight);
        channelManager.sendRequest(new AssignReplicasToDirsRequest.Builder(
            buildRequestData(nodeId, brokerEpoch, newInflight)),
            completionHandler);
        inflight = newInflight;
    }

    private static Set<AssignmentEvent> filterFailures(
        AssignReplicasToDirsResponseData data,
        Map<TopicIdPartition, AssignmentEvent> sent) {
        Set<AssignmentEvent> failures = new HashSet<>();
        Set<TopicIdPartition> acknowledged = new HashSet<>();
        for (AssignReplicasToDirsResponseData.DirectoryData directory : data.directories()) {
            for (AssignReplicasToDirsResponseData.TopicData topic : directory.topics()) {
                for (AssignReplicasToDirsResponseData.PartitionData partition : topic.partitions()) {
                    TopicIdPartition topicPartition = new TopicIdPartition(topic.topicId(), partition.partitionIndex());
                    AssignmentEvent event = sent.get(topicPartition);
                    if (event == null) {
                        log.error("AssignReplicasToDirsResponse contains unexpected partition {} into directory {}", partition, directory.id());
                    } else {
                        acknowledged.add(topicPartition);
                        Errors error = Errors.forCode(partition.errorCode());
                        if (error == Errors.NOT_LEADER_OR_FOLLOWER) {
                            log.info("Dropping late directory assignment for partition {} into directory {} because this broker is no longer a replica", partition, event.dirId);
                        } else if (error != Errors.NONE) {
                            log.error("Controller returned error {} for assignment of partition {} into directory {}",
                                error.name(), partition, event.dirId);
                            failures.add(event);
                        }
                    }
    void handleResponse(
        Map<TopicIdPartition, Assignment> sent,
        Optional<ClientResponse> assignmentResponse
    ) {
        inflight = Collections.emptyMap();
        Optional<String> globalResponseError = globalResponseError(assignmentResponse);
        if (globalResponseError.isPresent()) {
            previousGlobalFailures++;
            log.error("handleResponse: {} assignments failed; global error: {}. Retrying.",
                sent.size(), globalResponseError.get());
            sent.entrySet().forEach(e -> ready.putIfAbsent(e.getKey(), e.getValue()));
            return;
        }
        previousGlobalFailures = 0;
        AssignReplicasToDirsResponseData responseData =
            ((AssignReplicasToDirsResponse) assignmentResponse.get().responseBody()).data();
        long nowNs = time.nanoseconds();
        for (AssignReplicasToDirsResponseData.DirectoryData directoryData : responseData.directories()) {
            for (AssignReplicasToDirsResponseData.TopicData topicData : directoryData.topics()) {
                for (AssignReplicasToDirsResponseData.PartitionData partitionData : topicData.partitions()) {
                    TopicIdPartition topicIdPartition =
                        new TopicIdPartition(topicData.topicId(), partitionData.partitionIndex());
                    handleAssignmentResponse(topicIdPartition, sent,
                        Errors.forCode(partitionData.errorCode()), nowNs);
                    sent.remove(topicIdPartition);
                }
            }
        }
        for (AssignmentEvent event : sent.values()) {
            if (!acknowledged.contains(event.partition)) {
                log.error("AssignReplicasToDirsResponse is missing assignment of partition {} into directory {}", event.partition, event.dirId);
                failures.add(event);
            }
        for (Assignment assignment : sent.values()) {
            ready.putIfAbsent(assignment.topicIdPartition(), assignment);
            log.error("handleResponse: no result in response for partition {}.",
                assignment.topicIdPartition());
        }
        return failures;
    }

    // visible for testing
    static AssignReplicasToDirsRequestData buildRequestData(int brokerId, long brokerEpoch, Map<TopicIdPartition, Uuid> assignment) {
    void handleAssignmentResponse(
        TopicIdPartition topicIdPartition,
        Map<TopicIdPartition, Assignment> sent,
        Errors error,
        long nowNs
    ) {
        Assignment assignment = sent.get(topicIdPartition);
        if (assignment == null) {
            log.error("handleResponse: response contained topicIdPartition {}, but this was not " +
                "in the request.", topicIdPartition);
        } else if (error.equals(Errors.NONE)) {
            try {
                assignment.successCallback().run();
            } catch (Exception e) {
                log.error("handleResponse: unexpected callback exception", e);
            }
        } else {
            ready.putIfAbsent(topicIdPartition, assignment);
            if (log.isDebugEnabled() || nowNs > assignment.submissionTimeNs() + MIN_NOISY_FAILURE_INTERVAL_NS) {
                log.error("handleResponse: error assigning {}: {}.", assignment.topicIdPartition(), error);
            }
        }
    }

    int previousGlobalFailures() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        eventQueue.append(() -> future.complete(previousGlobalFailures));
        return future.get();
    }

    int numInFlight() {
        return inflight.size();
    }

    static Optional<String> globalResponseError(Optional<ClientResponse> response) {
        if (!response.isPresent()) {
            return Optional.of("Timeout");
        }
        if (response.get().authenticationException() != null) {
            return Optional.of("AuthenticationException");
        }
        if (response.get().wasTimedOut()) {
            return Optional.of("Disonnected[Timeout]");
        }
        if (response.get().wasDisconnected()) {
            return Optional.of("Disconnected");
        }
        if (response.get().versionMismatch() != null) {
            return Optional.of("UnsupportedVersionException");
        }
        if (response.get().responseBody() == null) {
            return Optional.of("EmptyResponse");
        }
        if (!(response.get().responseBody() instanceof AssignReplicasToDirsResponse)) {
            return Optional.of("ClassCastException");
        }
        AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse)
            response.get().responseBody()).data();
        Errors error = Errors.forCode(data.errorCode());
        if (error != Errors.NONE) {
            return Optional.of("Response-level error: " + error.name());
        }
        return Optional.empty();
    }

    static AssignReplicasToDirsRequestData buildRequestData(
        int nodeId,
        long brokerEpoch,
        Map<TopicIdPartition, Assignment> assignments
    ) {
        Map<Uuid, DirectoryData> directoryMap = new HashMap<>();
        Map<Uuid, Map<Uuid, TopicData>> topicMap = new HashMap<>();
        for (Map.Entry<TopicIdPartition, Uuid> entry : assignment.entrySet()) {
        for (Map.Entry<TopicIdPartition, Assignment> entry : assignments.entrySet()) {
            TopicIdPartition topicPartition = entry.getKey();
            Uuid directoryId = entry.getValue();
            Uuid directoryId = entry.getValue().directoryId();
            DirectoryData directory = directoryMap.computeIfAbsent(directoryId, d -> new DirectoryData().setId(directoryId));
            TopicData topic = topicMap.computeIfAbsent(directoryId, d -> new HashMap<>())
                .computeIfAbsent(topicPartition.topicId(), topicId -> {
@@ -477,12 +478,8 @@ public class AssignmentsManager {
            topic.partitions().add(partition);
        }
        return new AssignReplicasToDirsRequestData()
            .setBrokerId(brokerId)
            .setBrokerId(nodeId)
            .setBrokerEpoch(brokerEpoch)
            .setDirectories(new ArrayList<>(directoryMap.values()));
    }

    private static int getMapSize(Map<TopicIdPartition, AssignmentEvent> map) {
        return map == null ? 0 : map.size();
    }
}
@@ -0,0 +1,96 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.server;

import org.apache.kafka.common.utils.ExponentialBackoff;

import java.util.OptionalLong;
import java.util.function.Function;

import static org.apache.kafka.common.requests.AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST;

/**
 * This class calculates when the MaybeSendAssignmentsEvent should run for AssignmentsManager.
 */
public class AssignmentsManagerDeadlineFunction implements Function<OptionalLong, OptionalLong> {

    /**
     * The exponential backoff to use.
     */
    private final ExponentialBackoff backoff;

    /**
     * The current time in monotonic nanoseconds.
     */
    private final long nowNs;

    /**
     * The number of global failures immediately prior to this attempt.
     */
    private final int previousGlobalFailures;

    /**
     * True if there are current inflight requests.
     */
    private final boolean hasInflightRequests;

    /**
     * The number of requests that are ready to send.
     */
    private final int numReadyRequests;

    AssignmentsManagerDeadlineFunction(
        ExponentialBackoff backoff,
        long nowNs,
        int previousGlobalFailures,
        boolean hasInflightRequests,
        int numReadyRequests
    ) {
        this.backoff = backoff;
        this.nowNs = nowNs;
        this.previousGlobalFailures = previousGlobalFailures;
        this.hasInflightRequests = hasInflightRequests;
        this.numReadyRequests = numReadyRequests;
    }

    @Override
    public OptionalLong apply(OptionalLong previousSendTimeNs) {
        long delayNs;
        if (previousGlobalFailures > 0) {
            // If there were global failures (like a response timeout), we want to wait for the
            // full backoff period.
            delayNs = backoff.backoff(previousGlobalFailures);
        } else if ((numReadyRequests > MAX_ASSIGNMENTS_PER_REQUEST) && !hasInflightRequests) {
            // If there were no previous failures, and we have lots of requests, send it as soon
            // as possible.
            delayNs = 0;
        } else {
            // Otherwise, use the standard delay period. This helps to promote batching, which
            // reduces load on the controller.
            delayNs = backoff.initialInterval();
        }
        long newSendTimeNs = nowNs + delayNs;
        if (previousSendTimeNs.isPresent() && previousSendTimeNs.getAsLong() < newSendTimeNs) {
            // If the previous send time was before the new one we calculated, go with the
            // previous one.
            return previousSendTimeNs;
        }
        // Otherwise, return our new send time.
        return OptionalLong.of(newSendTimeNs);
    }
}
@@ -0,0 +1,126 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.server;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TopicIdPartition;

import org.junit.jupiter.api.Test;

import java.util.Arrays;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class AssignmentTest {
    private static final MetadataImage TEST_IMAGE;

    static {
        MetadataDelta delta = new MetadataDelta.Builder().
            setImage(MetadataImage.EMPTY).
            build();
        delta.replay(new FeatureLevelRecord().
            setName(MetadataVersion.FEATURE_NAME).
            setFeatureLevel(MetadataVersion.IBP_3_8_IV0.featureLevel()));
        delta.replay(new TopicRecord().
            setName("foo").
            setTopicId(Uuid.fromString("rTudty6ITOCcO_ldVyzZYg")));
        delta.replay(new PartitionRecord().
            setPartitionId(0).
            setTopicId(Uuid.fromString("rTudty6ITOCcO_ldVyzZYg")).
            setReplicas(Arrays.asList(0, 1, 2)).
            setIsr(Arrays.asList(0, 1, 2)).
            setLeader(1));
        delta.replay(new PartitionRecord().
            setPartitionId(1).
            setTopicId(Uuid.fromString("rTudty6ITOCcO_ldVyzZYg")).
            setReplicas(Arrays.asList(1, 2, 3)).
            setIsr(Arrays.asList(1, 2, 3)).
            setLeader(1));
        TEST_IMAGE = delta.apply(MetadataProvenance.EMPTY);
    }

    static class NoOpRunnable implements Runnable {
        static final NoOpRunnable INSTANCE = new NoOpRunnable();

        @Override
        public void run() {
        }

        @Override
        public String toString() {
            return "NoOpRunnable";
        }
    }

    @Test
    public void testValidAssignment() {
        assertTrue(new Assignment(
            new TopicIdPartition(Uuid.fromString("rTudty6ITOCcO_ldVyzZYg"), 0),
            Uuid.fromString("rzRT8XZaSbKsP6j238zogg"),
            0,
            NoOpRunnable.INSTANCE).valid(0, TEST_IMAGE));
    }

    @Test
    public void testAssignmentForNonExistentTopicIsNotValid() {
        assertFalse(new Assignment(
            new TopicIdPartition(Uuid.fromString("uuOi4qGPSsuM0QwnYINvOw"), 0),
            Uuid.fromString("rzRT8XZaSbKsP6j238zogg"),
            0,
            NoOpRunnable.INSTANCE).valid(0, TEST_IMAGE));
    }

    @Test
    public void testAssignmentForNonExistentPartitionIsNotValid() {
        assertFalse(new Assignment(
            new TopicIdPartition(Uuid.fromString("rTudty6ITOCcO_ldVyzZYg"), 2),
            Uuid.fromString("rzRT8XZaSbKsP6j238zogg"),
            0,
            NoOpRunnable.INSTANCE).valid(0, TEST_IMAGE));
    }

    @Test
    public void testAssignmentReplicaNotOnBrokerIsNotValid() {
        assertFalse(new Assignment(
            new TopicIdPartition(Uuid.fromString("rTudty6ITOCcO_ldVyzZYg"), 0),
            Uuid.fromString("rzRT8XZaSbKsP6j238zogg"),
            0,
            NoOpRunnable.INSTANCE).valid(3, TEST_IMAGE));
    }

    @Test
    public void testAssignmentToString() {
        assertEquals("Assignment(topicIdPartition=rTudty6ITOCcO_ldVyzZYg:1, " +
            "directoryId=rzRT8XZaSbKsP6j238zogg, " +
            "submissionTimeNs=123, " +
            "successCallback=NoOpRunnable)",
            new Assignment(new TopicIdPartition(Uuid.fromString("rTudty6ITOCcO_ldVyzZYg"), 1),
                Uuid.fromString("rzRT8XZaSbKsP6j238zogg"),
                123,
                NoOpRunnable.INSTANCE).toString());
    }
}
@@ -0,0 +1,82 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.server;

import org.apache.kafka.common.utils.ExponentialBackoff;

import org.junit.jupiter.api.Test;

import java.util.OptionalLong;

import static org.apache.kafka.common.requests.AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class AssignmentsManagerDeadlineFunctionTest {
    private static final ExponentialBackoff BACKOFF = new ExponentialBackoff(1000, 2, 8000, 0.0);

    @Test
    public void applyAfterDispatchInterval() {
        assertEquals(OptionalLong.of(BACKOFF.initialInterval()),
            new AssignmentsManagerDeadlineFunction(BACKOFF, 0, 0, false, 12).
                apply(OptionalLong.empty()));
    }

    @Test
    public void applyAfterDispatchIntervalWithExistingEarlierDeadline() {
        assertEquals(OptionalLong.of(BACKOFF.initialInterval() / 2),
            new AssignmentsManagerDeadlineFunction(BACKOFF, 0, 0, false, 12).
                apply(OptionalLong.of(BACKOFF.initialInterval() / 2)));
    }

    @Test
    public void applyBackoffInterval() {
        assertEquals(OptionalLong.of(BACKOFF.initialInterval() * 2),
            new AssignmentsManagerDeadlineFunction(BACKOFF, 0, 1, false, 12).
                apply(OptionalLong.empty()));
    }

    @Test
    public void applyBackoffIntervalWithExistingEarlierDeadline() {
        assertEquals(OptionalLong.of(BACKOFF.initialInterval() / 2),
            new AssignmentsManagerDeadlineFunction(BACKOFF, 0, 1, false, 12).
                apply(OptionalLong.of(BACKOFF.initialInterval() / 2)));
    }

    @Test
    public void scheduleImmediatelyWhenOverloaded() {
        assertEquals(OptionalLong.of(0),
            new AssignmentsManagerDeadlineFunction(BACKOFF, 0, 0, false,
                MAX_ASSIGNMENTS_PER_REQUEST + 1).
                    apply(OptionalLong.of(BACKOFF.initialInterval() / 2)));
    }

    @Test
    public void doNotScheduleImmediatelyWhenOverloadedIfThereAreInFlightRequests() {
        assertEquals(OptionalLong.of(BACKOFF.initialInterval()),
            new AssignmentsManagerDeadlineFunction(BACKOFF, 0, 0, true,
                MAX_ASSIGNMENTS_PER_REQUEST + 1).
                    apply(OptionalLong.empty()));
    }

    @Test
    public void doNotScheduleImmediatelyWhenOverloadedIfThereArePreviousGlobalFailures() {
        assertEquals(OptionalLong.of(BACKOFF.initialInterval() * 2),
            new AssignmentsManagerDeadlineFunction(BACKOFF, 0, 1, false,
                MAX_ASSIGNMENTS_PER_REQUEST + 1).
                    apply(OptionalLong.empty()));
    }
}
@ -14,451 +14,501 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.server;

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.metadata.AssignmentsHelper;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.test.TestUtils;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentCaptor;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.kafka.metadata.AssignmentsHelper.buildRequestData;
import static org.apache.kafka.server.AssignmentsManager.QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

public class AssignmentsManagerTest {

    private static final Logger LOG = LoggerFactory.getLogger(AssignmentsManagerTest.class);
    private static final Uuid TOPIC_1 = Uuid.fromString("88rnFIqYSZykX4ZSKv81bg");
    private static final Uuid TOPIC_2 = Uuid.fromString("VKCnzHdhR5uDQc1shqBYrQ");
    private static final Uuid TOPIC_3 = Uuid.fromString("ZeAwvYt-Ro2suQudGUdbRg");
    private static final Uuid DIR_1 = Uuid.fromString("cbgD8WdLQCyzLrFIMBhv3w");
    private static final Uuid DIR_2 = Uuid.fromString("zO0bDc0vSuam7Db9iH7rYQ");
    private static final Uuid DIR_3 = Uuid.fromString("CGBWbrFkRkeJQy6Aryzq2Q");

    private MockTime time;
    private NodeToControllerChannelManager channelManager;
    private AssignmentsManager manager;
    private static final MetadataImage TEST_IMAGE;

    @BeforeEach
    public void setup() {
        time = new MockTime();
        channelManager = mock(NodeToControllerChannelManager.class);
        Map<Uuid, String> topicNames = new HashMap<>();
        topicNames.put(TOPIC_1, "TOPIC_1");
        topicNames.put(TOPIC_2, "TOPIC_2");
        Map<Uuid, String> dirPaths = new HashMap<>();
        dirPaths.put(DIR_1, "DIR_1");
        dirPaths.put(DIR_2, "DIR_2");
        dirPaths.put(DIR_3, "DIR_3");
        manager = new AssignmentsManager(time, channelManager, 8, () -> 100L,
            id -> Optional.ofNullable(dirPaths.get(id)), id -> Optional.ofNullable(topicNames.get(id)));
    }

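    // TEST_IMAGE is a small metadata image with brokers 0-3 and two topics, "foo" (TOPIC_1)
    // and "bar" (TOPIC_2). The tests below check assignments against this image, so TOPIC_3
    // is deliberately absent from it.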
    static {
        MetadataDelta delta = new MetadataDelta.Builder().
            setImage(MetadataImage.EMPTY).
            build();
        delta.replay(new FeatureLevelRecord().
            setName(MetadataVersion.FEATURE_NAME).
            setFeatureLevel(MetadataVersion.IBP_3_8_IV0.featureLevel()));
        delta.replay(new RegisterBrokerRecord().
            setBrokerId(0).
            setIncarnationId(Uuid.fromString("JJsH6zB0R7eKbr0Sy49ULw")).
            setBrokerEpoch(123));
        delta.replay(new RegisterBrokerRecord().
            setBrokerId(1).
            setIncarnationId(Uuid.fromString("DtnWclXyQ4qNDvL97JlnvQ")).
            setBrokerEpoch(456));
        delta.replay(new RegisterBrokerRecord().
            setBrokerId(2).
            setIncarnationId(Uuid.fromString("UFa_RKgLR4mxEXyquEPEmg")).
            setBrokerEpoch(789));
        delta.replay(new RegisterBrokerRecord().
            setBrokerId(3).
            setIncarnationId(Uuid.fromString("jj-cnHYASAmb_H9JR6nmtQ")).
            setBrokerEpoch(987));
        delta.replay(new TopicRecord().
            setName("foo").
            setTopicId(TOPIC_1));
        delta.replay(new PartitionRecord().
            setPartitionId(0).
            setTopicId(TOPIC_1).
            setReplicas(Arrays.asList(0, 1, 2)).
            setIsr(Arrays.asList(0, 1, 2)).
            setLeader(1));
        delta.replay(new PartitionRecord().
            setPartitionId(1).
            setTopicId(TOPIC_1).
            setReplicas(Arrays.asList(1, 2, 3)).
            setIsr(Arrays.asList(1, 2, 3)).
            setLeader(1));
        delta.replay(new TopicRecord().
            setName("bar").
            setTopicId(TOPIC_2));
        delta.replay(new PartitionRecord().
            setPartitionId(0).
            setTopicId(TOPIC_2).
            setReplicas(Arrays.asList(0, 3, 2)).
            setIsr(Arrays.asList(0, 3, 2)).
            setLeader(1));
        delta.replay(new PartitionRecord().
            setPartitionId(1).
            setTopicId(TOPIC_2).
            setReplicas(Arrays.asList(1, 2, 3)).
            setIsr(Arrays.asList(2)).
            setLeader(2));
        delta.replay(new PartitionRecord().
            setPartitionId(2).
            setTopicId(TOPIC_2).
            setReplicas(Arrays.asList(3, 2, 1)).
            setIsr(Arrays.asList(3, 2, 1)).
            setLeader(3));
        TEST_IMAGE = delta.apply(MetadataProvenance.EMPTY);
    }

    @AfterEach
    void tearDown() throws InterruptedException {
        manager.close();
    }

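    // Mock channel manager that queues outgoing AssignReplicasToDirsRequests so a test
    // can complete each request's callback deterministically, either with a response or
    // with a timeout.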
    static class MockNodeToControllerChannelManager implements NodeToControllerChannelManager {
        LinkedBlockingDeque<Map.Entry<AssignReplicasToDirsRequestData, ControllerRequestCompletionHandler>> callbacks =
            new LinkedBlockingDeque<>();

        @Override
        public void start() {
        }

        @Override
        public void shutdown() {
        }

        @Override
        public Optional<NodeApiVersions> controllerApiVersions() {
            return Optional.empty();
        }

        @Override
        public void sendRequest(
            AbstractRequest.Builder<? extends AbstractRequest> request,
            ControllerRequestCompletionHandler callback
        ) {
            AssignReplicasToDirsRequest inputRequest = (AssignReplicasToDirsRequest) request.build();
            synchronized (this) {
                callbacks.add(new AbstractMap.SimpleEntry<>(inputRequest.data(), callback));
            }
        }

        void completeCallback(Function<AssignReplicasToDirsRequestData, Optional<ClientResponse>> completionist) throws InterruptedException {
            Map.Entry<AssignReplicasToDirsRequestData, ControllerRequestCompletionHandler> entry = callbacks.take();
            Optional<ClientResponse> clientResponse = completionist.apply(entry.getKey());
            if (clientResponse.isPresent()) {
                entry.getValue().onComplete(clientResponse.get());
            } else {
                entry.getValue().onTimeout();
            }
        }
    }

    AssignReplicasToDirsRequestData normalize(AssignReplicasToDirsRequestData request) {
        request = request.duplicate();
        request.directories().sort(Comparator.comparing(
            AssignReplicasToDirsRequestData.DirectoryData::id));
        for (AssignReplicasToDirsRequestData.DirectoryData directory : request.directories()) {
            directory.topics().sort(Comparator.comparing(
                AssignReplicasToDirsRequestData.TopicData::topicId));
            for (AssignReplicasToDirsRequestData.TopicData topic : directory.topics()) {
                topic.partitions().sort(Comparator.comparing(
                    AssignReplicasToDirsRequestData.PartitionData::partitionIndex));
            }
        }
        return request;
    }

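    // assertRequestEquals compares two requests after normalize() has sorted directories,
    // topics, and partitions, since the request builder does not guarantee ordering.
    // TestEnv wires an AssignmentsManager (backed by TEST_IMAGE) to the mock channel
    // manager and records per-partition success callbacks.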
    void assertRequestEquals(
        AssignReplicasToDirsRequestData expected,
        AssignReplicasToDirsRequestData actual
    ) {
        assertEquals(normalize(expected), normalize(actual));
    }

    static class TestEnv implements AutoCloseable {
        final ExponentialBackoff backoff;
        final MockNodeToControllerChannelManager channelManager;
        final MetricsRegistry metricsRegistry = new MetricsRegistry();
        final AssignmentsManager assignmentsManager;
        final Map<TopicIdPartition, Integer> successes;

        TestEnv() {
            this.backoff = new ExponentialBackoff(1, 2, 4, 0);
            this.channelManager = new MockNodeToControllerChannelManager();
            this.assignmentsManager = new AssignmentsManager(
                backoff, Time.SYSTEM, channelManager, 1, () -> TEST_IMAGE,
                t -> t.toString(), metricsRegistry);
            this.successes = new HashMap<>();
        }

        void onAssignment(TopicIdPartition topicIdPartition, Uuid directoryId) {
            assignmentsManager.onAssignment(topicIdPartition, directoryId, "test", () -> {
                synchronized (successes) {
                    successes.put(topicIdPartition, successes.getOrDefault(topicIdPartition, 0) + 1);
                }
            });
        }

        int success(TopicIdPartition topicIdPartition) {
            synchronized (successes) {
                return successes.getOrDefault(topicIdPartition, 0);
            }
        }

        void successfullyCompleteCallbackOfRequestAssigningTopic1ToDir1() throws Exception {
            channelManager.completeCallback(req -> {
                AssignReplicasToDirsRequestData.DirectoryData directoryData = req.directories().get(0);
                assertEquals(DIR_1, directoryData.id());
                AssignReplicasToDirsRequestData.TopicData topicData = directoryData.topics().get(0);
                assertEquals(TOPIC_1, topicData.topicId());
                assertEquals(0, topicData.partitions().get(0).partitionIndex());
                return mockClientResponse(new AssignReplicasToDirsResponseData().
                    setDirectories(Arrays.asList(new AssignReplicasToDirsResponseData.DirectoryData().
                        setId(DIR_1).
                        setTopics(Arrays.asList(new AssignReplicasToDirsResponseData.TopicData().
                            setTopicId(TOPIC_1).
                            setPartitions(Arrays.asList(new AssignReplicasToDirsResponseData.PartitionData().
                                setPartitionIndex(0).
                                setErrorCode((short) 0))))))));
            });
        }

        Metric findMetric(MetricName name) {
            for (Map.Entry<MetricName, Metric> entry : metricsRegistry.allMetrics().entrySet()) {
                if (name.equals(entry.getKey())) {
                    return entry.getValue();
                }
            }
            throw new IllegalArgumentException("metric named " + name + " not found");
        }

        @SuppressWarnings("unchecked") // do not warn about Gauge typecast.
        int queuedReplicaToDirAssignments() {
            Gauge<Integer> queuedReplicaToDirAssignments =
                (Gauge<Integer>) findMetric(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
            return queuedReplicaToDirAssignments.value();
        }

        @Override
        public void close() throws Exception {
            try {
                assignmentsManager.close();
            } catch (Exception e) {
                LOG.error("error shutting down assignmentsManager", e);
            }
            try {
                metricsRegistry.shutdown();
            } catch (Exception e) {
                LOG.error("error shutting down metricsRegistry", e);
            }
        }
    }

    static Optional<ClientResponse> mockClientResponse(AssignReplicasToDirsResponseData data) {
        return Optional.of(new ClientResponse(null, null, "", 0, 0, false,
            null, null, new AssignReplicasToDirsResponse(data)));
    }

    @Test
    public void testStartAndShutdown() throws Exception {
        try (TestEnv testEnv = new TestEnv()) {
        }
    }

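    // Queue a single assignment, verify it is counted as pending and in flight, then
    // complete the controller callback and verify the success callback ran exactly once.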
    @Test
    public void testSuccessfulAssignment() throws Exception {
        try (TestEnv testEnv = new TestEnv()) {
            assertEquals(0, testEnv.queuedReplicaToDirAssignments());
            testEnv.onAssignment(new TopicIdPartition(TOPIC_1, 0), DIR_1);
            TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
                assertEquals(1, testEnv.assignmentsManager.numPending());
                assertEquals(1, testEnv.queuedReplicaToDirAssignments());
            });
            assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());
            assertEquals(1, testEnv.assignmentsManager.numInFlight());
            testEnv.successfullyCompleteCallbackOfRequestAssigningTopic1ToDir1();
            TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
                assertEquals(0, testEnv.assignmentsManager.numPending());
                assertEquals(0, testEnv.queuedReplicaToDirAssignments());
                assertEquals(1, testEnv.success(new TopicIdPartition(TOPIC_1, 0)));
            });
            assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());
        }
    }

    @ParameterizedTest
    @ValueSource(strings = {"invalidRequest", "timeout"})
    public void testUnSuccessfulRequestCausesRetransmission(String failureType) throws Exception {
        try (TestEnv testEnv = new TestEnv()) {
            testEnv.onAssignment(new TopicIdPartition(TOPIC_1, 0), DIR_1);
            TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
                assertEquals(1, testEnv.assignmentsManager.numPending());
            });
            if (failureType.equals("invalidRequest")) {
                testEnv.channelManager.completeCallback(req -> {
                    return mockClientResponse(new AssignReplicasToDirsResponseData().
                        setErrorCode(Errors.INVALID_REQUEST.code()));
                });
            } else if (failureType.equals("timeout")) {
                testEnv.channelManager.completeCallback(req -> Optional.empty());
            }
            TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
                assertEquals(1, testEnv.assignmentsManager.numPending());
                assertEquals(0, testEnv.success(new TopicIdPartition(TOPIC_1, 0)));
            });
            assertEquals(1, testEnv.assignmentsManager.previousGlobalFailures());
            testEnv.successfullyCompleteCallbackOfRequestAssigningTopic1ToDir1();
            TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
                assertEquals(0, testEnv.assignmentsManager.numPending());
                assertEquals(1, testEnv.success(new TopicIdPartition(TOPIC_1, 0)));
            });
            assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());
        }
    }

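    // An assignment that no longer matches the metadata image (unknown topic, unknown
    // partition, or this broker not in the replica set) is dropped rather than retried,
    // so its success callback never fires and numPending stays at zero.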
    @ParameterizedTest
    @ValueSource(strings = {"missingTopic", "missingPartition", "notReplica"})
    public void testMismatchedInputDoesNotTriggerCompletion(String mismatchType) throws Exception {
        try (TestEnv testEnv = new TestEnv()) {
            TopicIdPartition target;
            if (mismatchType.equals("missingTopic")) {
                target = new TopicIdPartition(TOPIC_3, 0);
            } else if (mismatchType.equals("missingPartition")) {
                target = new TopicIdPartition(TOPIC_1, 2);
            } else if (mismatchType.equals("notReplica")) {
                target = new TopicIdPartition(TOPIC_2, 0);
            } else {
                throw new RuntimeException("invalid mismatchType argument.");
            }
            testEnv.onAssignment(target, DIR_1);
            TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
                assertEquals(0, testEnv.assignmentsManager.numPending());
                assertEquals(0, testEnv.success(target));
            });
        }
    }

    @ParameterizedTest
    @ValueSource(strings = {"missingResult", "errorResult"})
    public void testOneAssignmentFailsOneSucceeds(String failureType) throws Exception {
        try (TestEnv testEnv = new TestEnv()) {
            testEnv.onAssignment(new TopicIdPartition(TOPIC_1, 0), DIR_1);
            testEnv.onAssignment(new TopicIdPartition(TOPIC_1, 1), DIR_1);
            TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
                assertEquals(2, testEnv.assignmentsManager.numPending());
                assertEquals(0, testEnv.success(new TopicIdPartition(TOPIC_1, 0)));
                assertEquals(0, testEnv.success(new TopicIdPartition(TOPIC_1, 1)));
            });
            testEnv.channelManager.completeCallback(req -> {
                AssignReplicasToDirsRequestData.DirectoryData directoryData = req.directories().get(0);
                assertEquals(DIR_1, directoryData.id());
                AssignReplicasToDirsRequestData.TopicData topicData = directoryData.topics().get(0);
                assertEquals(TOPIC_1, topicData.topicId());
                HashSet<Integer> foundPartitions = new HashSet<>();
                topicData.partitions().forEach(p -> foundPartitions.add(p.partitionIndex()));
                List<AssignReplicasToDirsResponseData.PartitionData> partitions = new ArrayList<>();
                if (foundPartitions.contains(0)) {
                    partitions.add(new AssignReplicasToDirsResponseData.PartitionData().
                        setPartitionIndex(0).
                        setErrorCode((short) 0));
                }
                if (foundPartitions.contains(1)) {
                    if (failureType.equals("missingResult")) {
                        // do nothing
                    } else if (failureType.equals("errorResult")) {
                        partitions.add(new AssignReplicasToDirsResponseData.PartitionData().
                            setPartitionIndex(1).
                            setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()));
                    } else {
                        throw new RuntimeException("invalid failureType argument.");
                    }
                }
                return mockClientResponse(new AssignReplicasToDirsResponseData().
                    setDirectories(Arrays.asList(new AssignReplicasToDirsResponseData.DirectoryData().
                        setId(DIR_1).
                        setTopics(Arrays.asList(new AssignReplicasToDirsResponseData.TopicData().
                            setTopicId(TOPIC_1).
                            setPartitions(partitions))))));
            });
            TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
                assertEquals(1, testEnv.assignmentsManager.numPending());
                assertEquals(1, testEnv.assignmentsManager.numInFlight());
                assertEquals(1, testEnv.success(new TopicIdPartition(TOPIC_1, 0)));
                assertEquals(0, testEnv.success(new TopicIdPartition(TOPIC_1, 1)));
                assertEquals(0, testEnv.assignmentsManager.previousGlobalFailures());
            });
        }
    }

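    // globalResponseError classifies the outcome of a controller round trip as a short
    // error description, or Optional.empty() when the response is usable.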
    @Test
    public void testGlobalResponseErrorTimeout() {
        assertEquals(Optional.of("Timeout"),
            AssignmentsManager.globalResponseError(Optional.empty()));
    }

    @Test
    public void testNoGlobalResponseError() {
        assertEquals(Optional.empty(),
            AssignmentsManager.globalResponseError(Optional.of(
                new ClientResponse(null, null, "", 0, 0, false, null,
                    null, new AssignReplicasToDirsResponse(
                        new AssignReplicasToDirsResponseData())))));
    }

    @Test
    public void testGlobalResponseErrorAuthenticationException() {
        assertEquals(Optional.of("AuthenticationException"),
            AssignmentsManager.globalResponseError(Optional.of(
                new ClientResponse(null, null, "", 0, 0, false, null,
                    new AuthenticationException("failed"), null))));
    }

    @Test
    public void testGlobalResponseErrorUnsupportedVersionException() {
        assertEquals(Optional.of("UnsupportedVersionException"),
            AssignmentsManager.globalResponseError(Optional.of(
                new ClientResponse(null, null, "", 0, 0, false,
                    new UnsupportedVersionException("failed"), null, null))));
    }

    @Test
    public void testGlobalResponseErrorDisconnectedTimedOut() {
        assertEquals(Optional.of("Disonnected[Timeout]"),
            AssignmentsManager.globalResponseError(Optional.of(
                new ClientResponse(null, null, "", 0, 0, true, true,
                    null, null, null))));
    }

    @Test
    public void testGlobalResponseErrorEmptyResponse() {
        assertEquals(Optional.of("EmptyResponse"),
            AssignmentsManager.globalResponseError(Optional.of(
                new ClientResponse(null, null, "", 0, 0, false, false,
                    null, null, null))));
    }

    @Test
    public void testGlobalResponseErrorClassCastException() {
        assertEquals(Optional.of("ClassCastException"),
            AssignmentsManager.globalResponseError(Optional.of(
                new ClientResponse(null, null, "", 0, 0, false, false,
                    null, null, new ApiVersionsResponse(new ApiVersionsResponseData())))));
    }

    @Test
    public void testGlobalResponseErrorResponseLevelError() {
        assertEquals(Optional.of("Response-level error: INVALID_REQUEST"),
            AssignmentsManager.globalResponseError(Optional.of(
                new ClientResponse(null, null, "", 0, 0, false, false,
                    null, null, new AssignReplicasToDirsResponse(
                        new AssignReplicasToDirsResponseData().
                            setErrorCode(Errors.INVALID_REQUEST.code()))))));
    }

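    // buildRequestData groups the flat partition-to-directory map first by directory and
    // then by topic, which is the shape AssignReplicasToDirsRequest expects on the wire.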
    @Test
    void testBuildRequestData() {
        Map<TopicIdPartition, Uuid> assignment = new HashMap<TopicIdPartition, Uuid>() {{
                put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
                put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
                put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
                put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
                put(new TopicIdPartition(TOPIC_2, 5), DIR_2);
            }};
        AssignReplicasToDirsRequestData built = AssignmentsManager.buildRequestData(8, 100L, assignment);
        AssignReplicasToDirsRequestData expected = new AssignReplicasToDirsRequestData()
            .setBrokerId(8)
            .setBrokerEpoch(100L)
            .setDirectories(Arrays.asList(
                new AssignReplicasToDirsRequestData.DirectoryData()
                    .setId(DIR_2)
                    .setTopics(Arrays.asList(
                        new AssignReplicasToDirsRequestData.TopicData()
                            .setTopicId(TOPIC_1)
                            .setPartitions(Collections.singletonList(
                                new AssignReplicasToDirsRequestData.PartitionData()
                                    .setPartitionIndex(2))),
                        new AssignReplicasToDirsRequestData.TopicData()
                            .setTopicId(TOPIC_2)
                            .setPartitions(Collections.singletonList(
                                new AssignReplicasToDirsRequestData.PartitionData()
                                    .setPartitionIndex(5))))),
                new AssignReplicasToDirsRequestData.DirectoryData()
                    .setId(DIR_3)
                    .setTopics(Collections.singletonList(
                        new AssignReplicasToDirsRequestData.TopicData()
                            .setTopicId(TOPIC_1)
                            .setPartitions(Collections.singletonList(
                                new AssignReplicasToDirsRequestData.PartitionData()
                                    .setPartitionIndex(3))))),
                new AssignReplicasToDirsRequestData.DirectoryData()
                    .setId(DIR_1)
                    .setTopics(Collections.singletonList(
                        new AssignReplicasToDirsRequestData.TopicData()
                            .setTopicId(TOPIC_1)
                            .setPartitions(Arrays.asList(
                                new AssignReplicasToDirsRequestData.PartitionData()
                                    .setPartitionIndex(4),
                                new AssignReplicasToDirsRequestData.PartitionData()
                                    .setPartitionIndex(1)))))));
        assertRequestEquals(expected, built);
    }

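    // Several onAssignment calls made close together should be coalesced into a single
    // AssignReplicasToDirsRequest; the mock verifies exactly one send carrying all five.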
    @Test
    public void testAssignmentAggregation() throws InterruptedException {
        CountDownLatch readyToAssert = new CountDownLatch(1);
        doAnswer(invocation -> {
            readyToAssert.countDown();
            return null;
        }).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class),
            any(ControllerRequestCompletionHandler.class));

        manager.onAssignment(new TopicIdPartition(TOPIC_1, 1), DIR_1, "testAssignmentAggregation", () -> { });
        manager.onAssignment(new TopicIdPartition(TOPIC_1, 2), DIR_2, "testAssignmentAggregation", () -> { });
        manager.onAssignment(new TopicIdPartition(TOPIC_1, 3), DIR_3, "testAssignmentAggregation", () -> { });
        manager.onAssignment(new TopicIdPartition(TOPIC_1, 4), DIR_1, "testAssignmentAggregation", () -> { });
        manager.onAssignment(new TopicIdPartition(TOPIC_2, 5), DIR_2, "testAssignmentAggregation", () -> { });
        TestUtils.waitForCondition(() -> {
            time.sleep(100);
            manager.wakeup();
            return readyToAssert.await(1, TimeUnit.MILLISECONDS);
        }, "Timed out waiting for AssignReplicasToDirsRequest to be sent.");

        ArgumentCaptor<AssignReplicasToDirsRequest.Builder> captor =
            ArgumentCaptor.forClass(AssignReplicasToDirsRequest.Builder.class);
        verify(channelManager, times(1)).start();
        verify(channelManager).sendRequest(captor.capture(), any(ControllerRequestCompletionHandler.class));
        verify(channelManager, atMostOnce()).shutdown();
        verifyNoMoreInteractions(channelManager);
        assertEquals(1, captor.getAllValues().size());
        AssignReplicasToDirsRequestData actual = captor.getValue().build().data();
        AssignReplicasToDirsRequestData expected = buildRequestData(
            8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
                    put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
                    put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
                    put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
                    put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
                    put(new TopicIdPartition(TOPIC_2, 5), DIR_2);
                }}
        );
        assertRequestEquals(expected, actual);
    }

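    // When a request fails (timeout, UnsupportedVersionException, AuthenticationException,
    // or NOT_CONTROLLER), the pending assignments stay queued and the next request carries
    // both the old and any newly queued assignments; exact duplicates are ignored.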
    @Test
    void testRequeuesFailedAssignmentPropagations() throws InterruptedException {
        CountDownLatch readyToAssert = new CountDownLatch(5);
        doAnswer(invocation -> {
            readyToAssert.countDown();
            if (readyToAssert.getCount() == 4) {
                invocation.getArgument(1, ControllerRequestCompletionHandler.class).onTimeout();
                manager.onAssignment(new TopicIdPartition(TOPIC_1, 2), DIR_3, "testRequeuesFailedAssignmentPropagations", () -> { });
            }
            if (readyToAssert.getCount() == 3) {
                invocation.getArgument(1, ControllerRequestCompletionHandler.class).onComplete(
                    new ClientResponse(null, null, null, 0L, 0L, false, false,
                        new UnsupportedVersionException("test unsupported version exception"), null, null));

                // duplicate should be ignored
                manager.onAssignment(new TopicIdPartition(TOPIC_1, 2), DIR_3, "testRequeuesFailedAssignmentPropagations", () -> { });

                manager.onAssignment(new TopicIdPartition(TOPIC_1, 3),
                    Uuid.fromString("xHLCnG54R9W3lZxTPnpk1Q"), "testRequeuesFailedAssignmentPropagations", () -> { });
            }
            if (readyToAssert.getCount() == 2) {
                invocation.getArgument(1, ControllerRequestCompletionHandler.class).onComplete(
                    new ClientResponse(null, null, null, 0L, 0L, false, false, null,
                        new AuthenticationException("test authentication exception"), null)
                );

                // duplicate should be ignored
                manager.onAssignment(new TopicIdPartition(TOPIC_1, 3),
                    Uuid.fromString("xHLCnG54R9W3lZxTPnpk1Q"), "testRequeuesFailedAssignmentPropagations", () -> { });

                manager.onAssignment(new TopicIdPartition(TOPIC_1, 4),
                    Uuid.fromString("RCYu1A0CTa6eEIpuKDOfxw"), "testRequeuesFailedAssignmentPropagations", () -> { });
            }
            if (readyToAssert.getCount() == 1) {
                invocation.getArgument(1, ControllerRequestCompletionHandler.class).onComplete(
                    new ClientResponse(null, null, null, 0L, 0L, false, false, null, null,
                        new AssignReplicasToDirsResponse(new AssignReplicasToDirsResponseData()
                            .setErrorCode(Errors.NOT_CONTROLLER.code())
                            .setThrottleTimeMs(0))));
            }
            return null;
        }).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class),
            any(ControllerRequestCompletionHandler.class));

        manager.onAssignment(new TopicIdPartition(TOPIC_1, 1), DIR_1, "testRequeuesFailedAssignmentPropagations", () -> { });
        TestUtils.waitForCondition(() -> {
            time.sleep(TimeUnit.SECONDS.toMillis(1));
            manager.wakeup();
            return readyToAssert.await(1, TimeUnit.MILLISECONDS);
        }, "Timed out waiting for AssignReplicasToDirsRequest to be sent.");

        ArgumentCaptor<AssignReplicasToDirsRequest.Builder> captor =
            ArgumentCaptor.forClass(AssignReplicasToDirsRequest.Builder.class);
        verify(channelManager, times(1)).start();
        verify(channelManager, times(5)).sendRequest(captor.capture(),
            any(ControllerRequestCompletionHandler.class));
        verify(channelManager, atMostOnce()).shutdown();
        verifyNoMoreInteractions(channelManager);
        assertEquals(5, captor.getAllValues().size());
        assertRequestEquals(buildRequestData(
            8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
                    put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
                }}
        ), captor.getAllValues().get(0).build().data());
        assertRequestEquals(buildRequestData(
            8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
                    put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
                    put(new TopicIdPartition(TOPIC_1, 2), DIR_3);
                }}
        ), captor.getAllValues().get(1).build().data());
        assertRequestEquals(buildRequestData(
            8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
                    put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
                    put(new TopicIdPartition(TOPIC_1, 2), DIR_3);
                    put(new TopicIdPartition(TOPIC_1, 3), Uuid.fromString("xHLCnG54R9W3lZxTPnpk1Q"));
                    put(new TopicIdPartition(TOPIC_1, 4), Uuid.fromString("RCYu1A0CTa6eEIpuKDOfxw"));
                }}
        ), captor.getAllValues().get(4).build().data());
    }

    @Timeout(30)
    @Test
    void testOnCompletion() throws Exception {
        CountDownLatch readyToAssert = new CountDownLatch(300);
        doAnswer(invocation -> {
            AssignReplicasToDirsRequestData request = invocation.getArgument(0, AssignReplicasToDirsRequest.Builder.class).build().data();
            ControllerRequestCompletionHandler completionHandler = invocation.getArgument(1, ControllerRequestCompletionHandler.class);
            completionHandler.onComplete(buildSuccessfulResponse(request));

            return null;
        }).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class),
            any(ControllerRequestCompletionHandler.class));

        for (int i = 0; i < 300; i++) {
            manager.onAssignment(new TopicIdPartition(TOPIC_1, i % 5), DIR_1, "testOnCompletion", readyToAssert::countDown);
        }

        TestUtils.waitForCondition(() -> {
            time.sleep(TimeUnit.SECONDS.toMillis(1));
            manager.wakeup();
            return readyToAssert.await(1, TimeUnit.MILLISECONDS);
        }, "Timed out waiting for AssignReplicasToDirsRequest to be sent.");
    }

    private static ClientResponse buildSuccessfulResponse(AssignReplicasToDirsRequestData request) {
        return buildResponse(request, topicIdPartition -> Errors.NONE);
    }

    private static ClientResponse buildResponse(AssignReplicasToDirsRequestData request,
                                                Function<TopicIdPartition, Errors> perPartitionError) {
        Map<Uuid, Map<TopicIdPartition, Errors>> errors = new HashMap<>();
        for (AssignReplicasToDirsRequestData.DirectoryData directory : request.directories()) {
            for (AssignReplicasToDirsRequestData.TopicData topic : directory.topics()) {
                for (AssignReplicasToDirsRequestData.PartitionData partition : topic.partitions()) {
                    TopicIdPartition topicIdPartition = new TopicIdPartition(topic.topicId(), partition.partitionIndex());
                    Errors error = perPartitionError.apply(topicIdPartition);
                    if (error == null) {
                        error = Errors.NONE;
                    }
                    errors.computeIfAbsent(directory.id(), d -> new HashMap<>()).put(topicIdPartition, error);
                }
            }
        }
        AssignReplicasToDirsResponseData responseData = AssignmentsHelper.buildResponseData(Errors.NONE.code(), 0, errors);
        return new ClientResponse(null, null, null,
            0L, 0L, false, false, null, null,
            new AssignReplicasToDirsResponse(responseData));
    }

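    // Repeated reassignments of the same partition while the first request is unanswered
    // should be compacted, so only the most recent directory for each partition is propagated.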
    @Test
    public void testAssignmentCompaction() throws Exception {
        // Delay the first controller response to force assignment compaction logic
        CompletableFuture<Runnable> completionFuture = new CompletableFuture<>();
        doAnswer(invocation -> {
            AssignReplicasToDirsRequestData request = invocation.getArgument(0, AssignReplicasToDirsRequest.Builder.class).build().data();
            ControllerRequestCompletionHandler completionHandler = invocation.getArgument(1, ControllerRequestCompletionHandler.class);
            ClientResponse response = buildSuccessfulResponse(request);
            Runnable completion = () -> completionHandler.onComplete(response);
            if (completionFuture.isDone()) completion.run();
            else completionFuture.complete(completion);
            return null;
        }).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class),
            any(ControllerRequestCompletionHandler.class));

        CountDownLatch remainingInvocations = new CountDownLatch(20);
        Runnable onComplete = () -> {
            assertTrue(completionFuture.isDone(), "Premature invocation");
            assertTrue(remainingInvocations.getCount() > 0, "Extra invocation");
            remainingInvocations.countDown();
        };
        Uuid[] dirs = {DIR_1, DIR_2, DIR_3};
        for (int i = 0; i < remainingInvocations.getCount(); i++) {
            time.sleep(100);
            manager.onAssignment(new TopicIdPartition(TOPIC_1, 0), dirs[i % 3], "testAssignmentCompaction", onComplete);
        }
        activeWait(completionFuture::isDone);
        completionFuture.get().run();
        activeWait(() -> remainingInvocations.getCount() == 0);
    }

    void activeWait(Supplier<Boolean> predicate) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            boolean conditionSatisfied = predicate.get();
            if (!conditionSatisfied) {
                time.sleep(100);
                manager.wakeup();
            }
            return conditionSatisfied;
        }, TestUtils.DEFAULT_MAX_WAIT_MS, 50, null);
    }

    static Metric findMetric(String name) {
        for (Map.Entry<MetricName, Metric> entry : KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet()) {
            MetricName metricName = entry.getKey();
            if (AssignmentsManager.class.getSimpleName().equals(metricName.getType()) && metricName.getName().equals(name)) {
                return entry.getValue();
            }
        }
        throw new IllegalArgumentException("metric named " + name + " not found");
    }

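    // The queued-replica-to-dir-assignments gauge should track the number of assignments
    // waiting in the queue, including those still queued while a request is unanswered.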
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
void testQueuedReplicaToDirAssignmentsMetric() throws Exception {
|
||||
CountDownLatch readyToAssert = new CountDownLatch(1);
|
||||
doAnswer(invocation -> {
|
||||
readyToAssert.countDown();
|
||||
return null;
|
||||
}).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class), any(ControllerRequestCompletionHandler.class));
|
||||
|
||||
Gauge<Integer> queuedReplicaToDirAssignments = (Gauge<Integer>) findMetric(AssignmentsManager.QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME);
|
||||
assertEquals(0, queuedReplicaToDirAssignments.value());
|
||||
|
||||
for (int i = 0; i < 4; i++) {
|
||||
manager.onAssignment(new TopicIdPartition(TOPIC_1, i), DIR_1, "testQueuedReplicaToDirAssignmentsMetric", () -> { });
|
||||
}
|
||||
TestUtils.waitForCondition(() -> {
|
||||
time.sleep(100);
|
||||
return readyToAssert.await(1, TimeUnit.MILLISECONDS);
|
||||
}, "Timed out waiting for AssignReplicasToDirsRequest to be sent.");
|
||||
assertEquals(4, queuedReplicaToDirAssignments.value());
|
||||
|
||||
for (int i = 4; i < 8; i++) {
|
||||
manager.onAssignment(new TopicIdPartition(TOPIC_1, i), DIR_1, "testQueuedReplicaToDirAssignmentsMetric", () -> { });
|
||||
}
|
||||
TestUtils.retryOnExceptionWithTimeout(5_000, () -> assertEquals(8, queuedReplicaToDirAssignments.value()));
|
||||
}
|
||||
|
||||
// AssignmentsManager retries to propagate assignments (via AssignReplicasToDirsRequest) after failures.
|
||||
// When an assignment fails to propagate with NOT_LEADER_OR_FOLLOWER, AssignmentsManager should conclude
|
||||
// that the broker has been removed as a replica for the partition, and stop trying to propagate it.
|
||||
@Test
|
||||
void testDropsOldAssignments() throws InterruptedException {
|
||||
TopicIdPartition tp1 = new TopicIdPartition(TOPIC_1, 1), tp2 = new TopicIdPartition(TOPIC_1, 2);
|
||||
List<AssignReplicasToDirsRequestData> requests = new ArrayList<>();
|
||||
CountDownLatch readyToAssert = new CountDownLatch(2);
|
||||
doAnswer(invocation -> {
|
||||
AssignReplicasToDirsRequestData request = invocation.getArgument(0, AssignReplicasToDirsRequest.Builder.class).build().data();
|
||||
ControllerRequestCompletionHandler completionHandler = invocation.getArgument(1, ControllerRequestCompletionHandler.class);
|
||||
if (readyToAssert.getCount() == 2) {
|
||||
// First request, reply with a partition-level NOT_LEADER_OR_FOLLOWER error and queue a different assignment
|
||||
completionHandler.onComplete(buildResponse(request, topicIdPartition -> Errors.NOT_LEADER_OR_FOLLOWER));
|
||||
manager.onAssignment(tp2, DIR_1, "testDropsOldAssignments-second");
|
||||
}
|
||||
if (readyToAssert.getCount() == 1) {
|
||||
// Second request, reply with success
|
||||
completionHandler.onComplete(buildSuccessfulResponse(request));
|
||||
}
|
||||
requests.add(request);
|
||||
readyToAssert.countDown();
|
||||
return null;
|
||||
}).when(channelManager).sendRequest(any(), any());
|
||||
|
||||
manager.onAssignment(tp1, DIR_1, "testDropsOldAssignments-first");
|
||||
TestUtils.waitForCondition(() -> {
|
||||
time.sleep(TimeUnit.SECONDS.toMillis(1));
|
||||
manager.wakeup();
|
||||
return readyToAssert.await(1, TimeUnit.MILLISECONDS);
|
||||
}, "Timed out waiting for AssignReplicasToDirsRequest to be sent.");
|
||||
|
||||
assertEquals(Arrays.asList(
|
||||
buildRequestData(8, 100, new HashMap<TopicIdPartition, Uuid>() {{
|
||||
put(tp1, DIR_1);
|
||||
}}),
|
||||
// Even though the controller replied with NOT_LEADER_OR_FOLLOWER, the second request does not include
|
||||
// partition 1, meaning AssignmentManager dropped (no longer retries) the assignment.
|
||||
buildRequestData(8, 100, new HashMap<TopicIdPartition, Uuid>() {{
|
||||
put(tp2, DIR_1);
|
||||
}})
|
||||
), requests);
|
||||
Map<TopicIdPartition, Uuid> assignments = new LinkedHashMap<>();
|
||||
assignments.put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
|
||||
assignments.put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
|
||||
assignments.put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
|
||||
assignments.put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
|
||||
assignments.put(new TopicIdPartition(TOPIC_2, 5), DIR_2);
|
||||
Map<TopicIdPartition, Assignment> targetAssignments = new LinkedHashMap<>();
|
||||
assignments.entrySet().forEach(e -> targetAssignments.put(e.getKey(),
|
||||
new Assignment(e.getKey(), e.getValue(), 0, () -> { })));
|
||||
AssignReplicasToDirsRequestData built =
|
||||
AssignmentsManager.buildRequestData(8, 100L, targetAssignments);
|
||||
AssignReplicasToDirsRequestData expected = new AssignReplicasToDirsRequestData().
|
||||
setBrokerId(8).
|
||||
setBrokerEpoch(100L).
|
||||
setDirectories(Arrays.asList(
|
||||
new AssignReplicasToDirsRequestData.DirectoryData().
|
||||
setId(DIR_2).
|
||||
setTopics(Arrays.asList(
|
||||
new AssignReplicasToDirsRequestData.TopicData().
|
||||
setTopicId(TOPIC_1).
|
||||
setPartitions(Collections.singletonList(
|
||||
new AssignReplicasToDirsRequestData.PartitionData().
|
||||
setPartitionIndex(2))),
|
||||
new AssignReplicasToDirsRequestData.TopicData().
|
||||
setTopicId(TOPIC_2).
|
||||
setPartitions(Collections.singletonList(
|
||||
new AssignReplicasToDirsRequestData.PartitionData().
|
||||
setPartitionIndex(5))))),
|
||||
new AssignReplicasToDirsRequestData.DirectoryData().
|
||||
setId(DIR_3).
|
||||
setTopics(Collections.singletonList(
|
||||
new AssignReplicasToDirsRequestData.TopicData().
|
||||
setTopicId(TOPIC_1).
|
||||
setPartitions(Collections.singletonList(
|
||||
new AssignReplicasToDirsRequestData.PartitionData().
|
||||
setPartitionIndex(3))))),
|
||||
new AssignReplicasToDirsRequestData.DirectoryData().
|
||||
setId(DIR_1).
|
||||
setTopics(Collections.singletonList(
|
||||
new AssignReplicasToDirsRequestData.TopicData().
|
||||
setTopicId(TOPIC_1).
|
||||
setPartitions(Arrays.asList(
|
||||
new AssignReplicasToDirsRequestData.PartitionData().
|
||||
setPartitionIndex(1),
|
||||
new AssignReplicasToDirsRequestData.PartitionData().
|
||||
setPartitionIndex(4)))))));
|
||||
assertEquals(expected, built);
|
||||
}
|
||||
}