KAFKA-17793: Improve kcontroller robustness against long delays (#17502)

As described in KIP-500, the Kafka controller monitors the liveness of each broker in the cluster. It gathers this information from heartbeats sent from the brokers themselves.

In some rare cases, the main controller thread may get blocked for several seconds at a time. In the current code, this will result in the controller being unable to update the last contact times for the brokers during this time.

This PR changes the controller heartbeat handling to be partially lockless. Specifically, the last contact time for each broker will be updated locklessly prior to the rest of the heartbeat handling. This will ensure that heartbeats always get through.

Additionally, this PR adds a PeriodicTaskControlManager to better manage periodic tasks. This should help handle the very common pattern where we want to schedule a background task at some frequency. We also want the background task to be immediately rescheduled if there is too much work to be done in one event.

Reviewers: Liu Zeyu <zeyu.luke@gmail.com>, David Arthur <mumrah@gmail.com>
This commit is contained in:
Colin Patrick McCabe 2024-10-28 08:36:07 -07:00 committed by GitHub
parent 6e88c10ed5
commit 14a9130f6f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 1358 additions and 626 deletions

View File

@ -42,11 +42,11 @@ import static org.apache.kafka.controller.BrokerControlState.UNFENCED;
/**
* The BrokerHeartbeatManager manages all the soft state associated with broker heartbeats.
* Soft state is state which does not appear in the metadata log. This state includes
* things like the last time each broker sent us a heartbeat. As of KIP-841, the controlled
* shutdown state is no longer treated as soft state and is persisted to the metadata log on broker
* controlled shutdown requests.
* The BrokerHeartbeatManager manages some of the soft state associated with broker heartbeats.
* For example, it stores the last metadata offset which each broker reported. It contains the
* BrokerHeartbeatTracker, which stores the last time we received a heartbeat from each broker.
* In addition to storing this soft state, the BrokerHeartbeatManager aggregates some information
* about brokers (such as whether they're fenced or not) into a single place.
*
* Only the active controller has a BrokerHeartbeatManager, since only the active
* controller handles broker heartbeats. Standby controllers will create a heartbeat
@ -63,17 +63,15 @@ public class BrokerHeartbeatManager {
private final int id;
/**
* The last time we received a heartbeat from this broker, in monotonic nanoseconds.
* When this field is updated, we also may have to update the broker's position in
* the unfenced list.
* True if this broker is fenced.
*/
long lastContactNs;
private boolean fenced;
/**
* The last metadata offset which this broker reported. When this field is updated,
* we may also have to update the broker's position in the active set.
*/
long metadataOffset;
private long metadataOffset;
/**
* The offset at which the broker should complete its controlled shutdown, or -1
@ -82,23 +80,16 @@ public class BrokerHeartbeatManager {
*/
private long controlledShutdownOffset;
/**
* The previous entry in the unfenced list, or null if the broker is not in that list.
*/
private BrokerHeartbeatState prev;
/**
* The next entry in the unfenced list, or null if the broker is not in that list.
*/
private BrokerHeartbeatState next;
BrokerHeartbeatState(int id) {
BrokerHeartbeatState(
int id,
boolean fenced,
long metadataOffset,
long controlledShutdownOffset
) {
this.id = id;
this.lastContactNs = 0;
this.prev = null;
this.next = null;
this.metadataOffset = -1;
this.controlledShutdownOffset = -1;
this.fenced = fenced;
this.metadataOffset = metadataOffset;
this.controlledShutdownOffset = controlledShutdownOffset;
}
/**
@ -112,7 +103,18 @@ public class BrokerHeartbeatManager {
* Returns true only if the broker is fenced.
*/
boolean fenced() {
return prev == null;
return fenced;
}
/**
* Get the last metadata offset that was reported.
*/
long metadataOffset() {
return metadataOffset;
}
void setMetadataOffset(long metadataOffset) {
this.metadataOffset = metadataOffset;
}
/**
@ -142,135 +144,42 @@ public class BrokerHeartbeatManager {
}
}
static class BrokerHeartbeatStateList {
/**
* The head of the list of unfenced brokers. The list is sorted in ascending order
* of last contact time.
*/
private final BrokerHeartbeatState head;
BrokerHeartbeatStateList() {
this.head = new BrokerHeartbeatState(-1);
head.prev = head;
head.next = head;
}
/**
* Return the head of the list, or null if the list is empty.
*/
BrokerHeartbeatState first() {
BrokerHeartbeatState result = head.next;
return result == head ? null : result;
}
/**
* Add the broker to the list. We start looking for a place to put it at the end
* of the list.
*/
void add(BrokerHeartbeatState broker) {
BrokerHeartbeatState cur = head.prev;
while (true) {
if (cur == head || cur.lastContactNs <= broker.lastContactNs) {
broker.next = cur.next;
cur.next.prev = broker;
broker.prev = cur;
cur.next = broker;
break;
}
cur = cur.prev;
}
}
/**
* Remove a broker from the list.
*/
void remove(BrokerHeartbeatState broker) {
if (broker.next == null) {
throw new RuntimeException(broker + " is not in the list.");
}
broker.prev.next = broker.next;
broker.next.prev = broker.prev;
broker.prev = null;
broker.next = null;
}
BrokerHeartbeatStateIterator iterator() {
return new BrokerHeartbeatStateIterator(head);
}
}
static class BrokerHeartbeatStateIterator implements Iterator<BrokerHeartbeatState> {
private final BrokerHeartbeatState head;
private BrokerHeartbeatState cur;
BrokerHeartbeatStateIterator(BrokerHeartbeatState head) {
this.head = head;
this.cur = head;
}
@Override
public boolean hasNext() {
return cur.next != head;
}
@Override
public BrokerHeartbeatState next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
BrokerHeartbeatState result = cur.next;
cur = cur.next;
return result;
}
}
private final Logger log;
/**
* The Kafka clock object to use.
* Tracks the last time broker heartbeats were reported for each broker.
*/
private final Time time;
/**
* The broker session timeout in nanoseconds.
*/
private final long sessionTimeoutNs;
private final BrokerHeartbeatTracker tracker;
/**
* Maps broker IDs to heartbeat states.
*/
private final HashMap<Integer, BrokerHeartbeatState> brokers;
/**
* The list of unfenced brokers, sorted by last contact time.
*/
private final BrokerHeartbeatStateList unfenced;
/**
* The set of active brokers. A broker is active if it is unfenced, and not shutting
* down.
*/
private final TreeSet<BrokerHeartbeatState> active;
BrokerHeartbeatManager(LogContext logContext,
Time time,
long sessionTimeoutNs) {
BrokerHeartbeatManager(
LogContext logContext,
Time time,
long sessionTimeoutNs
) {
this.log = logContext.logger(BrokerHeartbeatManager.class);
this.time = time;
this.sessionTimeoutNs = sessionTimeoutNs;
this.tracker = new BrokerHeartbeatTracker(time, sessionTimeoutNs);
this.brokers = new HashMap<>();
this.unfenced = new BrokerHeartbeatStateList();
this.active = new TreeSet<>(MetadataOffsetComparator.INSTANCE);
}
BrokerHeartbeatTracker tracker() {
return tracker;
}
// VisibleForTesting
Time time() {
return time;
}
// VisibleForTesting
BrokerHeartbeatStateList unfenced() {
return unfenced;
return tracker.time();
}
// VisibleForTesting
@ -287,7 +196,6 @@ public class BrokerHeartbeatManager {
return OptionalLong.of(broker.controlledShutdownOffset);
}
/**
* Mark a broker as fenced.
*
@ -296,7 +204,8 @@ public class BrokerHeartbeatManager {
void fence(int brokerId) {
BrokerHeartbeatState broker = brokers.get(brokerId);
if (broker != null) {
untrack(broker);
broker.fenced = true;
active.remove(broker);
}
}
@ -308,7 +217,7 @@ public class BrokerHeartbeatManager {
void remove(int brokerId) {
BrokerHeartbeatState broker = brokers.remove(brokerId);
if (broker != null) {
untrack(broker);
active.remove(broker);
}
}
@ -320,7 +229,6 @@ public class BrokerHeartbeatManager {
*/
private void untrack(BrokerHeartbeatState broker) {
if (!broker.fenced()) {
unfenced.remove(broker);
if (!broker.shuttingDown()) {
active.remove(broker);
}
@ -331,28 +239,12 @@ public class BrokerHeartbeatManager {
* Check if the given broker has a valid session.
*
* @param brokerId The broker ID to check.
* @param brokerEpoch The broker epoch to check.
*
* @return True if the given broker has a valid session.
*/
boolean hasValidSession(int brokerId) {
BrokerHeartbeatState broker = brokers.get(brokerId);
if (broker == null) return false;
return hasValidSession(broker);
}
/**
* Check if the given broker has a valid session.
*
* @param broker The broker to check.
*
* @return True if the given broker has a valid session.
*/
private boolean hasValidSession(BrokerHeartbeatState broker) {
if (broker.fenced()) {
return false;
} else {
return broker.lastContactNs + sessionTimeoutNs >= time.nanoseconds();
}
boolean hasValidSession(int brokerId, long brokerEpoch) {
return tracker.hasValidSession(new BrokerIdAndEpoch(brokerId, brokerEpoch));
}
/**
@ -366,7 +258,7 @@ public class BrokerHeartbeatManager {
BrokerHeartbeatState broker = brokers.get(brokerId);
long metadataOffset = -1L;
if (broker == null) {
broker = new BrokerHeartbeatState(brokerId);
broker = new BrokerHeartbeatState(brokerId, fenced, -1L, -1L);
brokers.put(brokerId, broker);
} else if (broker.fenced() != fenced) {
metadataOffset = broker.metadataOffset;
@ -388,18 +280,23 @@ public class BrokerHeartbeatManager {
// position in either of those data structures depends on values we are
// changing here. We will re-add it if necessary at the end of this function.
untrack(broker);
broker.lastContactNs = time.nanoseconds();
broker.fenced = fenced;
broker.metadataOffset = metadataOffset;
boolean isActive = false;
if (fenced) {
// If a broker is fenced, it leaves controlled shutdown. On its next heartbeat,
// it will shut down immediately.
broker.controlledShutdownOffset = -1;
} else {
unfenced.add(broker);
if (!broker.shuttingDown()) {
active.add(broker);
isActive = true;
}
}
if (isActive) {
active.add(broker);
} else {
active.remove(broker);
}
}
long lowestActiveOffset() {
@ -431,38 +328,6 @@ public class BrokerHeartbeatManager {
}
}
/**
* Return the time in monotonic nanoseconds at which we should check if a broker
* session needs to be expired.
*/
long nextCheckTimeNs() {
BrokerHeartbeatState broker = unfenced.first();
if (broker == null) {
return Long.MAX_VALUE;
} else {
return broker.lastContactNs + sessionTimeoutNs;
}
}
/**
* Check if the oldest broker to have heartbeated has already violated the
* sessionTimeoutNs timeout and needs to be fenced.
*
* @return An Optional broker node id.
*/
Optional<Integer> findOneStaleBroker() {
BrokerHeartbeatStateIterator iterator = unfenced.iterator();
if (iterator.hasNext()) {
BrokerHeartbeatState broker = iterator.next();
// The unfenced list is sorted on last contact time from each
// broker. If the first broker is not stale, then none is.
if (!hasValidSession(broker)) {
return Optional.of(broker.id);
}
}
return Optional.empty();
}
Iterator<UsableBroker> usableBrokers(
Function<Integer, Optional<String>> idToRack
) {

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.utils.Time;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
/**
* The BrokerheartbeatTracker stores the last time each broker sent a heartbeat to us.
* This class will be present only on the active controller.
*
* UNLIKE MOST OF THE KAFKA CONTROLLER, THIS CLASS CAN BE ACCESSED FROM MULTIPLE THREADS.
* Everything in here must be thread-safe. It is intended to be accessed directly from the
* request handler thread pool. This ensures that the heartbeats always get through, even
* if the main controller thread is busy.
*/
class BrokerHeartbeatTracker {
/**
* The clock to use.
*/
private final Time time;
/**
* The broker session timeout in nanoseconds.
*/
private final long sessionTimeoutNs;
/**
* Maps a broker ID and epoch to the last contact time in monotonic nanoseconds.
*/
private final ConcurrentHashMap<BrokerIdAndEpoch, Long> contactTimes;
BrokerHeartbeatTracker(Time time, long sessionTimeoutNs) {
this.time = time;
this.sessionTimeoutNs = sessionTimeoutNs;
this.contactTimes = new ConcurrentHashMap<>();
}
Time time() {
return time;
}
/**
* Update the contact time for the given broker ID and epoch to be the current time.
*
* @param idAndEpoch The broker ID and epoch.
*/
void updateContactTime(BrokerIdAndEpoch idAndEpoch) {
updateContactTime(idAndEpoch, time.nanoseconds());
}
/**
* Update the contact time for the given broker ID and epoch to be the given time.
*
* @param idAndEpoch The broker ID and epoch.
* @param timeNs The monotonic time in nanoseconds.
*/
void updateContactTime(BrokerIdAndEpoch idAndEpoch, long timeNs) {
contactTimes.put(idAndEpoch, timeNs);
}
/**
* Get the contact time for the given broker ID and epoch.
*
* @param idAndEpoch The broker ID and epoch.
* @return The contact time, or Optional.empty if none is known.
*/
OptionalLong contactTime(BrokerIdAndEpoch idAndEpoch) {
Long value = contactTimes.get(idAndEpoch);
if (value == null) return OptionalLong.empty();
return OptionalLong.of(value);
}
/**
* Remove either one or zero expired brokers from the map.
*
* @return The expired broker that was removed, or Optional.empty if there was none.
*/
Optional<BrokerIdAndEpoch> maybeRemoveExpired() {
return maybeRemoveExpired(time.nanoseconds());
}
/**
* Remove either one or zero expired brokers from the map.
*
* @param nowNs The current time in monotonic nanoseconds.
*
* @return The expired broker that was removed, or Optional.empty if there was none.
*/
Optional<BrokerIdAndEpoch> maybeRemoveExpired(long nowNs) {
Iterator<Entry<BrokerIdAndEpoch, Long>> iterator =
contactTimes.entrySet().iterator();
while (iterator.hasNext()) {
Entry<BrokerIdAndEpoch, Long> entry = iterator.next();
if (isExpired(entry.getValue(), nowNs)) {
iterator.remove();
return Optional.of(entry.getKey());
}
}
return Optional.empty();
}
/**
* Return true if the given time is outside the expiration window.
* If the timestamp has undergone 64-bit rollover, we will not expire anything.
*
* @param timeNs The provided time in monotonic nanoseconds.
* @param nowNs The current time in monotonic nanoseconds.
* @return True if the timestamp is expired.
*/
boolean isExpired(long timeNs, long nowNs) {
return (nowNs > timeNs) && (timeNs + sessionTimeoutNs < nowNs);
}
/**
* Return true if the given broker has a session whose time has not yet expired.
*
* @param idAndEpoch The broker id and epoch.
* @return True only if the broker session was found and is still valid.
*/
boolean hasValidSession(BrokerIdAndEpoch idAndEpoch) {
Long timeNs = contactTimes.get(idAndEpoch);
if (timeNs == null) return false;
return !isExpired(timeNs, time.nanoseconds());
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import java.util.Objects;
public class BrokerIdAndEpoch {
private final int id;
private final long epoch;
public BrokerIdAndEpoch(
int id,
long epoch
) {
this.id = id;
this.epoch = epoch;
}
public int id() {
return id;
}
public long epoch() {
return epoch;
}
@Override
public boolean equals(Object o) {
if (o == null || (!(o instanceof BrokerIdAndEpoch))) return false;
BrokerIdAndEpoch other = (BrokerIdAndEpoch) o;
return id == other.id && epoch == other.epoch;
}
@Override
public int hashCode() {
return Objects.hash(id, epoch);
}
@Override
public String toString() {
return "BrokerIdAndEpoch(id=" + id + ", epoch=" + epoch + ")";
}
}

View File

@ -240,7 +240,7 @@ public class ClusterControlManager {
/**
* The broker heartbeat manager, or null if this controller is on standby.
*/
private BrokerHeartbeatManager heartbeatManager;
private volatile BrokerHeartbeatManager heartbeatManager;
/**
* A future which is completed as soon as we have the given number of brokers
@ -356,7 +356,7 @@ public class ClusterControlManager {
Uuid prevIncarnationId = null;
if (existing != null) {
prevIncarnationId = existing.incarnationId();
if (heartbeatManager.hasValidSession(brokerId)) {
if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
if (!request.incarnationId().equals(prevIncarnationId)) {
throw new DuplicateBrokerRegistrationException("Another broker is " +
"registered with that broker id.");
@ -511,6 +511,22 @@ public class ClusterControlManager {
setMaxSupportedVersion(feature.maxSupportedVersion());
}
/**
* Track an incoming broker heartbeat. Unlike most functions, this one is not called from the main
* controller thread, so it can only access local, volatile and atomic data.
*
* @param brokerId The broker id to track.
* @param brokerEpoch The broker epoch to track.
*
* @returns True only if the ClusterControlManager is active.
*/
boolean trackBrokerHeartbeat(int brokerId, long brokerEpoch) {
BrokerHeartbeatManager manager = heartbeatManager;
if (manager == null) return false;
manager.tracker().updateContactTime(new BrokerIdAndEpoch(brokerId, brokerEpoch));
return true;
}
public OptionalLong registerBrokerRecordOffset(int brokerId) {
Long registrationOffset = registerBrokerRecordOffsets.get(brokerId);
if (registrationOffset != null) {
@ -715,6 +731,9 @@ public class ClusterControlManager {
}
BrokerHeartbeatManager heartbeatManager() {
// We throw RuntimeException here rather than NotControllerException because all the callers
// have already verified that we are active. For example, ControllerWriteEvent.run verifies
// that we are the current active controller before running any event-specific code.
if (heartbeatManager == null) {
throw new RuntimeException("ClusterControlManager is not active.");
}

View File

@ -42,6 +42,7 @@ import org.slf4j.Logger;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import javax.crypto.Mac;
@ -59,6 +60,8 @@ import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
* Manages DelegationTokens.
*/
public class DelegationTokenControlManager {
private static final int MAX_RECORDS_PER_EXPIRATION = 1000;
private final Time time = Time.SYSTEM;
static class Builder {
@ -95,6 +98,7 @@ public class DelegationTokenControlManager {
DelegationTokenControlManager build() {
if (logContext == null) logContext = new LogContext();
if (tokenCache == null) tokenCache = new DelegationTokenCache(Collections.emptySet());
return new DelegationTokenControlManager(
logContext,
tokenCache,
@ -330,9 +334,9 @@ public class DelegationTokenControlManager {
}
// Periodic call to remove expired DelegationTokens
public List<ApiMessageAndVersion> sweepExpiredDelegationTokens() {
public ControllerResult<Boolean> sweepExpiredDelegationTokens() {
long now = time.milliseconds();
List<ApiMessageAndVersion> records = new ArrayList<>();
List<ApiMessageAndVersion> records = new ArrayList<>(0);
for (TokenInformation oldTokenInformation: tokenCache.tokens()) {
if ((oldTokenInformation.maxTimestamp() < now) ||
@ -341,9 +345,12 @@ public class DelegationTokenControlManager {
oldTokenInformation.tokenId(), oldTokenInformation.ownerAsString());
records.add(new ApiMessageAndVersion(new RemoveDelegationTokenRecord().
setTokenId(oldTokenInformation.tokenId()), (short) 0));
if (records.size() >= MAX_RECORDS_PER_EXPIRATION) {
return ControllerResult.of(records, true);
}
}
}
return records;
return ControllerResult.of(records, false);
}
public void replay(DelegationTokenRecord record) {

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import java.util.EnumSet;
import java.util.function.Supplier;
class PeriodicTask {
/**
* The name of this periodic task.
*/
private final String name;
/**
* The callback for this task. If ControllerResult.response is true, we will schedule the
* task again after only a very short delay. This is useful if we only finished part of the
* work we wanted to finish.
*/
private final Supplier<ControllerResult<Boolean>> op;
/**
* The period of the task, in nanoseconds.
*/
private final long periodNs;
/**
* The flags used by this periodic task.
*/
private final EnumSet<PeriodicTaskFlag> flags;
PeriodicTask(
String name,
Supplier<ControllerResult<Boolean>> op,
long periodNs,
EnumSet<PeriodicTaskFlag> flags
) {
this.name = name;
this.op = op;
this.periodNs = periodNs;
this.flags = flags;
}
String name() {
return name;
}
Supplier<ControllerResult<Boolean>> op() {
return op;
}
long periodNs() {
return periodNs;
}
EnumSet<PeriodicTaskFlag> flags() {
return flags;
}
}

View File

@ -0,0 +1,250 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.errors.PeriodicControlTaskException;
import org.slf4j.Logger;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
class PeriodicTaskControlManager {
static class Builder {
private LogContext logContext = null;
private Time time = Time.SYSTEM;
private QueueAccessor queueAccessor = null;
Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
return this;
}
Builder setTime(Time time) {
this.time = time;
return this;
}
Builder setQueueAccessor(QueueAccessor queueAccessor) {
this.queueAccessor = queueAccessor;
return this;
}
PeriodicTaskControlManager build() {
if (logContext == null) logContext = new LogContext();
if (queueAccessor == null) throw new RuntimeException("You must set queueAccessor");
return new PeriodicTaskControlManager(logContext,
time,
queueAccessor);
}
}
interface QueueAccessor {
void scheduleDeferred(
String tag,
long deadlineNs,
Supplier<ControllerResult<Void>> op
);
void cancelDeferred(String tag);
}
class PeriodicTaskOperation implements Supplier<ControllerResult<Void>> {
private final PeriodicTask task;
PeriodicTaskOperation(PeriodicTask task) {
this.task = task;
}
@Override
public ControllerResult<Void> get() {
long startNs = 0;
if (log.isDebugEnabled() || task.flags().contains(PeriodicTaskFlag.VERBOSE)) {
startNs = time.nanoseconds();
}
ControllerResult<Boolean> result;
try {
result = task.op().get();
} catch (Exception e) {
// Reschedule the task after a lengthy delay.
reschedule(task, false, true);
// We wrap the exception in a PeriodicControlTaskException before throwing it to ensure
// that it is handled correctly in QuorumController::handleEventException. We want it to
// cause the metadata error metric to be incremented, but not cause a controller failover.
throw new PeriodicControlTaskException(task.name() + ": periodic task failed: " +
e.getMessage(), e);
}
if (log.isDebugEnabled() || task.flags().contains(PeriodicTaskFlag.VERBOSE)) {
long endNs = time.nanoseconds();
long durationUs = NANOSECONDS.toMicros(endNs - startNs);
if (task.flags().contains(PeriodicTaskFlag.VERBOSE)) {
log.info("Periodic task {} generated {} records in {} microseconds.",
task.name(), result.records().size(), durationUs);
} else if (log.isDebugEnabled()) {
log.debug("Periodic task {} generated {} records in {} microseconds.",
task.name(), result.records().size(), durationUs);
}
}
reschedule(task, result.response(), false);
if (result.isAtomic()) {
return ControllerResult.atomicOf(result.records(), null);
} else {
return ControllerResult.of(result.records(), null);
}
}
}
/**
* The slf4j logger.
*/
private final Logger log;
/**
* The clock.
*/
private final Time time;
/**
* Used to schedule events on the queue.
*/
private final QueueAccessor queueAccessor;
/**
* True if the manager is active.
*/
private boolean active;
/**
* The currently registered periodic tasks.
*/
private final Map<String, PeriodicTask> tasks;
private PeriodicTaskControlManager(
LogContext logContext,
Time time,
QueueAccessor queueAccessor
) {
this.log = logContext.logger(OffsetControlManager.class);
this.time = time;
this.queueAccessor = queueAccessor;
this.active = false;
this.tasks = new HashMap<>();
}
boolean active() {
return active;
}
void registerTask(PeriodicTask task) {
if (tasks.containsKey(task.name())) {
log.debug("Periodic task {} is already registered.", task.name());
return;
}
tasks.put(task.name(), task);
log.info("Registering periodic task {} to run every {} ms", task.name(),
NANOSECONDS.toMillis(task.periodNs()));
reschedule(task, false, false);
}
void unregisterTask(String taskName) {
PeriodicTask task = tasks.remove(taskName);
if (task == null) {
log.debug("Periodic task {} is already unregistered.", taskName);
return;
}
log.info("Unregistering periodic task {}", taskName);
reschedule(task, false, false);
}
private long nextDelayTimeNs(PeriodicTask task, boolean immediate, boolean error) {
if (immediate) {
// The current implementation of KafkaEventQueue always picks from the deferred
// collection of operations before picking from the non-deferred collection of
// operations. This can result in some unfairness if deferred operation are
// scheduled for immediate execution. This delays them by a small amount of time.
return MILLISECONDS.toNanos(10);
} else if (error) {
// If the periodic task hit an error, reschedule it in 5 minutes. This is to avoid
// scenarios where we spin in a tight loop hitting errors, but still give the task
// a chance to succeed.
return MINUTES.toNanos(5);
} else {
// Otherwise, use the designated period.
return task.periodNs();
}
}
private void reschedule(PeriodicTask task, boolean immediate, boolean error) {
if (!active) {
log.trace("cancelling {} because we are inactive.", task.name());
queueAccessor.cancelDeferred(task.name());
} else if (tasks.containsKey(task.name())) {
long nextDelayTimeNs = nextDelayTimeNs(task, immediate, error);
long nextRunTimeNs = time.nanoseconds() + nextDelayTimeNs;
log.trace("rescheduling {} in {} ns (immediate = {}, error = {})",
task.name(), nextDelayTimeNs, immediate);
queueAccessor.scheduleDeferred(task.name(),
nextRunTimeNs,
new PeriodicTaskOperation(task));
} else {
log.trace("cancelling {} because it does not appear in the task map.", task.name());
queueAccessor.cancelDeferred(task.name());
}
}
/**
* Called when the QuorumController becomes active.
*/
void activate() {
if (active) {
throw new RuntimeException("Can't activate already active PeriodicTaskControlManager.");
}
active = true;
for (PeriodicTask task : tasks.values()) {
reschedule(task, false, false);
}
String[] taskNames = tasks.keySet().toArray(new String[0]);
Arrays.sort(taskNames);
log.info("Activated periodic tasks: {}", String.join(", ", taskNames));
}
/**
* Called when the QuorumController becomes inactive.
*/
void deactivate() {
if (!active) {
return;
}
active = false;
for (PeriodicTask task : tasks.values()) {
reschedule(task, false, false);
}
String[] taskNames = tasks.keySet().toArray(new String[0]);
Arrays.sort(taskNames);
log.info("Deactivated periodic tasks: {}", String.join(", ", taskNames));
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
/**
* The flags to use for a periodic task.
*/
enum PeriodicTaskFlag {
/**
* Set if we want to log the name and execution time on each run.
*/
VERBOSE;
}

View File

@ -26,7 +26,6 @@ import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
@ -129,6 +128,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@ -149,6 +149,7 @@ import java.util.function.Function;
import java.util.function.Supplier;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME;
@ -218,7 +219,7 @@ public final class QuorumController implements Controller {
private String tokenSecretKeyString;
private long delegationTokenMaxLifeMs;
private long delegationTokenExpiryTimeMs;
private long delegationTokenExpiryCheckIntervalMs;
private long delegationTokenExpiryCheckIntervalMs = TimeUnit.MINUTES.toMillis(5);
private long uncleanLeaderElectionCheckIntervalMs = TimeUnit.MINUTES.toMillis(5);
private String interBrokerListenerName = "PLAINTEXT";
@ -496,6 +497,32 @@ public final class QuorumController implements Controller {
}
}
class PeriodicTaskControlManagerQueueAccessor implements PeriodicTaskControlManager.QueueAccessor {
@Override
public void scheduleDeferred(
String tag,
long deadlineNs,
Supplier<ControllerResult<Void>> op
) {
EnumSet<ControllerOperationFlag> flags = EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME);
queue.scheduleDeferred(tag,
new EarliestDeadlineFunction(deadlineNs),
new ControllerWriteEvent<>(tag,
new ControllerWriteOperation<Void>() {
@Override
public ControllerResult<Void> generateRecordsAndResult() {
return op.get();
}
},
flags));
}
@Override
public void cancelDeferred(String tag) {
queue.cancelDeferred(tag);
}
}
private OptionalInt latestController() {
return raftClient.leaderAndEpoch().leaderId();
}
@ -811,13 +838,6 @@ public final class QuorumController implements Controller {
"reaches offset {}.", this, resultAndOffset.offset());
}
// After every controller write event, schedule a leader rebalance if there are any topic partition
// with leader that is not the preferred leader.
maybeScheduleNextBalancePartitionLeaders();
// Schedule a new unclean leader election if there are partitions that do not have a leader.
maybeScheduleNextElectUncleanLeaders();
// Remember the latest offset and future if it is not already completed
if (!future.isDone()) {
deferredEventQueue.add(resultAndOffset.offset(), this);
@ -1140,10 +1160,7 @@ public final class QuorumController implements Controller {
// periodic tasks here. At this point, all the records we generated in
// generateRecordsAndResult have been applied, so we have the correct value for
// metadata.version and other in-memory state.
maybeScheduleNextExpiredDelegationTokenSweep();
maybeScheduleNextBalancePartitionLeaders();
maybeScheduleNextElectUncleanLeaders();
maybeScheduleNextWriteNoOpRecord();
periodicControl.activate();
}
}
@ -1159,255 +1176,12 @@ public final class QuorumController implements Controller {
newWrongControllerException(OptionalInt.empty()));
offsetControl.deactivate();
clusterControl.deactivate();
cancelMaybeFenceReplicas();
cancelMaybeBalancePartitionLeaders();
cancelMaybeNextElectUncleanLeaders();
cancelNextWriteNoOpRecord();
periodicControl.deactivate();
} catch (Throwable e) {
fatalFaultHandler.handleFault("exception while renouncing leadership", e);
}
}
private <T> void scheduleDeferredWriteEvent(
String name,
long deadlineNs,
ControllerWriteOperation<T> op,
EnumSet<ControllerOperationFlag> flags
) {
if (!flags.contains(DOES_NOT_UPDATE_QUEUE_TIME)) {
throw new RuntimeException("deferred events should not update the queue time.");
}
ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op, flags);
queue.scheduleDeferred(name, new EarliestDeadlineFunction(deadlineNs), event);
event.future.exceptionally(e -> {
if (ControllerExceptions.isTimeoutException(e)) {
log.error("Cancelling deferred write event {} because the event queue " +
"is now closed.", name);
return null;
} else if (e instanceof NotControllerException) {
log.debug("Cancelling deferred write event {} because this controller " +
"is no longer active.", name);
return null;
}
log.error("Unexpected exception while executing deferred write event {}. " +
"Rescheduling for a minute from now.", name, e);
scheduleDeferredWriteEvent(name,
deadlineNs + NANOSECONDS.convert(1, TimeUnit.MINUTES), op, flags);
return null;
});
}
static final String MAYBE_FENCE_REPLICAS = "maybeFenceReplicas";
private void rescheduleMaybeFenceStaleBrokers() {
long nextCheckTimeNs = clusterControl.heartbeatManager().nextCheckTimeNs();
if (nextCheckTimeNs == Long.MAX_VALUE) {
cancelMaybeFenceReplicas();
return;
}
scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs,
() -> {
ControllerResult<Void> result = replicationControl.maybeFenceOneStaleBroker();
// This following call ensures that if there are multiple brokers that
// are currently stale, then fencing for them is scheduled immediately
rescheduleMaybeFenceStaleBrokers();
return result;
},
EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME));
}
private void cancelMaybeFenceReplicas() {
queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
}
private static final String MAYBE_BALANCE_PARTITION_LEADERS = "maybeBalancePartitionLeaders";
private void maybeScheduleNextBalancePartitionLeaders() {
if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
leaderImbalanceCheckIntervalNs.isPresent() &&
replicationControl.arePartitionLeadersImbalanced()) {
log.debug(
"Scheduling write event for {} because scheduled ({}), checkIntervalNs ({}) and isImbalanced ({})",
MAYBE_BALANCE_PARTITION_LEADERS,
imbalancedScheduled,
leaderImbalanceCheckIntervalNs,
replicationControl.arePartitionLeadersImbalanced()
);
ControllerWriteEvent<Boolean> event = new ControllerWriteEvent<>(MAYBE_BALANCE_PARTITION_LEADERS, () -> {
long startTimeNs = time.nanoseconds();
ControllerResult<Boolean> result = replicationControl.maybeBalancePartitionLeaders();
long endTimeNs = time.nanoseconds();
long durationNs = endTimeNs - startTimeNs;
log.info("maybeBalancePartitionLeaders: generated {} records in {} microseconds.{}",
result.records().size(), NANOSECONDS.toMicros(durationNs),
result.response() ? " Rescheduling immediately." : "");
// reschedule the operation after the leaderImbalanceCheckIntervalNs interval.
// Mark the imbalance event as completed and reschedule if necessary
if (result.response()) {
imbalancedScheduled = ImbalanceSchedule.IMMEDIATELY;
} else {
imbalancedScheduled = ImbalanceSchedule.DEFERRED;
}
// Note that rescheduling this event here is not required because MAYBE_BALANCE_PARTITION_LEADERS
// is a ControllerWriteEvent. ControllerWriteEvent always calls this method after the records
// generated by a ControllerWriteEvent have been applied.
return result;
}, EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME));
long delayNs = time.nanoseconds();
if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) {
delayNs += leaderImbalanceCheckIntervalNs.getAsLong();
} else {
// The current implementation of KafkaEventQueue always picks from the deferred collection of operations
// before picking from the non-deferred collection of operations. This can result in some unfairness if
// deferred operation are scheduled for immediate execution. This delays them by a small amount of time.
delayNs += NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
}
queue.scheduleDeferred(MAYBE_BALANCE_PARTITION_LEADERS, new EarliestDeadlineFunction(delayNs), event);
imbalancedScheduled = ImbalanceSchedule.SCHEDULED;
}
}
private void cancelMaybeBalancePartitionLeaders() {
imbalancedScheduled = ImbalanceSchedule.DEFERRED;
queue.cancelDeferred(MAYBE_BALANCE_PARTITION_LEADERS);
}
private static final String MAYBE_ELECT_UNCLEAN_LEADERS = "maybeElectUncleanLeaders";
private void maybeScheduleNextElectUncleanLeaders() {
if (uncleanScheduled != ImbalanceSchedule.SCHEDULED &&
replicationControl.areSomePartitionsLeaderless()) {
log.debug(
"Scheduling write event for {} because scheduled ({}), and areSomePartitionsLeaderless ({})",
MAYBE_ELECT_UNCLEAN_LEADERS,
uncleanScheduled,
replicationControl.areSomePartitionsLeaderless()
);
ControllerWriteEvent<Boolean> event = new ControllerWriteEvent<>(MAYBE_ELECT_UNCLEAN_LEADERS, () -> {
long startTimeNs = time.nanoseconds();
ControllerResult<Boolean> result = replicationControl.maybeElectUncleanLeaders();
long endTimeNs = time.nanoseconds();
long durationNs = endTimeNs - startTimeNs;
log.info("maybeElectUncleanLeaders: generated {} records in {} microseconds.{}",
result.records().size(), NANOSECONDS.toMicros(durationNs),
result.response() ? " Rescheduling immediately." : "");
if (result.response()) {
uncleanScheduled = ImbalanceSchedule.IMMEDIATELY;
} else {
uncleanScheduled = ImbalanceSchedule.DEFERRED;
}
return result;
}, EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME));
long delayNs = time.nanoseconds();
if (uncleanScheduled == ImbalanceSchedule.DEFERRED) {
delayNs += uncleanLeaderElectionCheckIntervalNs;
} else {
// The current implementation of KafkaEventQueue always picks from the deferred collection of operations
// before picking from the non-deferred collection of operations. This can result in some unfairness if
// deferred operation are scheduled for immediate execution. This delays them by a small amount of time.
delayNs += NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
}
queue.scheduleDeferred(MAYBE_ELECT_UNCLEAN_LEADERS, new EarliestDeadlineFunction(delayNs), event);
uncleanScheduled = ImbalanceSchedule.SCHEDULED;
}
}
private void cancelMaybeNextElectUncleanLeaders() {
uncleanScheduled = ImbalanceSchedule.DEFERRED;
queue.cancelDeferred(MAYBE_ELECT_UNCLEAN_LEADERS);
}
private static final String WRITE_NO_OP_RECORD = "writeNoOpRecord";
private void maybeScheduleNextWriteNoOpRecord() {
if (!noOpRecordScheduled &&
maxIdleIntervalNs.isPresent() &&
featureControl.metadataVersion().isNoOpRecordSupported()) {
log.debug(
"Scheduling write event for {} because maxIdleIntervalNs ({}) and metadataVersion ({})",
WRITE_NO_OP_RECORD,
maxIdleIntervalNs.getAsLong(),
featureControl.metadataVersion()
);
ControllerWriteEvent<Void> event = new ControllerWriteEvent<>(
WRITE_NO_OP_RECORD,
() -> {
noOpRecordScheduled = false;
maybeScheduleNextWriteNoOpRecord();
return ControllerResult.of(
Collections.singletonList(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)),
null
);
},
EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME)
);
long delayNs = time.nanoseconds() + maxIdleIntervalNs.getAsLong();
queue.scheduleDeferred(WRITE_NO_OP_RECORD, new EarliestDeadlineFunction(delayNs), event);
noOpRecordScheduled = true;
}
}
private void cancelNextWriteNoOpRecord() {
noOpRecordScheduled = false;
queue.cancelDeferred(WRITE_NO_OP_RECORD);
}
private static final String SWEEP_EXPIRED_DELEGATION_TOKENS = "sweepExpiredDelegationTokens";
private void maybeScheduleNextExpiredDelegationTokenSweep() {
if (featureControl.metadataVersion().isDelegationTokenSupported() &&
delegationTokenControlManager.isEnabled()) {
log.debug(
"Scheduling write event for {} because DelegationTokens are enabled.",
SWEEP_EXPIRED_DELEGATION_TOKENS
);
ControllerWriteEvent<Void> event = new ControllerWriteEvent<>(
SWEEP_EXPIRED_DELEGATION_TOKENS,
() -> {
maybeScheduleNextExpiredDelegationTokenSweep();
return ControllerResult.of(
delegationTokenControlManager.sweepExpiredDelegationTokens(), null);
},
EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME)
);
long delayNs = time.nanoseconds() +
NANOSECONDS.convert(delegationTokenExpiryCheckIntervalMs, TimeUnit.MILLISECONDS);
queue.scheduleDeferred(SWEEP_EXPIRED_DELEGATION_TOKENS,
new EarliestDeadlineFunction(delayNs), event);
}
}
private void handleFeatureControlChange() {
// The feature control maybe have changed. On the active controller cancel or schedule noop
// record writes accordingly.
if (isActiveController()) {
if (featureControl.metadataVersion().isNoOpRecordSupported()) {
maybeScheduleNextWriteNoOpRecord();
} else {
cancelNextWriteNoOpRecord();
}
}
}
/**
* Apply the metadata record to its corresponding in-memory state(s)
*
@ -1458,7 +1232,6 @@ public final class QuorumController implements Controller {
break;
case FEATURE_LEVEL_RECORD:
featureControl.replay((FeatureLevelRecord) message);
handleFeatureControlChange();
break;
case CLIENT_QUOTA_RECORD:
clientQuotaControlManager.replay((ClientQuotaRecord) message);
@ -1589,6 +1362,16 @@ public final class QuorumController implements Controller {
*/
private final QuorumClusterFeatureSupportDescriber clusterSupportDescriber;
/**
* Handles changes to the event queue for PeriodicTaskControlManager.
*/
private final PeriodicTaskControlManagerQueueAccessor queueAccessor;
/**
* Controls periodic tasks.
*/
private final PeriodicTaskControlManager periodicControl;
/**
* An object which stores the controller's view of the cluster.
* This must be accessed only by the event queue thread.
@ -1621,7 +1404,6 @@ public final class QuorumController implements Controller {
/**
* Manages DelegationTokens, if there are any.
*/
private final long delegationTokenExpiryCheckIntervalMs;
private final DelegationTokenControlManager delegationTokenControlManager;
/**
@ -1661,11 +1443,6 @@ public final class QuorumController implements Controller {
*/
private final OptionalLong leaderImbalanceCheckIntervalNs;
/**
* How log to delay between appending NoOpRecord to the log.
*/
private final OptionalLong maxIdleIntervalNs;
private enum ImbalanceSchedule {
// The leader balancing operation has been scheduled
SCHEDULED,
@ -1685,11 +1462,6 @@ public final class QuorumController implements Controller {
*/
private ImbalanceSchedule uncleanScheduled = ImbalanceSchedule.DEFERRED;
/**
* Tracks if the write of the NoOpRecord has been scheduled.
*/
private boolean noOpRecordScheduled = false;
/**
* The bootstrap metadata to use for initialization if needed.
*/
@ -1697,11 +1469,6 @@ public final class QuorumController implements Controller {
private final boolean eligibleLeaderReplicasEnabled;
/**
* The number of nanoseconds between unclean leader election checks.
*/
private final long uncleanLeaderElectionCheckIntervalNs;
/**
* The maximum number of records per batch to allow.
*/
@ -1777,6 +1544,12 @@ public final class QuorumController implements Controller {
setSnapshotRegistry(snapshotRegistry).
build();
this.clusterSupportDescriber = new QuorumClusterFeatureSupportDescriber();
this.queueAccessor = new PeriodicTaskControlManagerQueueAccessor();
this.periodicControl = new PeriodicTaskControlManager.Builder().
setLogContext(logContext).
setTime(time).
setQueueAccessor(queueAccessor).
build();
this.featureControl = new FeatureControlManager.Builder().
setLogContext(logContext).
setQuorumFeatures(quorumFeatures).
@ -1806,7 +1579,6 @@ public final class QuorumController implements Controller {
setClusterControlManager(clusterControl).
build();
this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
this.maxIdleIntervalNs = maxIdleIntervalNs;
this.replicationControl = new ReplicationControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setLogContext(logContext).
@ -1823,7 +1595,6 @@ public final class QuorumController implements Controller {
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
build();
this.delegationTokenExpiryCheckIntervalMs = delegationTokenExpiryCheckIntervalMs;
this.delegationTokenControlManager = new DelegationTokenControlManager.Builder().
setLogContext(logContext).
setTokenCache(tokenCache).
@ -1845,8 +1616,15 @@ public final class QuorumController implements Controller {
this.curClaimEpoch = -1;
this.recordRedactor = new RecordRedactor(configSchema);
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
this.uncleanLeaderElectionCheckIntervalNs =
TimeUnit.MILLISECONDS.toNanos(uncleanLeaderElectionCheckIntervalMs);
if (maxIdleIntervalNs.isPresent()) {
registerWriteNoOpRecord(maxIdleIntervalNs.getAsLong());
}
registerMaybeFenceStaleBroker(sessionTimeoutNs);
if (leaderImbalanceCheckIntervalNs.isPresent()) {
registerElectPreferred(leaderImbalanceCheckIntervalNs.getAsLong());
}
registerElectUnclean(TimeUnit.MILLISECONDS.toNanos(uncleanLeaderElectionCheckIntervalMs));
registerExpireDelegationTokens(MILLISECONDS.toNanos(delegationTokenExpiryCheckIntervalMs));
log.info("Creating new QuorumController with clusterId {}.{}",
clusterId,
@ -1855,6 +1633,100 @@ public final class QuorumController implements Controller {
this.raftClient.register(metaLogListener);
}
/**
* Register the writeNoOpRecord task.
*
* This task periodically writes a NoOpRecord to the metadata log, if the MetadataVersion
* supports it.
*
* @param maxIdleIntervalNs The period at which to write the NoOpRecord.
*/
private void registerWriteNoOpRecord(long maxIdleIntervalNs) {
periodicControl.registerTask(new PeriodicTask("writeNoOpRecord",
() -> {
ArrayList<ApiMessageAndVersion> records = new ArrayList<>(1);
if (featureControl.metadataVersion().isNoOpRecordSupported()) {
records.add(new ApiMessageAndVersion(new NoOpRecord(), (short) 0));
}
return ControllerResult.of(records, false);
},
maxIdleIntervalNs,
EnumSet.noneOf(PeriodicTaskFlag.class)));
}
/**
* Calculate what the period should be for the maybeFenceStaleBroker task.
*
* We sample 8 times per broker timeout period, so we'll generally fence a broker in no more
* than 112.5% of the given broker session timeout.
*
* @param sessionTimeoutNs The configured broker session timeout period in nanoseconds.
*
* @return The period for the maybeFenceStaleBroker task in nanoseconds.
*/
static long maybeFenceStaleBrokerPeriodNs(long sessionTimeoutNs) {
return Math.max(TimeUnit.MILLISECONDS.toNanos(1), sessionTimeoutNs / 8);
}
/**
* Register the maybeFenceStaleBroker task.
*
* This task periodically checks to see if there is a stale broker that needs to
* be fenced. It will only ever remove one stale broker at a time.
*
* @param sessionTimeoutNs The broker session timeout in nanoseconds.
*/
private void registerMaybeFenceStaleBroker(long sessionTimeoutNs) {
periodicControl.registerTask(new PeriodicTask("maybeFenceStaleBroker",
replicationControl::maybeFenceOneStaleBroker,
maybeFenceStaleBrokerPeriodNs(sessionTimeoutNs),
EnumSet.noneOf(PeriodicTaskFlag.class)));
}
/**
* Register the electPreferred task.
*
* This task periodically checks to see if partitions with leaders other
* than the preferred leader can be switched to have the preferred leader.
*
* @param checkIntervalNs The check interval in nanoseconds.
*/
private void registerElectPreferred(long checkIntervalNs) {
periodicControl.registerTask(new PeriodicTask("electPreferred",
replicationControl::maybeBalancePartitionLeaders,
checkIntervalNs,
EnumSet.of(PeriodicTaskFlag.VERBOSE)));
}
/**
* Register the electUnclean task.
*
* This task periodically checks to see if partitions with no leaders can be
* have a new leader elected uncleanly.
*
* @param checkIntervalNs The check interval in nanoseconds.
*/
private void registerElectUnclean(long checkIntervalNs) {
periodicControl.registerTask(new PeriodicTask("electUnclean",
replicationControl::maybeElectUncleanLeaders,
checkIntervalNs,
EnumSet.of(PeriodicTaskFlag.VERBOSE)));
}
/**
* Register the delegation token expiration task.
*
* This task periodically expires delegation tokens.
*
* @param checkIntervalNs
*/
private void registerExpireDelegationTokens(long checkIntervalNs) {
periodicControl.registerTask(new PeriodicTask("expireDelegationTokens",
delegationTokenControlManager::sweepExpiredDelegationTokens,
checkIntervalNs,
EnumSet.of(PeriodicTaskFlag.VERBOSE)));
}
@Override
public CompletableFuture<AlterPartitionResponseData> alterPartition(
ControllerRequestContext context,
@ -2070,6 +1942,17 @@ public final class QuorumController implements Controller {
ControllerRequestContext context,
BrokerHeartbeatRequestData request
) {
// We start by updating the broker heartbeat in a lockless data structure.
// We do this first so that if the main controller thread is backlogged, the
// last contact time update still gets through.
if (!clusterControl.trackBrokerHeartbeat(request.brokerId(), request.brokerEpoch())) {
// Normally, ControllerWriteOperation would automatically check if the controller is
// active. But since we're doing this outside of the main controller thread, we have to
// do our own check here, and handle the case where we are inactive.
throw ControllerExceptions.newWrongControllerException(latestController());
}
// The next part takes place in the main controller thread and may involve generating
// metadata records.
return appendWriteEvent("processBrokerHeartbeat", context.deadlineNs(),
new ControllerWriteOperation<BrokerHeartbeatReply>() {
private final int brokerId = request.brokerId();
@ -2090,7 +1973,6 @@ public final class QuorumController implements Controller {
ControllerResult<BrokerHeartbeatReply> result = replicationControl.
processBrokerHeartbeat(request, offsetForRegisterBrokerRecord.getAsLong());
inControlledShutdown = result.response().inControlledShutdown();
rescheduleMaybeFenceStaleBrokers();
return result;
}
@ -2123,7 +2005,6 @@ public final class QuorumController implements Controller {
ControllerResult<BrokerRegistrationReply> result = clusterControl.
registerBroker(request, offsetControl.nextWriteOffset(),
new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE));
rescheduleMaybeFenceStaleBrokers();
return result;
},
EnumSet.noneOf(ControllerOperationFlag.class));

View File

@ -1597,18 +1597,34 @@ public class ReplicationControlManager {
return ControllerResult.of(records, null);
}
ControllerResult<Void> maybeFenceOneStaleBroker() {
List<ApiMessageAndVersion> records = new ArrayList<>();
ControllerResult<Boolean> maybeFenceOneStaleBroker() {
BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> {
// Even though multiple brokers can go stale at a time, we will process
// fencing one at a time so that the effect of fencing each broker is visible
// to the system prior to processing the next one
log.info("Fencing broker {} because its session has timed out.", brokerId);
handleBrokerFenced(brokerId, records);
heartbeatManager.fence(brokerId);
});
return ControllerResult.of(records, null);
Optional<BrokerIdAndEpoch> idAndEpoch = heartbeatManager.tracker().maybeRemoveExpired();
if (!idAndEpoch.isPresent()) {
log.debug("No stale brokers found.");
return ControllerResult.of(Collections.emptyList(), false);
}
int id = idAndEpoch.get().id();
long epoch = idAndEpoch.get().epoch();
if (!clusterControl.brokerRegistrations().containsKey(id)) {
log.info("Removing heartbeat tracker entry for unknown broker {} at epoch {}.",
id, epoch);
heartbeatManager.remove(id);
return ControllerResult.of(Collections.emptyList(), true);
} else if (clusterControl.brokerRegistrations().get(id).epoch() != epoch) {
log.info("Removing heartbeat tracker entry for broker {} at previous epoch {}. " +
"Current epoch is {}", id, epoch,
clusterControl.brokerRegistrations().get(id).epoch());
return ControllerResult.of(Collections.emptyList(), true);
}
// Even though multiple brokers can go stale at a time, we will process
// fencing one at a time so that the effect of fencing each broker is visible
// to the system prior to processing the next one.
log.info("Fencing broker {} at epoch {} because its session has timed out.", id, epoch);
List<ApiMessageAndVersion> records = new ArrayList<>();
handleBrokerFenced(id, records);
heartbeatManager.fence(id);
return ControllerResult.of(records, true);
}
boolean arePartitionLeadersImbalanced() {

View File

@ -86,6 +86,9 @@ public final class EventHandlerExceptionInfo {
return new EventHandlerExceptionInfo(false, false, internal,
new PolicyViolationException("Unable to perform excessively large batch " +
"operation."));
} else if (internal instanceof PeriodicControlTaskException) {
// This exception is a periodic task which failed.
return new EventHandlerExceptionInfo(true, false, internal);
} else if (internal instanceof InterruptedException) {
// The controller event queue has been interrupted. This normally only happens during
// a JUnit test that has hung. The test framework sometimes sends an InterruptException

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller.errors;
/**
* An exception indicating that a periodic task managed by PeriodicTaskControlManager failed.
*/
public class PeriodicControlTaskException extends RuntimeException {
public PeriodicControlTaskException(String message, Throwable e) {
super(message, e);
}
}

View File

@ -21,8 +21,6 @@ import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatState;
import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatStateIterator;
import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatStateList;
import org.apache.kafka.controller.BrokerHeartbeatManager.UsableBrokerIterator;
import org.apache.kafka.metadata.placement.UsableBroker;
@ -43,7 +41,6 @@ import static org.apache.kafka.controller.BrokerControlState.SHUTDOWN_NOW;
import static org.apache.kafka.controller.BrokerControlState.UNFENCED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -60,109 +57,31 @@ public class BrokerHeartbeatManagerTest {
public void testHasValidSession() {
BrokerHeartbeatManager manager = newBrokerHeartbeatManager();
MockTime time = (MockTime) manager.time();
assertFalse(manager.hasValidSession(0));
assertFalse(manager.hasValidSession(0, 100L));
for (int brokerId = 0; brokerId < 3; brokerId++) {
manager.register(brokerId, true);
}
manager.tracker().updateContactTime(new BrokerIdAndEpoch(0, 100L));
manager.touch(0, false, 0);
time.sleep(5);
manager.tracker().updateContactTime(new BrokerIdAndEpoch(1, 100L));
manager.touch(1, false, 0);
manager.tracker().updateContactTime(new BrokerIdAndEpoch(2, 200L));
manager.touch(2, false, 0);
assertTrue(manager.hasValidSession(0));
assertTrue(manager.hasValidSession(1));
assertTrue(manager.hasValidSession(2));
assertFalse(manager.hasValidSession(3));
time.sleep(6);
assertFalse(manager.hasValidSession(0));
assertTrue(manager.hasValidSession(1));
assertTrue(manager.hasValidSession(2));
assertFalse(manager.hasValidSession(3));
manager.remove(2);
assertFalse(manager.hasValidSession(2));
manager.remove(1);
assertFalse(manager.hasValidSession(1));
}
@Test
public void testFindOneStaleBroker() {
BrokerHeartbeatManager manager = newBrokerHeartbeatManager();
MockTime time = (MockTime) manager.time();
assertFalse(manager.hasValidSession(0));
for (int brokerId = 0; brokerId < 3; brokerId++) {
manager.register(brokerId, true);
}
manager.touch(0, false, 0);
time.sleep(5);
manager.touch(1, false, 0);
time.sleep(1);
manager.touch(2, false, 0);
Iterator<BrokerHeartbeatState> iter = manager.unfenced().iterator();
assertEquals(0, iter.next().id());
assertEquals(1, iter.next().id());
assertEquals(2, iter.next().id());
assertFalse(iter.hasNext());
assertEquals(Optional.empty(), manager.findOneStaleBroker());
time.sleep(5);
assertEquals(Optional.of(0), manager.findOneStaleBroker());
manager.fence(0);
assertEquals(Optional.empty(), manager.findOneStaleBroker());
iter = manager.unfenced().iterator();
assertEquals(1, iter.next().id());
assertEquals(2, iter.next().id());
assertFalse(iter.hasNext());
time.sleep(20);
assertEquals(Optional.of(1), manager.findOneStaleBroker());
manager.fence(1);
assertEquals(Optional.of(2), manager.findOneStaleBroker());
manager.fence(2);
assertEquals(Optional.empty(), manager.findOneStaleBroker());
iter = manager.unfenced().iterator();
assertFalse(iter.hasNext());
}
@Test
public void testNextCheckTimeNs() {
BrokerHeartbeatManager manager = newBrokerHeartbeatManager();
MockTime time = (MockTime) manager.time();
assertEquals(Long.MAX_VALUE, manager.nextCheckTimeNs());
for (int brokerId = 0; brokerId < 4; brokerId++) {
manager.register(brokerId, true);
}
manager.touch(0, false, 0);
time.sleep(2);
manager.touch(1, false, 0);
time.sleep(1);
manager.touch(2, false, 0);
time.sleep(1);
manager.touch(3, false, 0);
assertEquals(Optional.empty(), manager.findOneStaleBroker());
assertEquals(10_000_000, manager.nextCheckTimeNs());
time.sleep(7);
assertEquals(10_000_000, manager.nextCheckTimeNs());
assertEquals(Optional.of(0), manager.findOneStaleBroker());
manager.fence(0);
assertEquals(12_000_000, manager.nextCheckTimeNs());
time.sleep(3);
assertEquals(Optional.of(1), manager.findOneStaleBroker());
manager.fence(1);
assertEquals(Optional.of(2), manager.findOneStaleBroker());
manager.fence(2);
assertEquals(14_000_000, manager.nextCheckTimeNs());
assertTrue(manager.hasValidSession(0, 100L));
assertFalse(manager.hasValidSession(0, 200L));
assertTrue(manager.hasValidSession(1, 100L));
assertTrue(manager.hasValidSession(2, 200L));
assertFalse(manager.hasValidSession(3, 300L));
}
@Test
public void testMetadataOffsetComparator() {
TreeSet<BrokerHeartbeatState> set =
new TreeSet<>(BrokerHeartbeatManager.MetadataOffsetComparator.INSTANCE);
BrokerHeartbeatState broker1 = new BrokerHeartbeatState(1);
BrokerHeartbeatState broker2 = new BrokerHeartbeatState(2);
BrokerHeartbeatState broker3 = new BrokerHeartbeatState(3);
BrokerHeartbeatState broker1 = new BrokerHeartbeatState(1, false, -1L, -1L);
BrokerHeartbeatState broker2 = new BrokerHeartbeatState(2, false, -1L, -1L);
BrokerHeartbeatState broker3 = new BrokerHeartbeatState(3, false, -1L, -1L);
set.add(broker1);
set.add(broker2);
set.add(broker3);
@ -175,9 +94,9 @@ public class BrokerHeartbeatManagerTest {
assertTrue(set.remove(broker2));
assertTrue(set.remove(broker3));
assertTrue(set.isEmpty());
broker1.metadataOffset = 800;
broker2.metadataOffset = 400;
broker3.metadataOffset = 100;
broker1.setMetadataOffset(800);
broker2.setMetadataOffset(400);
broker3.setMetadataOffset(100);
set.add(broker1);
set.add(broker2);
set.add(broker3);
@ -254,39 +173,6 @@ public class BrokerHeartbeatManagerTest {
assertEquals(OptionalLong.of(101), manager.controlledShutdownOffset(3));
}
@Test
public void testBrokerHeartbeatStateList() {
BrokerHeartbeatStateList list = new BrokerHeartbeatStateList();
assertNull(list.first());
BrokerHeartbeatStateIterator iterator = list.iterator();
assertFalse(iterator.hasNext());
BrokerHeartbeatState broker0 = new BrokerHeartbeatState(0);
broker0.lastContactNs = 200;
BrokerHeartbeatState broker1 = new BrokerHeartbeatState(1);
broker1.lastContactNs = 100;
BrokerHeartbeatState broker2 = new BrokerHeartbeatState(2);
broker2.lastContactNs = 50;
BrokerHeartbeatState broker3 = new BrokerHeartbeatState(3);
broker3.lastContactNs = 150;
list.add(broker0);
list.add(broker1);
list.add(broker2);
list.add(broker3);
assertEquals(broker2, list.first());
iterator = list.iterator();
assertEquals(broker2, iterator.next());
assertEquals(broker1, iterator.next());
assertEquals(broker3, iterator.next());
assertEquals(broker0, iterator.next());
assertFalse(iterator.hasNext());
list.remove(broker1);
iterator = list.iterator();
assertEquals(broker2, iterator.next());
assertEquals(broker3, iterator.next());
assertEquals(broker0, iterator.next());
assertFalse(iterator.hasNext());
}
@Test
public void testCalculateNextBrokerState() {
BrokerHeartbeatManager manager = newBrokerHeartbeatManager();

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(40)
public class BrokerHeartbeatTrackerTest {
private static BrokerHeartbeatTracker newBrokerHeartbeatTracker() {
MockTime time = new MockTime(0, 1_000_000, 0);
return new BrokerHeartbeatTracker(time, 10_000_000);
}
private static final Set<BrokerIdAndEpoch> TEST_BROKERS;
static {
Set<BrokerIdAndEpoch> brokers = new HashSet<>();
Arrays.asList(
new BrokerIdAndEpoch(0, 0L),
new BrokerIdAndEpoch(1, 100L),
new BrokerIdAndEpoch(2, 200L)
).forEach(brokers::add);
TEST_BROKERS = Collections.unmodifiableSet(brokers);
}
@Test
public void testUpdateContactTime() {
BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
assertEquals(OptionalLong.empty(), tracker.contactTime(new BrokerIdAndEpoch(1, 100L)));
tracker.updateContactTime(new BrokerIdAndEpoch(1, 100L));
assertEquals(OptionalLong.of(0L), tracker.contactTime(new BrokerIdAndEpoch(1, 100L)));
}
@Test
public void testMaybeRemoveExpiredWithEmptyTracker() {
BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
assertEquals(Optional.empty(), tracker.maybeRemoveExpired());
}
@Test
public void testMaybeRemoveExpiredWithAllUpToDate() {
BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
TEST_BROKERS.forEach(tracker::updateContactTime);
assertEquals(Optional.empty(), tracker.maybeRemoveExpired());
}
@Test
public void testMaybeRemoveExpiredWithAllExpired() {
BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
TEST_BROKERS.forEach(tracker::updateContactTime);
tracker.time().sleep(11);
Set<BrokerIdAndEpoch> expired = new HashSet<>();
Optional<BrokerIdAndEpoch> idAndEpoch;
do {
idAndEpoch = tracker.maybeRemoveExpired();
idAndEpoch.ifPresent(expired::add);
} while (idAndEpoch.isPresent());
assertEquals(TEST_BROKERS, expired);
}
@Test
public void testHasValidSessionIsTrueForKnownBroker() {
BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
TEST_BROKERS.forEach(tracker::updateContactTime);
assertTrue(tracker.hasValidSession(new BrokerIdAndEpoch(2, 200L)));
}
@Test
public void testHasValidSessionIsFalseForUnknownBroker() {
BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
TEST_BROKERS.forEach(tracker::updateContactTime);
assertFalse(tracker.hasValidSession(new BrokerIdAndEpoch(3, 300L)));
}
@Test
public void testHasValidSessionIsFalseForUnknownBrokerEpoch() {
BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
TEST_BROKERS.forEach(tracker::updateContactTime);
assertFalse(tracker.hasValidSession(new BrokerIdAndEpoch(2, 100L)));
}
@Test
public void testIsExpiredIsFalseForTheCurrentTime() {
BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
assertFalse(tracker.isExpired(456, 456));
}
@Test
public void testIsExpiredIsFalseForTenNanosecondsAfter() {
BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
assertFalse(tracker.isExpired(456, 466));
}
@Test
public void testIsExpiredIsTrueAfterExpirationTime() {
BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
assertTrue(tracker.isExpired(456, 456 + 10_000_001));
}
@Test
public void testIsExpiredIsFalseForPreviousTime() {
BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
assertFalse(tracker.isExpired(456, 0));
}
}

View File

@ -0,0 +1,300 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(10)
public class PeriodicTaskControlManagerTest {
static class FakePeriodicTask {
final AtomicInteger numCalls;
final AtomicBoolean continuation = new AtomicBoolean(false);
final PeriodicTask task;
final AtomicBoolean shouldFail = new AtomicBoolean(false);
FakePeriodicTask(
String name,
long periodNs
) {
this.numCalls = new AtomicInteger();
this.task = new PeriodicTask(name,
() -> {
numCalls.addAndGet(1);
if (shouldFail.getAndSet(false)) {
throw new NullPointerException("uh oh");
}
return ControllerResult.of(Collections.emptyList(),
continuation.getAndSet(false));
},
periodNs,
EnumSet.noneOf(PeriodicTaskFlag.class));
}
}
static class TrackedTask {
final String tag;
final long deadlineNs;
final Supplier<ControllerResult<Void>> op;
TrackedTask(
String tag,
long deadlineNs,
Supplier<ControllerResult<Void>> op
) {
this.tag = tag;
this.deadlineNs = deadlineNs;
this.op = op;
}
}
static class PeriodicTaskControlManagerTestEnv implements PeriodicTaskControlManager.QueueAccessor {
final MockTime time;
final PeriodicTaskControlManager manager;
final TreeMap<Long, List<TrackedTask>> tasks;
int numCalls = 10_000;
PeriodicTaskControlManagerTestEnv() {
this.time = new MockTime(0, 0, 0);
this.manager = new PeriodicTaskControlManager.Builder().
setTime(time).
setQueueAccessor(this).
build();
this.tasks = new TreeMap<>();
}
@Override
public void scheduleDeferred(
String tag,
long deadlineNs,
Supplier<ControllerResult<Void>> op
) {
if (numCalls <= 0) {
throw new RuntimeException("too many deferred calls.");
}
numCalls--;
cancelDeferred(tag);
TrackedTask task = new TrackedTask(tag, deadlineNs, op);
tasks.computeIfAbsent(deadlineNs, __ -> new ArrayList<>()).add(task);
}
@Override
public void cancelDeferred(String tag) {
Iterator<Map.Entry<Long, List<TrackedTask>>> iter = tasks.entrySet().iterator();
boolean foundTask = false;
while (iter.hasNext() && (!foundTask)) {
Map.Entry<Long, List<TrackedTask>> entry = iter.next();
Iterator<TrackedTask> taskIter = entry.getValue().iterator();
while (taskIter.hasNext()) {
TrackedTask task = taskIter.next();
if (task.tag.equals(tag)) {
taskIter.remove();
foundTask = true;
break;
}
}
if (entry.getValue().isEmpty()) {
iter.remove();
}
}
}
int numDeferred() {
int count = 0;
for (List<TrackedTask> taskList : tasks.values()) {
count += taskList.size();
}
return count;
}
void advanceTime(long ms) {
time.sleep(ms);
while (true) {
Iterator<Map.Entry<Long, List<TrackedTask>>> iter = tasks.entrySet().iterator();
if (!iter.hasNext()) {
return;
}
Map.Entry<Long, List<TrackedTask>> entry = iter.next();
if (time.nanoseconds() < entry.getKey()) {
return;
}
if (!entry.getValue().isEmpty()) {
Iterator<TrackedTask> taskIter = entry.getValue().iterator();
TrackedTask task = taskIter.next();
taskIter.remove();
try {
task.op.get();
} catch (Exception e) {
// discard exception
}
continue;
}
iter.remove();
}
}
}
@Test
public void testActivate() {
PeriodicTaskControlManagerTestEnv env = new PeriodicTaskControlManagerTestEnv();
assertFalse(env.manager.active());
env.manager.activate();
assertTrue(env.manager.active());
assertEquals(0, env.numDeferred());
}
@Test
public void testDeactivate() {
PeriodicTaskControlManagerTestEnv env = new PeriodicTaskControlManagerTestEnv();
assertFalse(env.manager.active());
env.manager.activate();
env.manager.deactivate();
assertFalse(env.manager.active());
assertEquals(0, env.numDeferred());
}
@Test
public void testRegisterTaskWhenDeactivated() {
FakePeriodicTask foo = new FakePeriodicTask("foo", MILLISECONDS.toNanos(100));
PeriodicTaskControlManagerTestEnv env = new PeriodicTaskControlManagerTestEnv();
env.manager.registerTask(foo.task);
assertEquals(0, env.numDeferred());
}
@Test
public void testRegisterTaskWhenActivated() {
FakePeriodicTask foo = new FakePeriodicTask("foo", MILLISECONDS.toNanos(100));
PeriodicTaskControlManagerTestEnv env = new PeriodicTaskControlManagerTestEnv();
env.manager.activate();
env.manager.registerTask(foo.task);
assertEquals(1, env.numDeferred());
}
@Test
public void testRegisterTaskWhenActivatedThenDeactivate() {
FakePeriodicTask foo = new FakePeriodicTask("foo", MILLISECONDS.toNanos(100));
PeriodicTaskControlManagerTestEnv env = new PeriodicTaskControlManagerTestEnv();
env.manager.activate();
env.manager.registerTask(foo.task);
env.manager.deactivate();
assertEquals(0, env.numDeferred());
}
@Test
public void testRegisterTaskAndAdvanceTime() {
FakePeriodicTask foo = new FakePeriodicTask("foo", MILLISECONDS.toNanos(100));
FakePeriodicTask bar = new FakePeriodicTask("bar", MILLISECONDS.toNanos(50));
PeriodicTaskControlManagerTestEnv env = new PeriodicTaskControlManagerTestEnv();
env.manager.activate();
env.manager.registerTask(foo.task);
env.manager.registerTask(bar.task);
assertEquals(2, env.numDeferred());
env.advanceTime(50);
assertEquals(0, foo.numCalls.get());
assertEquals(1, bar.numCalls.get());
assertEquals(2, env.numDeferred());
env.advanceTime(50);
assertEquals(1, foo.numCalls.get());
assertEquals(2, bar.numCalls.get());
assertEquals(2, env.numDeferred());
env.manager.deactivate();
}
@Test
public void testContinuation() {
FakePeriodicTask foo = new FakePeriodicTask("foo", MILLISECONDS.toNanos(100));
FakePeriodicTask bar = new FakePeriodicTask("bar", MILLISECONDS.toNanos(50));
bar.continuation.set(true);
PeriodicTaskControlManagerTestEnv env = new PeriodicTaskControlManagerTestEnv();
env.manager.activate();
env.manager.registerTask(foo.task);
env.manager.registerTask(bar.task);
assertEquals(2, env.numDeferred());
env.advanceTime(50);
assertEquals(0, foo.numCalls.get());
assertEquals(1, bar.numCalls.get());
assertEquals(2, env.numDeferred());
env.advanceTime(10);
assertEquals(2, bar.numCalls.get());
env.advanceTime(40);
assertEquals(1, foo.numCalls.get());
assertEquals(2, bar.numCalls.get());
assertEquals(2, env.numDeferred());
env.advanceTime(10);
assertEquals(3, bar.numCalls.get());
env.manager.deactivate();
}
@Test
public void testRegisterTaskAndUnregister() {
FakePeriodicTask foo = new FakePeriodicTask("foo", MILLISECONDS.toNanos(100));
FakePeriodicTask bar = new FakePeriodicTask("bar", MILLISECONDS.toNanos(50));
PeriodicTaskControlManagerTestEnv env = new PeriodicTaskControlManagerTestEnv();
env.manager.activate();
env.manager.registerTask(foo.task);
env.manager.registerTask(bar.task);
assertEquals(2, env.numDeferred());
env.advanceTime(50);
assertEquals(0, foo.numCalls.get());
assertEquals(1, bar.numCalls.get());
env.manager.unregisterTask(foo.task.name());
assertEquals(1, env.numDeferred());
env.manager.unregisterTask(bar.task.name());
assertEquals(0, env.numDeferred());
env.advanceTime(200);
assertEquals(0, foo.numCalls.get());
assertEquals(1, bar.numCalls.get());
env.manager.deactivate();
}
@Test
public void testReschedulingAfterFailure() {
FakePeriodicTask foo = new FakePeriodicTask("foo", MILLISECONDS.toNanos(100));
foo.shouldFail.set(true);
PeriodicTaskControlManagerTestEnv env = new PeriodicTaskControlManagerTestEnv();
env.manager.activate();
env.manager.registerTask(foo.task);
assertEquals(1, env.numDeferred());
env.advanceTime(100);
assertEquals(1, foo.numCalls.get());
env.advanceTime(300000);
assertEquals(2, foo.numCalls.get());
env.manager.deactivate();
}
}

View File

@ -622,8 +622,8 @@ public class QuorumControllerTest {
@Test
public void testNoOpRecordWriteAfterTimeout() throws Throwable {
long maxIdleIntervalNs = 1_000;
long maxReplicationDelayMs = 60_000;
long maxIdleIntervalNs = TimeUnit.MICROSECONDS.toNanos(100);
long maxReplicationDelayMs = 1_000;
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
build();

View File

@ -448,6 +448,7 @@ public class ReplicationControlManagerTest {
void unfenceBrokers(Integer... brokerIds) {
for (int brokerId : brokerIds) {
clusterControl.trackBrokerHeartbeat(brokerId, defaultBrokerEpoch(brokerId));
ControllerResult<BrokerHeartbeatReply> result = replicationControl.
processBrokerHeartbeat(new BrokerHeartbeatRequestData().
setBrokerId(brokerId).setBrokerEpoch(defaultBrokerEpoch(brokerId)).
@ -494,12 +495,11 @@ public class ReplicationControlManagerTest {
.collect(Collectors.toSet());
unfenceBrokers(unfencedBrokerIds.toArray(new Integer[0]));
Optional<Integer> staleBroker = clusterControl.heartbeatManager().findOneStaleBroker();
while (staleBroker.isPresent()) {
ControllerResult<Void> fenceResult = replicationControl.maybeFenceOneStaleBroker();
ControllerResult<Boolean> fenceResult;
do {
fenceResult = replicationControl.maybeFenceOneStaleBroker();
replay(fenceResult.records());
staleBroker = clusterControl.heartbeatManager().findOneStaleBroker();
}
} while (fenceResult.response().booleanValue());
assertEquals(brokerIds, clusterControl.fencedBrokerIds());
}
@ -2844,8 +2844,7 @@ public class ReplicationControlManagerTest {
filter(broker -> broker.id() == 0).findFirst();
assertTrue(state.isPresent());
assertEquals(0, state.get().id());
assertEquals(100000000L, state.get().lastContactNs);
assertEquals(123, state.get().metadataOffset);
assertEquals(123, state.get().metadataOffset());
}
@Test

View File

@ -19,10 +19,12 @@ package org.apache.kafka.controller.errors;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.server.mutable.BoundedListTooLongException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@ -44,6 +46,14 @@ public class EventHandlerExceptionInfoTest {
private static final EventHandlerExceptionInfo REJECTED_EXECUTION =
EventHandlerExceptionInfo.fromInternal(new RejectedExecutionException(), OptionalInt::empty);
private static final EventHandlerExceptionInfo BOUNDED_LIST_TOO_LONG =
EventHandlerExceptionInfo.fromInternal(new BoundedListTooLongException("too long"), OptionalInt::empty);
private static final EventHandlerExceptionInfo PERIODIC_FAILURE =
EventHandlerExceptionInfo.fromInternal(
new PeriodicControlTaskException("foo: task failed: null pointer.",
new NullPointerException()), OptionalInt::empty);
private static final EventHandlerExceptionInfo INTERRUPTED =
EventHandlerExceptionInfo.fromInternal(
new InterruptedException(),
@ -87,6 +97,34 @@ public class EventHandlerExceptionInfoTest {
REJECTED_EXECUTION.failureMessage(123, OptionalLong.empty(), true, 456L));
}
@Test
public void testBoundedListTooLongExceptionInfo() {
assertEquals(new EventHandlerExceptionInfo(false, false,
new BoundedListTooLongException("too long"),
new PolicyViolationException("Unable to perform excessively large batch operation.")),
BOUNDED_LIST_TOO_LONG);
}
@Test
public void testBoundedListTooLongExceptionFailureMessage() {
assertEquals("event failed with BoundedListTooLongException (treated as PolicyViolationException) " +
"in 234 microseconds. Exception message: too long",
BOUNDED_LIST_TOO_LONG.failureMessage(123, OptionalLong.of(234L), true, 456L));
}
@Test
public void testPeriodicControlTaskExceptionInfo() {
assertEquals(new EventHandlerExceptionInfo(true, false,
new PeriodicControlTaskException("foo: task failed: null pointer.", new NullPointerException())),
PERIODIC_FAILURE);
}
@Test
public void testPeriodicControlTaskExceptionFailureMessage() {
assertEquals("event failed with PeriodicControlTaskException in 234 microseconds.",
PERIODIC_FAILURE.failureMessage(123, OptionalLong.of(234L), true, 456L));
}
@Test
public void testInterruptedExceptionInfo() {
assertEquals(new EventHandlerExceptionInfo(true, true,
@ -164,6 +202,8 @@ public class EventHandlerExceptionInfoTest {
public void testIsNotTimeoutException() {
assertFalse(TOPIC_EXISTS.isTimeoutException());
assertFalse(REJECTED_EXECUTION.isTimeoutException());
assertFalse(BOUNDED_LIST_TOO_LONG.isTimeoutException());
assertFalse(PERIODIC_FAILURE.isTimeoutException());
assertFalse(INTERRUPTED.isTimeoutException());
assertFalse(NULL_POINTER.isTimeoutException());
assertFalse(NOT_LEADER.isTimeoutException());