KAFKA-15441 Allow broker heartbeats to complete in metadata transaction (#14351)

This patch allows broker heartbeat events to be completed while a metadata transaction is in-flight.

More generally, this patch allows any RUNS_IN_PREMIGRATION event to complete while the controller
is in pre-migration mode even if the migration transaction is in-flight.

We had a problem with broker heartbeats timing out because they could not be completed while a large
ZK migration transaction was in-flight. This resulted in the controller fencing all the ZK brokers which 
has many undesirable downstream effects. 

Reviewers: Akhilesh Chaganti <akhileshchg@users.noreply.github.com>, Colin Patrick McCabe <cmccabe@apache.org>
This commit is contained in:
David Arthur 2023-09-08 16:36:13 -04:00 committed by GitHub
parent 84c49c6a09
commit b24ccd65b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 105 additions and 31 deletions

View File

@ -18,8 +18,8 @@ package kafka.zk
import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString} import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
import kafka.server.{ConfigType, KafkaConfig} import kafka.server.{ConfigType, KafkaConfig}
import kafka.test.ClusterInstance import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTest, Type} import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes} import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
@ -55,6 +55,26 @@ import java.util.{Properties, UUID}
import scala.collection.Seq import scala.collection.Seq
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
object ZkMigrationIntegrationTest {
def addZkBrokerProps(props: Properties): Unit = {
props.setProperty("inter.broker.listener.name", "EXTERNAL")
props.setProperty("listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
props.setProperty("advertised.listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
props.setProperty("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
}
def zkClustersForAllMigrationVersions(clusterGenerator: ClusterGenerator): Unit = {
Seq(MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_5_IV2, MetadataVersion.IBP_3_6_IV2).foreach { mv =>
val clusterConfig = ClusterConfig.defaultClusterBuilder()
.metadataVersion(mv)
.brokers(3)
.`type`(Type.ZK)
.build()
addZkBrokerProps(clusterConfig.serverProperties())
clusterGenerator.accept(clusterConfig)
}
}
}
@ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@Timeout(300) @Timeout(300)
@ -308,12 +328,7 @@ class ZkMigrationIntegrationTest {
} }
// SCRAM and Quota are intermixed. Test Quota Only here // SCRAM and Quota are intermixed. Test Quota Only here
@ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array( @ClusterTemplate("zkClustersForAllMigrationVersions")
new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
))
def testDualWrite(zkCluster: ClusterInstance): Unit = { def testDualWrite(zkCluster: ClusterInstance): Unit = {
// Create a topic in ZK mode // Create a topic in ZK mode
var admin = zkCluster.createAdminClient() var admin = zkCluster.createAdminClient()
@ -334,7 +349,7 @@ class ZkMigrationIntegrationTest {
val clusterId = zkCluster.clusterId() val clusterId = zkCluster.clusterId()
val kraftCluster = new KafkaClusterTestKit.Builder( val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder(). new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0). setBootstrapMetadataVersion(zkCluster.config().metadataVersion()).
setClusterId(Uuid.fromString(clusterId)). setClusterId(Uuid.fromString(clusterId)).
setNumBrokerNodes(0). setNumBrokerNodes(0).
setNumControllerNodes(1).build()) setNumControllerNodes(1).build())

View File

@ -151,7 +151,6 @@ import java.util.function.Supplier;
import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME; import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME;
import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.COMPLETES_IN_TRANSACTION;
import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.RUNS_IN_PREMIGRATION; import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.RUNS_IN_PREMIGRATION;
@ -684,21 +683,16 @@ public final class QuorumController implements Controller {
* Operations without this flag will always return NOT_CONTROLLER when invoked in premigration * Operations without this flag will always return NOT_CONTROLLER when invoked in premigration
* mode. * mode.
* <p> * <p>
* In pre-migration mode, we are still waiting to load the metadata from Apache * In pre-migration mode, we are still waiting to load the metadata from Apache ZooKeeper into
* ZooKeeper into the metadata log. Therefore, the metadata log is mostly empty, * the metadata log. Therefore, the metadata log is mostly empty, even though the cluster really
* even though the cluster really does have metadata. Very few operations should * does have metadata
* use this flag. * <p>
* Events using this flag will be completed even if a transaction is ongoing. Pre-migration
* events will be completed using the unstable (committed) offset rather than the stable offset.
* <p>
* In practice, very few operations should use this flag.
*/ */
RUNS_IN_PREMIGRATION, RUNS_IN_PREMIGRATION
/**
* This flag signifies that an event will be completed even if it is part of an unfinished transaction.
* This is needed for metadata transactions so that external callers can add records to a transaction
* and still use the returned future. One example usage of this flag is the batches of migrations records.
* The migration driver needs to wait on each submitted batch to avoid overwhelming the controller queue
* with events, so it needs events to be completed based on the committed (i.e., not stable) offset.
*/
COMPLETES_IN_TRANSACTION
} }
interface ControllerWriteOperation<T> { interface ControllerWriteOperation<T> {
@ -779,7 +773,14 @@ public final class QuorumController implements Controller {
// If the operation did not return any records, then it was actually just // If the operation did not return any records, then it was actually just
// a read after all, and not a read + write. However, this read was done // a read after all, and not a read + write. However, this read was done
// from the latest in-memory state, which might contain uncommitted data. // from the latest in-memory state, which might contain uncommitted data.
OptionalLong maybeOffset = deferredEventQueue.highestPendingOffset(); // If the operation can complete within a transaction, let it use the
// unstable purgatory so that it can complete sooner.
OptionalLong maybeOffset;
if (featureControl.inPreMigrationMode() && flags.contains(RUNS_IN_PREMIGRATION)) {
maybeOffset = deferredUnstableEventQueue.highestPendingOffset();
} else {
maybeOffset = deferredEventQueue.highestPendingOffset();
}
if (!maybeOffset.isPresent()) { if (!maybeOffset.isPresent()) {
// If the purgatory is empty, there are no pending operations and no // If the purgatory is empty, there are no pending operations and no
// uncommitted state. We can complete immediately. // uncommitted state. We can complete immediately.
@ -841,7 +842,7 @@ public final class QuorumController implements Controller {
// Remember the latest offset and future if it is not already completed // Remember the latest offset and future if it is not already completed
if (!future.isDone()) { if (!future.isDone()) {
if (flags.contains(COMPLETES_IN_TRANSACTION)) { if (featureControl.inPreMigrationMode() && flags.contains(RUNS_IN_PREMIGRATION)) {
deferredUnstableEventQueue.add(resultAndOffset.offset(), this); deferredUnstableEventQueue.add(resultAndOffset.offset(), this);
} else { } else {
deferredEventQueue.add(resultAndOffset.offset(), this); deferredEventQueue.add(resultAndOffset.offset(), this);
@ -961,9 +962,7 @@ public final class QuorumController implements Controller {
} }
class MigrationRecordConsumer implements ZkRecordConsumer { class MigrationRecordConsumer implements ZkRecordConsumer {
private final EnumSet<ControllerOperationFlag> eventFlags = EnumSet.of( private final EnumSet<ControllerOperationFlag> eventFlags = EnumSet.of(RUNS_IN_PREMIGRATION);
RUNS_IN_PREMIGRATION, COMPLETES_IN_TRANSACTION
);
private volatile OffsetAndEpoch highestMigrationRecordOffset; private volatile OffsetAndEpoch highestMigrationRecordOffset;
@ -1313,7 +1312,7 @@ public final class QuorumController implements Controller {
rescheduleMaybeFenceStaleBrokers(); rescheduleMaybeFenceStaleBrokers();
return result; return result;
}, },
EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, RUNS_IN_PREMIGRATION)); EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME));
} }
private void cancelMaybeFenceReplicas() { private void cancelMaybeFenceReplicas() {

View File

@ -688,7 +688,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK); transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
} catch (Throwable t) { } catch (Throwable t) {
MigrationManifest partialManifest = manifestBuilder.build(); MigrationManifest partialManifest = manifestBuilder.build();
log.error("Aborting the metadata migration from ZooKeeper to KRaft. {}.", partialManifest); log.error("Aborting the metadata migration from ZooKeeper to KRaft. {}.", partialManifest, t);
zkRecordConsumer.abortMigration(); // This terminates the controller via fatal fault handler zkRecordConsumer.abortMigration(); // This terminates the controller via fatal fault handler
super.handleException(t); super.handleException(t);
} }

View File

@ -117,6 +117,7 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -1491,6 +1492,7 @@ public class QuorumControllerTest {
setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0). setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0).
setPartitionEpoch(0), (short) 0) setPartitionEpoch(0), (short) 0)
)); ));
@Test @Test
public void testFailoverDuringMigrationTransaction() throws Exception { public void testFailoverDuringMigrationTransaction() throws Exception {
try ( try (
@ -1534,4 +1536,62 @@ public class QuorumControllerTest {
} }
} }
@ParameterizedTest
@EnumSource(value = MetadataVersion.class, names = {"IBP_3_4_IV0", "IBP_3_5_IV0", "IBP_3_6_IV0", "IBP_3_6_IV1"})
public void testBrokerHeartbeatDuringMigration(MetadataVersion metadataVersion) throws Exception {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
) {
QuorumControllerTestEnv.Builder controlEnvBuilder = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder ->
controllerBuilder
.setZkMigrationEnabled(true)
.setMaxIdleIntervalNs(OptionalLong.of(TimeUnit.MILLISECONDS.toNanos(100)))
).
setBootstrapMetadata(BootstrapMetadata.fromVersion(metadataVersion, "test"));
QuorumControllerTestEnv controlEnv = controlEnvBuilder.build();
QuorumController active = controlEnv.activeController(true);
// Register a ZK broker
BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setBrokerId(0).
setRack(null).
setClusterId(active.clusterId()).
setIsMigratingZkBroker(true).
setFeatures(brokerFeatures(metadataVersion, metadataVersion)).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB0")).
setListeners(new ListenerCollection(Arrays.asList(new Listener().
setName("PLAINTEXT").setHost("localhost").
setPort(9092)).iterator()))).get();
// Start migration
ZkRecordConsumer migrationConsumer = active.zkRecordConsumer();
migrationConsumer.beginMigration().get(30, TimeUnit.SECONDS);
// Interleave migration batches with heartbeats. Ensure the heartbeat events use the correct
// offset when adding to the purgatory. Otherwise, we get errors like:
// There is already a deferred event with offset 292. We should not add one with an offset of 241 which is lower than that.
for (int i = 0; i < 100; i++) {
Uuid topicId = Uuid.randomUuid();
String topicName = "testBrokerHeartbeatDuringMigration" + i;
Future<?> migrationFuture = migrationConsumer.acceptBatch(
Arrays.asList(
new ApiMessageAndVersion(new TopicRecord().setTopicId(topicId).setName(topicName), (short) 0),
new ApiMessageAndVersion(new PartitionRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(0, 1, 2)), (short) 0)));
active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
setWantFence(false).setBrokerEpoch(reply.epoch()).setBrokerId(0).
setCurrentMetadataOffset(100000L + i));
migrationFuture.get();
}
// Ensure that we can complete a heartbeat even though we leave migration transaction hanging
assertEquals(new BrokerHeartbeatReply(true, false, false, false),
active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
setWantFence(false).setBrokerEpoch(reply.epoch()).setBrokerId(0).
setCurrentMetadataOffset(100100L)).get());
}
}
} }