KAFKA-14292; Fix KRaft controlled shutdown delay (#12736)

The `controlledShutDownOffset` is defined as the "offset at which the broker should complete its controlled shutdown, or -1 if the broker is not performing a controlled shutdown". The controller sets this offset to a non-negative integer on receiving a heartbeat from a broker that's in controlled shutdown state. Currently, this offset is being updated and bumped every single time a broker in controlled shutdown mode send a heartbeat, delaying when controlled shutdown can actually complete for the broker. We should only update the offset when it was previously set to -1 to allow controlled shutdown to complete.

Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
Alyssa Huang 2022-10-13 13:29:45 -07:00 committed by GitHub
parent 484f85ff53
commit 0cb1d61413
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 19 deletions

View File

@ -17,6 +17,7 @@
package org.apache.kafka.controller;
import java.util.OptionalLong;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@ -42,8 +43,9 @@ import static org.apache.kafka.controller.BrokerControlState.UNFENCED;
/**
* The BrokerHeartbeatManager manages all the soft state associated with broker heartbeats.
* Soft state is state which does not appear in the metadata log. This state includes
* things like the last time each broker sent us a heartbeat, and whether the broker is
* trying to perform a controlled shutdown.
* things like the last time each broker sent us a heartbeat. As of KIP-841, the controlled
* shutdown state is no longer treated as soft state and is persisted to the metadata log on broker
* controlled shutdown requests.
*
* Only the active controller has a BrokerHeartbeatManager, since only the active
* controller handles broker heartbeats. Standby controllers will create a heartbeat
@ -77,7 +79,7 @@ public class BrokerHeartbeatManager {
* if the broker is not performing a controlled shutdown. When this field is
* updated, we also have to update the broker's position in the shuttingDown set.
*/
private long controlledShutDownOffset;
private long controlledShutdownOffset;
/**
* The previous entry in the unfenced list, or null if the broker is not in that list.
@ -95,7 +97,7 @@ public class BrokerHeartbeatManager {
this.prev = null;
this.next = null;
this.metadataOffset = -1;
this.controlledShutDownOffset = -1;
this.controlledShutdownOffset = -1;
}
/**
@ -116,7 +118,7 @@ public class BrokerHeartbeatManager {
* Returns true only if the broker is in controlled shutdown state.
*/
boolean shuttingDown() {
return controlledShutDownOffset >= 0;
return controlledShutdownOffset >= 0;
}
}
@ -275,6 +277,16 @@ public class BrokerHeartbeatManager {
return brokers.values();
}
// VisibleForTesting
OptionalLong controlledShutdownOffset(int brokerId) {
BrokerHeartbeatState broker = brokers.get(brokerId);
if (broker == null || broker.controlledShutdownOffset == -1) {
return OptionalLong.empty();
}
return OptionalLong.of(broker.controlledShutdownOffset);
}
/**
* Mark a broker as fenced.
*
@ -381,7 +393,7 @@ public class BrokerHeartbeatManager {
if (fenced) {
// If a broker is fenced, it leaves controlled shutdown. On its next heartbeat,
// it will shut down immediately.
broker.controlledShutDownOffset = -1;
broker.controlledShutdownOffset = -1;
} else {
unfenced.add(broker);
if (!broker.shuttingDown()) {
@ -400,12 +412,13 @@ public class BrokerHeartbeatManager {
}
/**
* Mark a broker as being in the controlled shutdown state.
* Mark a broker as being in the controlled shutdown state. We only update the
* controlledShutdownOffset if the broker was previously not in controlled shutdown state.
*
* @param brokerId The broker id.
* @param controlledShutDownOffset The offset at which controlled shutdown will be complete.
*/
void updateControlledShutdownOffset(int brokerId, long controlledShutDownOffset) {
void maybeUpdateControlledShutdownOffset(int brokerId, long controlledShutDownOffset) {
BrokerHeartbeatState broker = brokers.get(brokerId);
if (broker == null) {
throw new RuntimeException("Unable to locate broker " + brokerId);
@ -414,9 +427,11 @@ public class BrokerHeartbeatManager {
throw new RuntimeException("Fenced brokers cannot enter controlled shutdown.");
}
active.remove(broker);
broker.controlledShutDownOffset = controlledShutDownOffset;
log.debug("Updated the controlled shutdown offset for broker {} to {}.",
brokerId, controlledShutDownOffset);
if (broker.controlledShutdownOffset < 0) {
broker.controlledShutdownOffset = controlledShutDownOffset;
log.debug("Updated the controlled shutdown offset for broker {} to {}.",
brokerId, controlledShutDownOffset);
}
}
/**
@ -581,17 +596,17 @@ public class BrokerHeartbeatManager {
return new BrokerControlStates(currentState, CONTROLLED_SHUTDOWN);
}
long lowestActiveOffset = lowestActiveOffset();
if (broker.controlledShutDownOffset <= lowestActiveOffset) {
if (broker.controlledShutdownOffset <= lowestActiveOffset) {
log.info("The request from broker {} to shut down has been granted " +
"since the lowest active offset {} is now greater than the " +
"broker's controlled shutdown offset {}.", brokerId,
lowestActiveOffset, broker.controlledShutDownOffset);
lowestActiveOffset, broker.controlledShutdownOffset);
return new BrokerControlStates(currentState, SHUTDOWN_NOW);
}
log.debug("The request from broker {} to shut down can not yet be granted " +
"because the lowest active offset {} is not greater than the broker's " +
"shutdown offset {}.", brokerId, lowestActiveOffset,
broker.controlledShutDownOffset);
broker.controlledShutdownOffset);
return new BrokerControlStates(currentState, CONTROLLED_SHUTDOWN);
default:

View File

@ -1995,7 +1995,7 @@ public final class QuorumController implements Controller {
public void processBatchEndOffset(long offset) {
if (inControlledShutdown) {
clusterControl.heartbeatManager().
updateControlledShutdownOffset(brokerId, offset);
maybeUpdateControlledShutdownOffset(brokerId, offset);
}
}
});

View File

@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeSet;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
@ -203,17 +204,38 @@ public class BrokerHeartbeatManagerTest {
expected.add(new UsableBroker(3, Optional.of("rack2"), false));
expected.add(new UsableBroker(4, Optional.of("rack1"), true));
assertEquals(expected, usableBrokersToSet(manager));
manager.updateControlledShutdownOffset(2, 0);
manager.maybeUpdateControlledShutdownOffset(2, 0);
assertEquals(100L, manager.lowestActiveOffset());
assertThrows(RuntimeException.class,
() -> manager.updateControlledShutdownOffset(4, 0));
() -> manager.maybeUpdateControlledShutdownOffset(4, 0));
manager.touch(4, false, 100);
manager.updateControlledShutdownOffset(4, 0);
manager.maybeUpdateControlledShutdownOffset(4, 0);
expected.remove(new UsableBroker(2, Optional.of("rack1"), false));
expected.remove(new UsableBroker(4, Optional.of("rack1"), true));
assertEquals(expected, usableBrokersToSet(manager));
}
@Test
public void testControlledShutdownOffsetIsOnlyUpdatedOnce() {
BrokerHeartbeatManager manager = newBrokerHeartbeatManager();
assertEquals(Collections.emptySet(), usableBrokersToSet(manager));
manager.touch(0, false, 100);
manager.touch(1, false, 100);
manager.touch(2, false, 98);
manager.touch(3, false, 100);
manager.touch(4, true, 100);
assertEquals(OptionalLong.empty(), manager.controlledShutdownOffset(2));
manager.maybeUpdateControlledShutdownOffset(2, 98);
assertEquals(OptionalLong.of(98), manager.controlledShutdownOffset(2));
manager.maybeUpdateControlledShutdownOffset(2, 99);
assertEquals(OptionalLong.of(98), manager.controlledShutdownOffset(2));
assertEquals(OptionalLong.empty(), manager.controlledShutdownOffset(3));
manager.maybeUpdateControlledShutdownOffset(3, 101);
assertEquals(OptionalLong.of(101), manager.controlledShutdownOffset(3));
manager.maybeUpdateControlledShutdownOffset(3, 102);
assertEquals(OptionalLong.of(101), manager.controlledShutdownOffset(3));
}
@Test
public void testBrokerHeartbeatStateList() {
BrokerHeartbeatStateList list = new BrokerHeartbeatStateList();
@ -256,7 +278,7 @@ public class BrokerHeartbeatManagerTest {
manager.touch(3, false, 100);
manager.touch(4, true, 100);
manager.touch(5, false, 99);
manager.updateControlledShutdownOffset(5, 99);
manager.maybeUpdateControlledShutdownOffset(5, 99);
assertEquals(98L, manager.lowestActiveOffset());