diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala index 346044c39e6..9d4682182a7 100644 --- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala +++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala @@ -64,7 +64,7 @@ class BrokerLifecycleManager( if (isZkBroker) { builder.append(" isZkBroker=true") } - builder.append("]") + builder.append("] ") builder.toString() } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 191e3c45f02..e3a657cd982 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -138,6 +138,8 @@ class BrokerServer( def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE + val metadataPublishers: util.List[MetadataPublisher] = new util.ArrayList[MetadataPublisher]() + private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = { lock.lock() try { @@ -406,7 +408,6 @@ class BrokerServer( config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", DataPlaneAcceptor.ThreadPrefix) - val publishers = new util.ArrayList[MetadataPublisher]() brokerMetadataPublisher = new BrokerMetadataPublisher(config, metadataCache, logManager, @@ -431,7 +432,7 @@ class BrokerServer( authorizer, sharedServer.initialBrokerMetadataLoadFaultHandler, sharedServer.metadataPublishingFaultHandler) - publishers.add(brokerMetadataPublisher) + metadataPublishers.add(brokerMetadataPublisher) // Register parts of the broker that can be reconfigured via dynamic configs. This needs to // be done before we publish the dynamic configs, so that we don't miss anything. @@ -440,7 +441,7 @@ class BrokerServer( // Install all the metadata publishers. FutureUtils.waitWithLogging(logger.underlying, logIdent, "the broker metadata publishers to be installed", - sharedServer.loader.installPublishers(publishers), startupDeadline, time) + sharedServer.loader.installPublishers(metadataPublishers), startupDeadline, time) // Wait for this broker to contact the quorum, and for the active controller to acknowledge // us as caught up. It will do this by returning a heartbeat response with isCaughtUp set to @@ -537,14 +538,14 @@ class BrokerServer( error("Got unexpected exception waiting for controlled shutdown future", e) } } - lifecycleManager.beginShutdown() - // Stop socket server to stop accepting any more connections and requests. // Socket server will be shutdown towards the end of the sequence. if (socketServer != null) { CoreUtils.swallow(socketServer.stopProcessingRequests(), this) } + metadataPublishers.forEach(p => sharedServer.loader.removeAndClosePublisher(p).get()) + metadataPublishers.clear() if (dataPlaneRequestHandlerPool != null) CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this) if (dataPlaneRequestProcessor != null) diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 34afeb7fc49..084ff43fcf1 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -974,6 +974,26 @@ class KRaftClusterTest { } } + /** + * Test a single broker, single controller cluster at the minimum bootstrap level. This tests + * that we can function without having periodic NoOpRecords written. + */ + @Test + def testSingleControllerSingleBrokerCluster(): Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setBootstrapMetadataVersion(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION). + setNumBrokerNodes(1). + setNumControllerNodes(1).build()).build() + try { + cluster.format() + cluster.startup() + cluster.waitForReadyBrokers() + } finally { + cluster.close() + } + } + @ParameterizedTest @ValueSource(booleans = Array(false, true)) def testReconfigureControllerClientQuotas(combinedController: Boolean): Unit = { diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java index ac34d1fa4a5..839ea9f3e93 100644 --- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java +++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java @@ -42,6 +42,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -140,6 +141,8 @@ public class MetadataLoader implements RaftClient.Listener } } + private static final String INITIALIZE_NEW_PUBLISHERS = "InitializeNewPublishers"; + /** * The log4j logger for this loader. */ @@ -166,7 +169,7 @@ public class MetadataLoader implements RaftClient.Listener private final Supplier highWaterMarkAccessor; /** - * Publishers which haven't been initialized yet. + * Publishers which haven't received any metadata yet. */ private final LinkedHashMap uninitializedPublishers; @@ -176,9 +179,10 @@ public class MetadataLoader implements RaftClient.Listener private final LinkedHashMap publishers; /** - * True if we have caught up with the initial high water mark. + * True if we have not caught up with the initial high water mark. + * We do not send out any metadata updates until this is true. */ - private boolean catchingUp = false; + private boolean catchingUp = true; /** * The current leader and epoch. @@ -212,38 +216,67 @@ public class MetadataLoader implements RaftClient.Listener this.uninitializedPublishers = new LinkedHashMap<>(); this.publishers = new LinkedHashMap<>(); this.image = MetadataImage.EMPTY; - this.eventQueue = new KafkaEventQueue(time, logContext, + this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix + "metadata-loader-", new ShutdownEvent()); } - private boolean stillNeedToCatchUp(long offset) { + private boolean stillNeedToCatchUp(String where, long offset) { if (!catchingUp) { - log.trace("We are not in the initial catching up state."); + log.trace("{}: we are not in the initial catching up state.", where); return false; } OptionalLong highWaterMark = highWaterMarkAccessor.get(); if (!highWaterMark.isPresent()) { - log.info("The loader is still catching up because we still don't know the high " + - "water mark yet."); + log.info("{}: the loader is still catching up because we still don't know the high " + + "water mark yet.", where); return true; } - if (highWaterMark.getAsLong() > offset) { - log.info("The loader is still catching up because we have loaded up to offset " + - offset + ", but the high water mark is " + highWaterMark.getAsLong()); + if (highWaterMark.getAsLong() - 1 > offset) { + log.info("{}: The loader is still catching up because we have loaded up to offset " + + offset + ", but the high water mark is {}", where, highWaterMark.getAsLong()); return true; } - log.info("The loader finished catch up to the current high water mark of " + - highWaterMark.getAsLong()); - catchingUp = true; + log.info("{}: The loader finished catching up to the current high water mark of {}", + where, highWaterMark.getAsLong()); + catchingUp = false; return false; } - private void maybeInitializeNewPublishers() { + /** + * Schedule an event to initialize the new publishers that are present in the system. + * + * @param delayNs The minimum time in nanoseconds we should wait. If there is already an + * initialization event scheduled, we will either move its deadline forward + * in time or leave it unchanged. + */ + void scheduleInitializeNewPublishers(long delayNs) { + eventQueue.scheduleDeferred(INITIALIZE_NEW_PUBLISHERS, + new EventQueue.EarliestDeadlineFunction(eventQueue.time().nanoseconds() + delayNs), + () -> { + try { + initializeNewPublishers(); + } catch (Throwable e) { + faultHandler.handleFault("Unhandled error initializing new publishers", e); + } + }); + } + + void initializeNewPublishers() { if (uninitializedPublishers.isEmpty()) { - log.trace("There are no uninitialized publishers to initialize."); + log.debug("InitializeNewPublishers: nothing to do."); return; } + if (stillNeedToCatchUp("initializeNewPublishers", image.highestOffsetAndEpoch().offset())) { + // Reschedule the initialization for later. + log.debug("InitializeNewPublishers: unable to initialize new publisher(s) {} " + + "because we are still catching up with quorum metadata. Rescheduling.", + uninitializedPublisherNames()); + scheduleInitializeNewPublishers(TimeUnit.MILLISECONDS.toNanos(100)); + return; + } + log.debug("InitializeNewPublishers: setting up snapshot image for new publisher(s): {}", + uninitializedPublisherNames()); long startNs = time.nanoseconds(); MetadataDelta delta = new MetadataDelta.Builder(). setImage(image). @@ -260,19 +293,22 @@ public class MetadataLoader implements RaftClient.Listener MetadataPublisher publisher = iter.next(); iter.remove(); try { - log.info("Publishing initial snapshot at offset {} to {}", - image.highestOffsetAndEpoch().offset(), publisher.name()); + log.info("InitializeNewPublishers: initializing {} with a snapshot at offset {}", + publisher.name(), image.highestOffsetAndEpoch().offset()); publisher.onMetadataUpdate(delta, image, manifest); publisher.onControllerChange(currentLeaderAndEpoch); publishers.put(publisher.name(), publisher); } catch (Throwable e) { - faultHandler.handleFault("Unhandled error publishing the initial metadata " + - "image from snapshot at offset " + image.highestOffsetAndEpoch().offset() + - " with publisher " + publisher.name(), e); + faultHandler.handleFault("Unhandled error initializing " + publisher.name() + + " with a snapshot at offset " + image.highestOffsetAndEpoch().offset(), e); } } } + private String uninitializedPublisherNames() { + return String.join(", ", uninitializedPublishers.keySet()); + } + @Override public void handleCommit(BatchReader reader) { eventQueue.append(() -> { @@ -282,7 +318,7 @@ public class MetadataLoader implements RaftClient.Listener build(); LogDeltaManifest manifest = loadLogDelta(delta, reader); if (log.isDebugEnabled()) { - log.debug("Generated a metadata delta between {} and {} from {} batch(es) " + + log.debug("handleCommit: Generated a metadata delta between {} and {} from {} batch(es) " + "in {} us.", image.offset(), manifest.provenance().lastContainedOffset(), manifest.numBatches(), NANOSECONDS.toMicros(manifest.elapsedNs())); } @@ -294,10 +330,12 @@ public class MetadataLoader implements RaftClient.Listener " and " + manifest.provenance().lastContainedOffset(), e); return; } - if (stillNeedToCatchUp(manifest.provenance().lastContainedOffset())) { + if (stillNeedToCatchUp("handleCommit", manifest.provenance().lastContainedOffset())) { return; } - log.debug("Publishing new image with provenance {}.", image.provenance()); + if (log.isDebugEnabled()) { + log.debug("handleCommit: publishing new image with provenance {}.", image.provenance()); + } for (MetadataPublisher publisher : publishers.values()) { try { publisher.onMetadataUpdate(delta, image, manifest); @@ -307,8 +345,10 @@ public class MetadataLoader implements RaftClient.Listener " with publisher " + publisher.name(), e); } } - maybeInitializeNewPublishers(); metrics.updateLastAppliedImageProvenance(image.provenance()); + if (uninitializedPublishers.isEmpty()) { + scheduleInitializeNewPublishers(0); + } } catch (Throwable e) { // This is a general catch-all block where we don't expect to end up; // failure-prone operations should have individual try/catch blocks around them. @@ -379,11 +419,9 @@ public class MetadataLoader implements RaftClient.Listener setImage(image). build(); SnapshotManifest manifest = loadSnapshot(delta, reader); - if (log.isDebugEnabled()) { - log.debug("Generated a metadata delta from a snapshot at offset {} " + - "in {} us.", manifest.provenance().lastContainedOffset(), - NANOSECONDS.toMicros(manifest.elapsedNs())); - } + log.info("handleSnapshot: generated a metadata delta from a snapshot at offset {} " + + "in {} us.", manifest.provenance().lastContainedOffset(), + NANOSECONDS.toMicros(manifest.elapsedNs())); try { image = delta.apply(manifest.provenance()); } catch (Throwable e) { @@ -391,10 +429,10 @@ public class MetadataLoader implements RaftClient.Listener "snapshot at offset " + reader.lastContainedLogOffset(), e); return; } - if (stillNeedToCatchUp(manifest.provenance().lastContainedOffset())) { + if (stillNeedToCatchUp("handleSnapshot", manifest.provenance().lastContainedOffset())) { return; } - log.debug("Publishing new snapshot image with provenance {}.", image.provenance()); + log.info("handleSnapshot: publishing new snapshot image with provenance {}.", image.provenance()); for (MetadataPublisher publisher : publishers.values()) { try { publisher.onMetadataUpdate(delta, image, manifest); @@ -404,8 +442,10 @@ public class MetadataLoader implements RaftClient.Listener " with publisher " + publisher.name(), e); } } - maybeInitializeNewPublishers(); metrics.updateLastAppliedImageProvenance(image.provenance()); + if (uninitializedPublishers.isEmpty()) { + scheduleInitializeNewPublishers(0); + } } catch (Throwable e) { // This is a general catch-all block where we don't expect to end up; // failure-prone operations should have individual try/catch blocks around them. @@ -480,7 +520,29 @@ public class MetadataLoader implements RaftClient.Listener CompletableFuture future = new CompletableFuture<>(); eventQueue.append(() -> { try { - installNewPublishers(newPublishers); + // Check that none of the publishers we are trying to install are already present. + for (MetadataPublisher newPublisher : newPublishers) { + MetadataPublisher prev = publishers.get(newPublisher.name()); + if (prev == null) { + prev = uninitializedPublishers.get(newPublisher.name()); + } + if (prev != null) { + if (prev == newPublisher) { + throw faultHandler.handleFault("Attempted to install publisher " + + newPublisher.name() + ", which is already installed."); + } else { + throw faultHandler.handleFault("Attempted to install a new publisher " + + "named " + newPublisher.name() + ", but there is already a publisher " + + "with that name."); + } + } + } + // After installation, new publishers must be initialized by sending them a full + // snapshot of the current state. However, we cannot necessarily do that immediately, + // because the loader itself might not be ready. Therefore, we schedule a background + // task. + newPublishers.forEach(p -> uninitializedPublishers.put(p.name(), p)); + scheduleInitializeNewPublishers(0); future.complete(null); } catch (Throwable e) { future.completeExceptionally(faultHandler.handleFault("Unhandled fault in " + @@ -490,29 +552,6 @@ public class MetadataLoader implements RaftClient.Listener return future; } - void installNewPublishers( - List newPublishers - ) { - // Publishers can't be re-installed if they're already present. - for (MetadataPublisher newPublisher : newPublishers) { - MetadataPublisher prev = publishers.get(newPublisher.name()); - if (prev == null) { - prev = uninitializedPublishers.get(newPublisher.name()); - } - if (prev != null) { - if (prev == newPublisher) { - throw faultHandler.handleFault("Attempted to install publisher " + - newPublisher.name() + ", which is already installed."); - } else { - throw faultHandler.handleFault("Attempted to install a new publisher " + - "named " + newPublisher.name() + ", but there is already a publisher " + - "with that name."); - } - } - uninitializedPublishers.put(newPublisher.name(), newPublisher); - } - } - // VisibleForTesting void waitForAllEventsToBeHandled() throws Exception { CompletableFuture future = new CompletableFuture<>(); diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java index b234d36a708..c7f651cf895 100644 --- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java @@ -42,7 +42,10 @@ import org.junit.jupiter.params.provider.CsvSource; import java.util.Iterator; import java.util.List; import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -71,12 +74,13 @@ public class MetadataLoaderTest { } static class MockPublisher implements MetadataPublisher { + final CompletableFuture firstPublish = new CompletableFuture<>(); private final String name; - MetadataDelta latestDelta = null; - MetadataImage latestImage = null; - LogDeltaManifest latestLogDeltaManifest = null; - SnapshotManifest latestSnapshotManifest = null; - boolean closed = false; + volatile MetadataDelta latestDelta = null; + volatile MetadataImage latestImage = null; + volatile LogDeltaManifest latestLogDeltaManifest = null; + volatile SnapshotManifest latestSnapshotManifest = null; + volatile boolean closed = false; MockPublisher() { this("MockPublisher"); @@ -109,10 +113,12 @@ public class MetadataLoaderTest { default: throw new RuntimeException("Invalid manifest type " + manifest.type()); } + firstPublish.complete(null); } @Override public void close() throws Exception { + firstPublish.completeExceptionally(new RejectedExecutionException()); closed = true; } } @@ -262,7 +268,7 @@ public class MetadataLoaderTest { new MockPublisher("c")); try (MetadataLoader loader = new MetadataLoader.Builder(). setFaultHandler(faultHandler). - setHighWaterMarkAccessor(() -> OptionalLong.of(0L)). + setHighWaterMarkAccessor(() -> OptionalLong.of(1L)). build()) { loader.installPublishers(publishers.subList(0, 2)).get(); loader.removeAndClosePublisher(publishers.get(1)).get(); @@ -276,6 +282,7 @@ public class MetadataLoaderTest { loader.handleSnapshot(snapshotReader); loader.waitForAllEventsToBeHandled(); assertTrue(snapshotReader.closed); + publishers.get(0).firstPublish.get(1, TimeUnit.MINUTES); loader.removeAndClosePublisher(publishers.get(0)).get(); } assertTrue(publishers.get(0).closed); @@ -302,6 +309,7 @@ public class MetadataLoaderTest { setHighWaterMarkAccessor(() -> OptionalLong.of(0L)). build()) { loader.installPublishers(publishers).get(); + publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS); loadEmptySnapshot(loader, 200); assertEquals(200L, loader.lastAppliedOffset()); loadEmptySnapshot(loader, 300); @@ -396,12 +404,13 @@ public class MetadataLoaderTest { try (MetadataLoader loader = new MetadataLoader.Builder(). setFaultHandler(faultHandler). setTime(time). - setHighWaterMarkAccessor(() -> OptionalLong.of(0L)). + setHighWaterMarkAccessor(() -> OptionalLong.of(1L)). build()) { loader.installPublishers(publishers).get(); loadTestSnapshot(loader, 200); + publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS); MockBatchReader batchReader = new MockBatchReader(300, asList( - Batch.control(300, 100, 4000, 10, 400))). + Batch.control(300, 100, 4000, 10, 400))). setTime(time); loader.handleCommit(batchReader); loader.waitForAllEventsToBeHandled(); @@ -427,7 +436,7 @@ public class MetadataLoaderTest { new MockPublisher("b")); try (MetadataLoader loader = new MetadataLoader.Builder(). setFaultHandler(faultHandler). - setHighWaterMarkAccessor(() -> OptionalLong.of(0L)). + setHighWaterMarkAccessor(() -> OptionalLong.of(1L)). build()) { loader.installPublishers(publishers).get(); loader.handleSnapshot(MockSnapshotReader.fromRecordLists( @@ -439,6 +448,9 @@ public class MetadataLoaderTest { setName("foo"). setTopicId(Uuid.fromString("Uum7sfhHQP-obSvfywmNUA")), (short) 0)) ))); + for (MockPublisher publisher : publishers) { + publisher.firstPublish.get(1, TimeUnit.MINUTES); + } loader.waitForAllEventsToBeHandled(); assertEquals(200L, loader.lastAppliedOffset()); loader.handleCommit(new MockBatchReader(201, asList( @@ -461,6 +473,7 @@ public class MetadataLoaderTest { * Test that we do not leave the catchingUp state state until we have loaded up to the high * water mark. */ + @Test public void testCatchingUpState() throws Exception { MockFaultHandler faultHandler = new MockFaultHandler("testLastAppliedOffset"); List publishers = asList(new MockPublisher("a"), @@ -476,22 +489,18 @@ public class MetadataLoaderTest { // We don't update lastAppliedOffset because we're still in catchingUp state due to // highWaterMark being OptionalLong.empty (aka unknown). assertEquals(-1L, loader.lastAppliedOffset()); + assertFalse(publishers.get(0).firstPublish.isDone()); - // Setting the high water mark here doesn't do anything because we only check it when - // we're publishing an update. This is OK because we know that we'll get updates - // frequently. If there is no other activity, there will at least be NoOpRecords. - highWaterMark.set(OptionalLong.of(0)); - assertEquals(-1L, loader.lastAppliedOffset()); - - // This still doesn't advance lastAppliedOffset since the high water mark at 220 + // This still doesn't advance lastAppliedOffset since the high water mark at 221 // is greater than our snapshot at 210. - highWaterMark.set(OptionalLong.of(220)); + highWaterMark.set(OptionalLong.of(221)); loadTestSnapshot(loader, 210); assertEquals(-1L, loader.lastAppliedOffset()); // Loading a test snapshot at 220 allows us to leave catchUp state. loadTestSnapshot(loader, 220); assertEquals(220L, loader.lastAppliedOffset()); + publishers.get(0).firstPublish.get(1, TimeUnit.MINUTES); } faultHandler.maybeRethrowFirstException(); } diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java index 47befabcaba..e03c4eb0bff 100644 --- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -46,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; public class SnapshotGeneratorTest { static class MockEmitter implements SnapshotGenerator.Emitter { private final CountDownLatch latch = new CountDownLatch(1); - private final List images = new ArrayList<>(); + private final List images = new CopyOnWriteArrayList<>(); private RuntimeException problem = null; MockEmitter setReady() { @@ -66,14 +67,14 @@ public class SnapshotGeneratorTest { throw currentProblem; } try { - latch.await(); + latch.await(30, TimeUnit.SECONDS); } catch (Throwable e) { throw new RuntimeException(e); } images.add(image); } - synchronized List images() { + List images() { return new ArrayList<>(images); } } diff --git a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java index 4e49a9a9153..70f859d9f89 100644 --- a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java +++ b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java @@ -460,6 +460,10 @@ public final class KafkaEventQueue implements EventQueue { this.eventHandlerThread.start(); } + public Time time() { + return time; + } + @Override public void enqueue(EventInsertionType insertionType, String tag,