mirror of https://github.com/apache/kafka.git
KAFKA-14275; KRaft Controllers should crash after failing to apply any metadata record (#12709)
Make all faults in metadata processing on standby controllers fatal. This matches the behavior of the active controller and prevents a standby controller from eventually becoming active with incomplete state. Reviewers: Colin Patrick McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
parent
24cb2d2f95
commit
98a3dcb477
|
|
@ -209,7 +209,6 @@ class ControllerServer(
|
|||
setConfigurationValidator(new ControllerConfigurationValidator()).
|
||||
setStaticConfig(config.originals).
|
||||
setBootstrapMetadata(bootstrapMetadata).
|
||||
setMetadataFaultHandler(metadataFaultHandler).
|
||||
setFatalFaultHandler(fatalFaultHandler)
|
||||
}
|
||||
authorizer match {
|
||||
|
|
|
|||
|
|
@ -323,7 +323,7 @@ abstract class QuorumTestHarness extends Logging {
|
|||
raftApiVersions = raftManager.apiVersions,
|
||||
bootstrapMetadata = BootstrapMetadata.fromVersion(metadataVersion, "test harness"),
|
||||
metadataFaultHandler = faultHandler,
|
||||
fatalFaultHandler = faultHandler,
|
||||
fatalFaultHandler = faultHandler
|
||||
)
|
||||
controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) => {
|
||||
if (e != null) {
|
||||
|
|
|
|||
|
|
@ -158,7 +158,6 @@ public final class QuorumController implements Controller {
|
|||
private final int nodeId;
|
||||
private final String clusterId;
|
||||
private FaultHandler fatalFaultHandler = null;
|
||||
private FaultHandler metadataFaultHandler = null;
|
||||
private Time time = Time.SYSTEM;
|
||||
private String threadNamePrefix = null;
|
||||
private LogContext logContext = null;
|
||||
|
|
@ -191,11 +190,6 @@ public final class QuorumController implements Controller {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setMetadataFaultHandler(FaultHandler metadataFaultHandler) {
|
||||
this.metadataFaultHandler = metadataFaultHandler;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int nodeId() {
|
||||
return nodeId;
|
||||
}
|
||||
|
|
@ -315,8 +309,6 @@ public final class QuorumController implements Controller {
|
|||
throw new IllegalStateException("You must specify the quorum features");
|
||||
} else if (fatalFaultHandler == null) {
|
||||
throw new IllegalStateException("You must specify a fatal fault handler.");
|
||||
} else if (metadataFaultHandler == null) {
|
||||
throw new IllegalStateException("You must specify a metadata fault handler.");
|
||||
}
|
||||
|
||||
if (threadNamePrefix == null) {
|
||||
|
|
@ -335,7 +327,6 @@ public final class QuorumController implements Controller {
|
|||
queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
|
||||
return new QuorumController(
|
||||
fatalFaultHandler,
|
||||
metadataFaultHandler,
|
||||
logContext,
|
||||
nodeId,
|
||||
clusterId,
|
||||
|
|
@ -984,7 +975,7 @@ public final class QuorumController implements Controller {
|
|||
"controller, which was %d of %d record(s) in the batch with baseOffset %d.",
|
||||
message.message().getClass().getSimpleName(), i, messages.size(),
|
||||
batch.baseOffset());
|
||||
throw metadataFaultHandler.handleFault(failureMessage, e);
|
||||
throw fatalFaultHandler.handleFault(failureMessage, e);
|
||||
}
|
||||
i++;
|
||||
}
|
||||
|
|
@ -1040,7 +1031,7 @@ public final class QuorumController implements Controller {
|
|||
"%d record(s) in the batch with baseOffset %d.",
|
||||
message.message().getClass().getSimpleName(), reader.snapshotId(),
|
||||
i, messages.size(), batch.baseOffset());
|
||||
throw metadataFaultHandler.handleFault(failureMessage, e);
|
||||
throw fatalFaultHandler.handleFault(failureMessage, e);
|
||||
}
|
||||
i++;
|
||||
}
|
||||
|
|
@ -1490,11 +1481,6 @@ public final class QuorumController implements Controller {
|
|||
*/
|
||||
private final FaultHandler fatalFaultHandler;
|
||||
|
||||
/**
|
||||
* Handles faults in metadata handling that are normally not fatal.
|
||||
*/
|
||||
private final FaultHandler metadataFaultHandler;
|
||||
|
||||
/**
|
||||
* The slf4j log context, used to create new loggers.
|
||||
*/
|
||||
|
|
@ -1703,7 +1689,6 @@ public final class QuorumController implements Controller {
|
|||
|
||||
private QuorumController(
|
||||
FaultHandler fatalFaultHandler,
|
||||
FaultHandler metadataFaultHandler,
|
||||
LogContext logContext,
|
||||
int nodeId,
|
||||
String clusterId,
|
||||
|
|
@ -1729,7 +1714,6 @@ public final class QuorumController implements Controller {
|
|||
int maxRecordsPerBatch
|
||||
) {
|
||||
this.fatalFaultHandler = fatalFaultHandler;
|
||||
this.metadataFaultHandler = metadataFaultHandler;
|
||||
this.logContext = logContext;
|
||||
this.log = logContext.logger(QuorumController.class);
|
||||
this.nodeId = nodeId;
|
||||
|
|
|
|||
|
|
@ -1315,10 +1315,51 @@ public class QuorumControllerTest {
|
|||
setValue(null), (short) 0)), null);
|
||||
});
|
||||
assertThrows(ExecutionException.class, () -> future.get());
|
||||
assertEquals(NullPointerException.class,
|
||||
controlEnv.fatalFaultHandler().firstException().getCause().getClass());
|
||||
controlEnv.fatalFaultHandler().setIgnore(true);
|
||||
controlEnv.metadataFaultHandler().setIgnore(true);
|
||||
assertEquals(NullPointerException.class, controlEnv.fatalFaultHandler(active.nodeId())
|
||||
.firstException().getCause().getClass());
|
||||
controlEnv.ignoreFatalFaults();
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Checks that every controller reports a fatal fault when the snapshot it is given
 * cannot be loaded.
 *
 * The test builds an initial snapshot holding a single default-constructed
 * {@code PartitionRecord} (presumably invalid because none of its fields are set;
 * confirm against the record replay logic), hands it to a three-node
 * {@code LocalLogManagerTestEnv} through a snapshot reader opened at offset 0 / epoch 0,
 * and then waits until the fatal fault handler of every controller has recorded an exception.
 *
 * @throws Exception if building the environments fails or the wait condition fails
 */
@Test
|
||||
public void testFatalMetadataErrorDuringSnapshotLoading() throws Exception {
|
||||
InitialSnapshot invalidSnapshot = new InitialSnapshot(Collections.unmodifiableList(Arrays.asList(
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0)))
|
||||
);
|
||||
|
||||
LocalLogManagerTestEnv.Builder logEnvBuilder = new LocalLogManagerTestEnv.Builder(3)
|
||||
.setSnapshotReader(FileRawSnapshotReader.open(
|
||||
invalidSnapshot.tempDir.toPath(),
|
||||
new OffsetAndEpoch(0, 0)
|
||||
));
|
||||
|
||||
try (LocalLogManagerTestEnv logEnv = logEnvBuilder.build()) {
|
||||
try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build()) {
|
||||
// allMatch: the condition holds only once *every* controller's handler has a first exception.
TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch(controller -> {
|
||||
return controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null;
|
||||
}),
|
||||
"At least one controller failed to detect the fatal fault"
|
||||
);
|
||||
// The faults are expected here, so flag every handler as ignored before the env is closed.
// NOTE(review): presumably this stops close() from rethrowing them; confirm in MockFaultHandler.
controlEnv.ignoreFatalFaults();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Checks that every controller reports a fatal fault when a record in the initial log
 * cannot be applied.
 *
 * The test appends a single default-constructed {@code PartitionRecord} (presumably
 * invalid because none of its fields are set; confirm against the record replay logic)
 * to a three-node {@code LocalLogManagerTestEnv} before the controllers are built,
 * and then waits until the fatal fault handler of every controller has recorded an exception.
 *
 * @throws Exception if building the environments fails or the wait condition fails
 */
@Test
|
||||
public void testFatalMetadataErrorDuringLogLoading() throws Exception {
|
||||
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build()) {
|
||||
// Seed the log before any controller exists, so the record is already there when they start reading.
logEnv.appendInitialRecords(Collections.unmodifiableList(Arrays.asList(
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0))
|
||||
));
|
||||
|
||||
try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build()) {
|
||||
// allMatch: the condition holds only once *every* controller's handler has a first exception.
TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch(controller -> {
|
||||
return controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null;
|
||||
}),
|
||||
"At least one controller failed to detect the fatal fault"
|
||||
);
|
||||
// The faults are expected here, so flag every handler as ignored before the env is closed.
// NOTE(review): presumably this stops close() from rethrowing them; confirm in MockFaultHandler.
controlEnv.ignoreFatalFaults();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -26,7 +26,9 @@ import org.apache.kafka.server.fault.MockFaultHandler;
|
|||
import org.apache.kafka.test.TestUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.OptionalInt;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
|
@ -40,8 +42,7 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
|
|||
public class QuorumControllerTestEnv implements AutoCloseable {
|
||||
private final List<QuorumController> controllers;
|
||||
private final LocalLogManagerTestEnv logEnv;
|
||||
private final MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
|
||||
private final MockFaultHandler metadataFaultHandler = new MockFaultHandler("metadataFaultHandler");
|
||||
private final Map<Integer, MockFaultHandler> fatalFaultHandlers = new HashMap<>();
|
||||
|
||||
public static class Builder {
|
||||
private final LocalLogManagerTestEnv logEnv;
|
||||
|
|
@ -98,17 +99,18 @@ public class QuorumControllerTestEnv implements AutoCloseable {
|
|||
try {
|
||||
ApiVersions apiVersions = new ApiVersions();
|
||||
List<Integer> nodeIds = IntStream.range(0, numControllers).boxed().collect(Collectors.toList());
|
||||
for (int i = 0; i < numControllers; i++) {
|
||||
QuorumController.Builder builder = new QuorumController.Builder(i, logEnv.clusterId());
|
||||
builder.setRaftClient(logEnv.logManagers().get(i));
|
||||
for (int nodeId = 0; nodeId < numControllers; nodeId++) {
|
||||
QuorumController.Builder builder = new QuorumController.Builder(nodeId, logEnv.clusterId());
|
||||
builder.setRaftClient(logEnv.logManagers().get(nodeId));
|
||||
builder.setBootstrapMetadata(bootstrapMetadata);
|
||||
builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs);
|
||||
builder.setQuorumFeatures(new QuorumFeatures(i, apiVersions, QuorumFeatures.defaultFeatureMap(), nodeIds));
|
||||
builder.setQuorumFeatures(new QuorumFeatures(nodeId, apiVersions, QuorumFeatures.defaultFeatureMap(), nodeIds));
|
||||
sessionTimeoutMillis.ifPresent(timeout -> {
|
||||
builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
|
||||
});
|
||||
MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
|
||||
builder.setFatalFaultHandler(fatalFaultHandler);
|
||||
builder.setMetadataFaultHandler(metadataFaultHandler);
|
||||
fatalFaultHandlers.put(nodeId, fatalFaultHandler);
|
||||
controllerBuilderInitializer.accept(builder);
|
||||
this.controllers.add(builder.build());
|
||||
}
|
||||
|
|
@ -142,12 +144,14 @@ public class QuorumControllerTestEnv implements AutoCloseable {
|
|||
return controllers;
|
||||
}
|
||||
|
||||
public MockFaultHandler fatalFaultHandler() {
|
||||
return fatalFaultHandler;
|
||||
public MockFaultHandler fatalFaultHandler(Integer nodeId) {
|
||||
return fatalFaultHandlers.get(nodeId);
|
||||
}
|
||||
|
||||
public MockFaultHandler metadataFaultHandler() {
|
||||
return metadataFaultHandler;
|
||||
public void ignoreFatalFaults() {
|
||||
for (MockFaultHandler faultHandler : fatalFaultHandlers.values()) {
|
||||
faultHandler.setIgnore(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -158,7 +162,8 @@ public class QuorumControllerTestEnv implements AutoCloseable {
|
|||
for (QuorumController controller : controllers) {
|
||||
controller.close();
|
||||
}
|
||||
fatalFaultHandler.maybeRethrowFirstException();
|
||||
metadataFaultHandler.maybeRethrowFirstException();
|
||||
for (MockFaultHandler faultHandler : fatalFaultHandlers.values()) {
|
||||
faultHandler.maybeRethrowFirstException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue