KAFKA-13966 Prepend bootstrap metadata to controller queue (#12269)

Also fixes flaky QuorumControllerTest#testInvalidBootstrapMetadata

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
David Arthur 2022-06-23 11:29:21 -04:00 committed by GitHub
parent d00b7875e0
commit c6c9da02a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 105 additions and 10 deletions

View File

@ -782,6 +782,13 @@ public final class QuorumController implements Controller {
}
}
private <T> CompletableFuture<T> prependWriteEvent(String name,
ControllerWriteOperation<T> op) {
ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
queue.prepend(event);
return event.future();
}
private <T> CompletableFuture<T> appendWriteEvent(String name,
OptionalLong deadlineNs,
ControllerWriteOperation<T> op) {
@ -922,7 +929,6 @@ public final class QuorumController implements Controller {
curEpoch);
}
curClaimEpoch = newEpoch;
controllerMetrics.setActive(true);
updateWriteOffset(lastCommittedOffset);
@ -942,14 +948,20 @@ public final class QuorumController implements Controller {
"Got " + bootstrapMetadata.metadataVersion()));
} else {
metadataVersion = bootstrapMetadata.metadataVersion();
future = appendWriteEvent("bootstrapMetadata", OptionalLong.empty(), () -> {
// This call is here instead of inside the appendWriteEvent for testing purposes.
final List<ApiMessageAndVersion> bootstrapRecords = bootstrapMetadata.records();
// We prepend the bootstrap event in order to ensure the bootstrap metadata is written before
// any external controller write events are processed.
future = prependWriteEvent("bootstrapMetadata", () -> {
if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_3_IV0)) {
log.info("Initializing metadata.version to {}", metadataVersion.featureLevel());
} else {
log.info("Upgrading KRaft cluster and initializing metadata.version to {}",
metadataVersion.featureLevel());
}
return ControllerResult.atomicOf(bootstrapMetadata.records(), null);
return ControllerResult.atomicOf(bootstrapRecords, null);
});
}
future.whenComplete((result, exception) -> {
@ -967,7 +979,6 @@ public final class QuorumController implements Controller {
metadataVersion = featureControl.metadataVersion();
}
log.info(
"Becoming the active controller at epoch {}, committed offset {}, committed epoch {}, and metadata.version {}",
newEpoch, lastCommittedOffset, lastCommittedEpoch, metadataVersion.featureLevel()

View File

@ -32,6 +32,7 @@ import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -536,11 +537,16 @@ public class QuorumControllerTest {
}
private BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
return brokerFeatures(MetadataVersion.MINIMUM_KRAFT_VERSION, MetadataVersion.latest());
}
private BrokerRegistrationRequestData.FeatureCollection brokerFeatures(
MetadataVersion minVersion, MetadataVersion maxVersion) {
BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
features.add(new BrokerRegistrationRequestData.Feature()
.setName(MetadataVersion.FEATURE_NAME)
.setMinSupportedVersion(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel())
.setMaxSupportedVersion(MetadataVersion.latest().featureLevel()));
.setMinSupportedVersion(minVersion.featureLevel())
.setMaxSupportedVersion(maxVersion.featureLevel()));
return features;
}
@ -1184,18 +1190,86 @@ public class QuorumControllerTest {
@Test
public void testInvalidBootstrapMetadata() throws Exception {
// We can't actually create a BootstrapMetadata with an invalid version, so we have to fake it
// We can't actually create a BootstrapMetadata with an invalid version, so we have to mock it
BootstrapMetadata bootstrapMetadata = Mockito.mock(BootstrapMetadata.class);
Mockito.when(bootstrapMetadata.metadataVersion()).thenReturn(MetadataVersion.IBP_2_8_IV0);
CyclicBarrier barrier = new CyclicBarrier(2);
Mockito.when(bootstrapMetadata.metadataVersion()).thenAnswer(__ -> {
// This barrier allows us to catch the controller after it becomes leader, but before the bootstrapping fails
barrier.await(10, TimeUnit.SECONDS);
return MetadataVersion.IBP_2_8_IV0;
});
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
}, OptionalLong.empty(), OptionalLong.empty(), bootstrapMetadata);
) {
QuorumController active = controlEnv.activeController();
TestUtils.waitForCondition(() -> !active.isActive(),
QuorumController controller = controlEnv.activeController();
assertTrue(controller.isActive());
// Unblock the first call to BootstrapMetadata#metadataVersion
barrier.await(10, TimeUnit.SECONDS);
// Unblock the second call to BootstrapMetadata#metadataVersion
barrier.await(10, TimeUnit.SECONDS);
TestUtils.waitForCondition(() -> !controller.isActive(),
"Timed out waiting for controller to renounce itself after bad bootstrap metadata version.");
}
}
@Test
public void testBootstrapMetadataStartupRace() throws Throwable {
// KAFKA-13966: This tests a race condition between external RPC calls being handled before the bootstrap
// metadata is written. We instrument this by forcing the BootstrapMetadata#records method to block until a
// latch has been completed. This allows an asynchronous broker registration call to be handled before the
// handleLeaderChange callback completes. In this case, the registration should fail because the bootstrap
// metadata includes an unsupported metadata.version.
BootstrapMetadata bootstrapMetadata = BootstrapMetadata.create(MetadataVersion.latest());
BootstrapMetadata mockedMetadata = Mockito.mock(BootstrapMetadata.class);
CountDownLatch latch = new CountDownLatch(1);
Mockito.when(mockedMetadata.metadataVersion()).thenReturn(bootstrapMetadata.metadataVersion());
Mockito.when(mockedMetadata.records()).then(__ -> {
if (latch.await(30, TimeUnit.SECONDS)) {
return bootstrapMetadata.records();
} else {
throw new RuntimeException("Latch never completed");
}
});
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
}, OptionalLong.empty(), OptionalLong.empty(), mockedMetadata)) {
ListenerCollection listeners = new ListenerCollection();
listeners.add(new Listener().setName("PLAINTEXT").
setHost("localhost").setPort(9092));
QuorumController active = controlEnv.activeController();
// Issue a register broker request concurrently as the controller is initializing
assertEquals(1, latch.getCount(), "Latch should not have been completed yet");
CompletableFuture<Void> registrationFuture = new CompletableFuture<>();
Thread registerThread = new Thread(() -> {
try {
CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setBrokerId(0).
setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV0, MetadataVersion.IBP_3_3_IV0)).
setListeners(listeners));
// Once we have the future, the register broker event has been enqueued
latch.countDown();
reply.get();
registrationFuture.complete(null);
} catch (Throwable t) {
registrationFuture.completeExceptionally(t);
}
});
registerThread.start();
registerThread.join(30_000);
assertTrue(registrationFuture.isCompletedExceptionally(),
"Should not be able to register broker since the bootstrap metadata specified an incompatible metadata.version");
assertEquals(0, active.clusterControl().brokerRegistrations().size());
}
}
}
}

View File

@ -52,6 +52,16 @@ public class QuorumControllerTestEnv implements AutoCloseable {
this(logEnv, builderConsumer, OptionalLong.empty(), OptionalLong.empty(), BootstrapMetadata.create(MetadataVersion.latest()));
}
public QuorumControllerTestEnv(
LocalLogManagerTestEnv logEnv,
Consumer<Builder> builderConsumer,
OptionalLong sessionTimeoutMillis,
OptionalLong leaderImbalanceCheckIntervalNs,
MetadataVersion metadataVersion
) throws Exception {
this(logEnv, builderConsumer, sessionTimeoutMillis, leaderImbalanceCheckIntervalNs, BootstrapMetadata.create(metadataVersion));
}
public QuorumControllerTestEnv(
LocalLogManagerTestEnv logEnv,
Consumer<Builder> builderConsumer,