KAFKA-14857: Fix some MetadataLoader bugs (#13462)

The MetadataLoader is not supposed to publish metadata updates until we have loaded up to the high
water mark. Previously, this logic was broken, and we published updates immediately. This PR fixes
that and adds a junit test.

Another issue is that the MetadataLoader previously assumed that we would periodically get
callbacks from the Raft layer even if nothing had happened. We relied on this to install new
publishers in a timely fashion, for example. However, in older MetadataVersions that don't include
NoOpRecord, this is not a safe assumption.

Aside from the above changes, also fix a deadlock in SnapshotGeneratorTest, fix the log prefix for
BrokerLifecycleManager, and remove metadata publishers on brokerserver shutdown (like we do for
controllers).

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
This commit is contained in:
Colin Patrick McCabe 2023-03-29 12:30:12 -07:00 committed by GitHub
parent 379b6978a0
commit 09e59bc776
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 157 additions and 83 deletions

View File

@ -138,6 +138,8 @@ class BrokerServer(
def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
val metadataPublishers: util.List[MetadataPublisher] = new util.ArrayList[MetadataPublisher]()
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = { private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
lock.lock() lock.lock()
try { try {
@ -406,7 +408,6 @@ class BrokerServer(
config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
DataPlaneAcceptor.ThreadPrefix) DataPlaneAcceptor.ThreadPrefix)
val publishers = new util.ArrayList[MetadataPublisher]()
brokerMetadataPublisher = new BrokerMetadataPublisher(config, brokerMetadataPublisher = new BrokerMetadataPublisher(config,
metadataCache, metadataCache,
logManager, logManager,
@ -431,7 +432,7 @@ class BrokerServer(
authorizer, authorizer,
sharedServer.initialBrokerMetadataLoadFaultHandler, sharedServer.initialBrokerMetadataLoadFaultHandler,
sharedServer.metadataPublishingFaultHandler) sharedServer.metadataPublishingFaultHandler)
publishers.add(brokerMetadataPublisher) metadataPublishers.add(brokerMetadataPublisher)
// Register parts of the broker that can be reconfigured via dynamic configs. This needs to // Register parts of the broker that can be reconfigured via dynamic configs. This needs to
// be done before we publish the dynamic configs, so that we don't miss anything. // be done before we publish the dynamic configs, so that we don't miss anything.
@ -440,7 +441,7 @@ class BrokerServer(
// Install all the metadata publishers. // Install all the metadata publishers.
FutureUtils.waitWithLogging(logger.underlying, logIdent, FutureUtils.waitWithLogging(logger.underlying, logIdent,
"the broker metadata publishers to be installed", "the broker metadata publishers to be installed",
sharedServer.loader.installPublishers(publishers), startupDeadline, time) sharedServer.loader.installPublishers(metadataPublishers), startupDeadline, time)
// Wait for this broker to contact the quorum, and for the active controller to acknowledge // Wait for this broker to contact the quorum, and for the active controller to acknowledge
// us as caught up. It will do this by returning a heartbeat response with isCaughtUp set to // us as caught up. It will do this by returning a heartbeat response with isCaughtUp set to
@ -537,14 +538,14 @@ class BrokerServer(
error("Got unexpected exception waiting for controlled shutdown future", e) error("Got unexpected exception waiting for controlled shutdown future", e)
} }
} }
lifecycleManager.beginShutdown() lifecycleManager.beginShutdown()
// Stop socket server to stop accepting any more connections and requests. // Stop socket server to stop accepting any more connections and requests.
// Socket server will be shutdown towards the end of the sequence. // Socket server will be shutdown towards the end of the sequence.
if (socketServer != null) { if (socketServer != null) {
CoreUtils.swallow(socketServer.stopProcessingRequests(), this) CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
} }
metadataPublishers.forEach(p => sharedServer.loader.removeAndClosePublisher(p).get())
metadataPublishers.clear()
if (dataPlaneRequestHandlerPool != null) if (dataPlaneRequestHandlerPool != null)
CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this) CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
if (dataPlaneRequestProcessor != null) if (dataPlaneRequestProcessor != null)

View File

@ -974,6 +974,26 @@ class KRaftClusterTest {
} }
} }
/**
* Test a single broker, single controller cluster at the minimum bootstrap level. This tests
* that we can function without having periodic NoOpRecords written.
*/
@Test
def testSingleControllerSingleBrokerCluster(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION).
setNumBrokerNodes(1).
setNumControllerNodes(1).build()).build()
try {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
} finally {
cluster.close()
}
}
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = Array(false, true)) @ValueSource(booleans = Array(false, true))
def testReconfigureControllerClientQuotas(combinedController: Boolean): Unit = { def testReconfigureControllerClientQuotas(combinedController: Boolean): Unit = {

View File

@ -42,6 +42,7 @@ import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.OptionalLong; import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier; import java.util.function.Supplier;
import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS;
@ -140,6 +141,8 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
} }
} }
private static final String INITIALIZE_NEW_PUBLISHERS = "InitializeNewPublishers";
/** /**
* The log4j logger for this loader. * The log4j logger for this loader.
*/ */
@ -166,7 +169,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
private final Supplier<OptionalLong> highWaterMarkAccessor; private final Supplier<OptionalLong> highWaterMarkAccessor;
/** /**
* Publishers which haven't been initialized yet. * Publishers which haven't received any metadata yet.
*/ */
private final LinkedHashMap<String, MetadataPublisher> uninitializedPublishers; private final LinkedHashMap<String, MetadataPublisher> uninitializedPublishers;
@ -176,9 +179,10 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
private final LinkedHashMap<String, MetadataPublisher> publishers; private final LinkedHashMap<String, MetadataPublisher> publishers;
/** /**
* True if we have caught up with the initial high water mark. * True if we have not caught up with the initial high water mark.
* We do not send out any metadata updates until this is true.
*/ */
private boolean catchingUp = false; private boolean catchingUp = true;
/** /**
* The current leader and epoch. * The current leader and epoch.
@ -212,38 +216,67 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
this.uninitializedPublishers = new LinkedHashMap<>(); this.uninitializedPublishers = new LinkedHashMap<>();
this.publishers = new LinkedHashMap<>(); this.publishers = new LinkedHashMap<>();
this.image = MetadataImage.EMPTY; this.image = MetadataImage.EMPTY;
this.eventQueue = new KafkaEventQueue(time, logContext, this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext,
threadNamePrefix + "metadata-loader-", threadNamePrefix + "metadata-loader-",
new ShutdownEvent()); new ShutdownEvent());
} }
private boolean stillNeedToCatchUp(long offset) { private boolean stillNeedToCatchUp(String where, long offset) {
if (!catchingUp) { if (!catchingUp) {
log.trace("We are not in the initial catching up state."); log.trace("{}: we are not in the initial catching up state.", where);
return false; return false;
} }
OptionalLong highWaterMark = highWaterMarkAccessor.get(); OptionalLong highWaterMark = highWaterMarkAccessor.get();
if (!highWaterMark.isPresent()) { if (!highWaterMark.isPresent()) {
log.info("The loader is still catching up because we still don't know the high " + log.info("{}: the loader is still catching up because we still don't know the high " +
"water mark yet."); "water mark yet.", where);
return true; return true;
} }
if (highWaterMark.getAsLong() > offset) { if (highWaterMark.getAsLong() - 1 > offset) {
log.info("The loader is still catching up because we have loaded up to offset " + log.info("{}: The loader is still catching up because we have loaded up to offset " +
offset + ", but the high water mark is " + highWaterMark.getAsLong()); offset + ", but the high water mark is {}", where, highWaterMark.getAsLong());
return true; return true;
} }
log.info("The loader finished catch up to the current high water mark of " + log.info("{}: The loader finished catching up to the current high water mark of {}",
highWaterMark.getAsLong()); where, highWaterMark.getAsLong());
catchingUp = true; catchingUp = false;
return false; return false;
} }
private void maybeInitializeNewPublishers() { /**
* Schedule an event to initialize the new publishers that are present in the system.
*
* @param delayNs The minimum time in nanoseconds we should wait. If there is already an
* initialization event scheduled, we will either move its deadline forward
* in time or leave it unchanged.
*/
void scheduleInitializeNewPublishers(long delayNs) {
eventQueue.scheduleDeferred(INITIALIZE_NEW_PUBLISHERS,
new EventQueue.EarliestDeadlineFunction(eventQueue.time().nanoseconds() + delayNs),
() -> {
try {
initializeNewPublishers();
} catch (Throwable e) {
faultHandler.handleFault("Unhandled error initializing new publishers", e);
}
});
}
void initializeNewPublishers() {
if (uninitializedPublishers.isEmpty()) { if (uninitializedPublishers.isEmpty()) {
log.trace("There are no uninitialized publishers to initialize."); log.debug("InitializeNewPublishers: nothing to do.");
return; return;
} }
if (stillNeedToCatchUp("initializeNewPublishers", image.highestOffsetAndEpoch().offset())) {
// Reschedule the initialization for later.
log.debug("InitializeNewPublishers: unable to initialize new publisher(s) {} " +
"because we are still catching up with quorum metadata. Rescheduling.",
uninitializedPublisherNames());
scheduleInitializeNewPublishers(TimeUnit.MILLISECONDS.toNanos(100));
return;
}
log.debug("InitializeNewPublishers: setting up snapshot image for new publisher(s): {}",
uninitializedPublisherNames());
long startNs = time.nanoseconds(); long startNs = time.nanoseconds();
MetadataDelta delta = new MetadataDelta.Builder(). MetadataDelta delta = new MetadataDelta.Builder().
setImage(image). setImage(image).
@ -260,19 +293,22 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
MetadataPublisher publisher = iter.next(); MetadataPublisher publisher = iter.next();
iter.remove(); iter.remove();
try { try {
log.info("Publishing initial snapshot at offset {} to {}", log.info("InitializeNewPublishers: initializing {} with a snapshot at offset {}",
image.highestOffsetAndEpoch().offset(), publisher.name()); publisher.name(), image.highestOffsetAndEpoch().offset());
publisher.onMetadataUpdate(delta, image, manifest); publisher.onMetadataUpdate(delta, image, manifest);
publisher.onControllerChange(currentLeaderAndEpoch); publisher.onControllerChange(currentLeaderAndEpoch);
publishers.put(publisher.name(), publisher); publishers.put(publisher.name(), publisher);
} catch (Throwable e) { } catch (Throwable e) {
faultHandler.handleFault("Unhandled error publishing the initial metadata " + faultHandler.handleFault("Unhandled error initializing " + publisher.name() +
"image from snapshot at offset " + image.highestOffsetAndEpoch().offset() + " with a snapshot at offset " + image.highestOffsetAndEpoch().offset(), e);
" with publisher " + publisher.name(), e);
} }
} }
} }
private String uninitializedPublisherNames() {
return String.join(", ", uninitializedPublishers.keySet());
}
@Override @Override
public void handleCommit(BatchReader<ApiMessageAndVersion> reader) { public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
eventQueue.append(() -> { eventQueue.append(() -> {
@ -282,7 +318,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
build(); build();
LogDeltaManifest manifest = loadLogDelta(delta, reader); LogDeltaManifest manifest = loadLogDelta(delta, reader);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Generated a metadata delta between {} and {} from {} batch(es) " + log.debug("handleCommit: Generated a metadata delta between {} and {} from {} batch(es) " +
"in {} us.", image.offset(), manifest.provenance().lastContainedOffset(), "in {} us.", image.offset(), manifest.provenance().lastContainedOffset(),
manifest.numBatches(), NANOSECONDS.toMicros(manifest.elapsedNs())); manifest.numBatches(), NANOSECONDS.toMicros(manifest.elapsedNs()));
} }
@ -294,10 +330,12 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
" and " + manifest.provenance().lastContainedOffset(), e); " and " + manifest.provenance().lastContainedOffset(), e);
return; return;
} }
if (stillNeedToCatchUp(manifest.provenance().lastContainedOffset())) { if (stillNeedToCatchUp("handleCommit", manifest.provenance().lastContainedOffset())) {
return; return;
} }
log.debug("Publishing new image with provenance {}.", image.provenance()); if (log.isDebugEnabled()) {
log.debug("handleCommit: publishing new image with provenance {}.", image.provenance());
}
for (MetadataPublisher publisher : publishers.values()) { for (MetadataPublisher publisher : publishers.values()) {
try { try {
publisher.onMetadataUpdate(delta, image, manifest); publisher.onMetadataUpdate(delta, image, manifest);
@ -307,8 +345,10 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
" with publisher " + publisher.name(), e); " with publisher " + publisher.name(), e);
} }
} }
maybeInitializeNewPublishers();
metrics.updateLastAppliedImageProvenance(image.provenance()); metrics.updateLastAppliedImageProvenance(image.provenance());
if (uninitializedPublishers.isEmpty()) {
scheduleInitializeNewPublishers(0);
}
} catch (Throwable e) { } catch (Throwable e) {
// This is a general catch-all block where we don't expect to end up; // This is a general catch-all block where we don't expect to end up;
// failure-prone operations should have individual try/catch blocks around them. // failure-prone operations should have individual try/catch blocks around them.
@ -379,11 +419,9 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
setImage(image). setImage(image).
build(); build();
SnapshotManifest manifest = loadSnapshot(delta, reader); SnapshotManifest manifest = loadSnapshot(delta, reader);
if (log.isDebugEnabled()) { log.info("handleSnapshot: generated a metadata delta from a snapshot at offset {} " +
log.debug("Generated a metadata delta from a snapshot at offset {} " +
"in {} us.", manifest.provenance().lastContainedOffset(), "in {} us.", manifest.provenance().lastContainedOffset(),
NANOSECONDS.toMicros(manifest.elapsedNs())); NANOSECONDS.toMicros(manifest.elapsedNs()));
}
try { try {
image = delta.apply(manifest.provenance()); image = delta.apply(manifest.provenance());
} catch (Throwable e) { } catch (Throwable e) {
@ -391,10 +429,10 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
"snapshot at offset " + reader.lastContainedLogOffset(), e); "snapshot at offset " + reader.lastContainedLogOffset(), e);
return; return;
} }
if (stillNeedToCatchUp(manifest.provenance().lastContainedOffset())) { if (stillNeedToCatchUp("handleSnapshot", manifest.provenance().lastContainedOffset())) {
return; return;
} }
log.debug("Publishing new snapshot image with provenance {}.", image.provenance()); log.info("handleSnapshot: publishing new snapshot image with provenance {}.", image.provenance());
for (MetadataPublisher publisher : publishers.values()) { for (MetadataPublisher publisher : publishers.values()) {
try { try {
publisher.onMetadataUpdate(delta, image, manifest); publisher.onMetadataUpdate(delta, image, manifest);
@ -404,8 +442,10 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
" with publisher " + publisher.name(), e); " with publisher " + publisher.name(), e);
} }
} }
maybeInitializeNewPublishers();
metrics.updateLastAppliedImageProvenance(image.provenance()); metrics.updateLastAppliedImageProvenance(image.provenance());
if (uninitializedPublishers.isEmpty()) {
scheduleInitializeNewPublishers(0);
}
} catch (Throwable e) { } catch (Throwable e) {
// This is a general catch-all block where we don't expect to end up; // This is a general catch-all block where we don't expect to end up;
// failure-prone operations should have individual try/catch blocks around them. // failure-prone operations should have individual try/catch blocks around them.
@ -480,20 +520,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> future = new CompletableFuture<>();
eventQueue.append(() -> { eventQueue.append(() -> {
try { try {
installNewPublishers(newPublishers); // Check that none of the publishers we are trying to install are already present.
future.complete(null);
} catch (Throwable e) {
future.completeExceptionally(faultHandler.handleFault("Unhandled fault in " +
"MetadataLoader#installPublishers", e));
}
});
return future;
}
void installNewPublishers(
List<? extends MetadataPublisher> newPublishers
) {
// Publishers can't be re-installed if they're already present.
for (MetadataPublisher newPublisher : newPublishers) { for (MetadataPublisher newPublisher : newPublishers) {
MetadataPublisher prev = publishers.get(newPublisher.name()); MetadataPublisher prev = publishers.get(newPublisher.name());
if (prev == null) { if (prev == null) {
@ -509,8 +536,20 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
"with that name."); "with that name.");
} }
} }
uninitializedPublishers.put(newPublisher.name(), newPublisher);
} }
// After installation, new publishers must be initialized by sending them a full
// snapshot of the current state. However, we cannot necessarily do that immediately,
// because the loader itself might not be ready. Therefore, we schedule a background
// task.
newPublishers.forEach(p -> uninitializedPublishers.put(p.name(), p));
scheduleInitializeNewPublishers(0);
future.complete(null);
} catch (Throwable e) {
future.completeExceptionally(faultHandler.handleFault("Unhandled fault in " +
"MetadataLoader#installPublishers", e));
}
});
return future;
} }
// VisibleForTesting // VisibleForTesting

View File

@ -42,7 +42,10 @@ import org.junit.jupiter.params.provider.CsvSource;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.OptionalLong; import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -71,12 +74,13 @@ public class MetadataLoaderTest {
} }
static class MockPublisher implements MetadataPublisher { static class MockPublisher implements MetadataPublisher {
final CompletableFuture<Void> firstPublish = new CompletableFuture<>();
private final String name; private final String name;
MetadataDelta latestDelta = null; volatile MetadataDelta latestDelta = null;
MetadataImage latestImage = null; volatile MetadataImage latestImage = null;
LogDeltaManifest latestLogDeltaManifest = null; volatile LogDeltaManifest latestLogDeltaManifest = null;
SnapshotManifest latestSnapshotManifest = null; volatile SnapshotManifest latestSnapshotManifest = null;
boolean closed = false; volatile boolean closed = false;
MockPublisher() { MockPublisher() {
this("MockPublisher"); this("MockPublisher");
@ -109,10 +113,12 @@ public class MetadataLoaderTest {
default: default:
throw new RuntimeException("Invalid manifest type " + manifest.type()); throw new RuntimeException("Invalid manifest type " + manifest.type());
} }
firstPublish.complete(null);
} }
@Override @Override
public void close() throws Exception { public void close() throws Exception {
firstPublish.completeExceptionally(new RejectedExecutionException());
closed = true; closed = true;
} }
} }
@ -262,7 +268,7 @@ public class MetadataLoaderTest {
new MockPublisher("c")); new MockPublisher("c"));
try (MetadataLoader loader = new MetadataLoader.Builder(). try (MetadataLoader loader = new MetadataLoader.Builder().
setFaultHandler(faultHandler). setFaultHandler(faultHandler).
setHighWaterMarkAccessor(() -> OptionalLong.of(0L)). setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
build()) { build()) {
loader.installPublishers(publishers.subList(0, 2)).get(); loader.installPublishers(publishers.subList(0, 2)).get();
loader.removeAndClosePublisher(publishers.get(1)).get(); loader.removeAndClosePublisher(publishers.get(1)).get();
@ -276,6 +282,7 @@ public class MetadataLoaderTest {
loader.handleSnapshot(snapshotReader); loader.handleSnapshot(snapshotReader);
loader.waitForAllEventsToBeHandled(); loader.waitForAllEventsToBeHandled();
assertTrue(snapshotReader.closed); assertTrue(snapshotReader.closed);
publishers.get(0).firstPublish.get(1, TimeUnit.MINUTES);
loader.removeAndClosePublisher(publishers.get(0)).get(); loader.removeAndClosePublisher(publishers.get(0)).get();
} }
assertTrue(publishers.get(0).closed); assertTrue(publishers.get(0).closed);
@ -302,6 +309,7 @@ public class MetadataLoaderTest {
setHighWaterMarkAccessor(() -> OptionalLong.of(0L)). setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
build()) { build()) {
loader.installPublishers(publishers).get(); loader.installPublishers(publishers).get();
publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
loadEmptySnapshot(loader, 200); loadEmptySnapshot(loader, 200);
assertEquals(200L, loader.lastAppliedOffset()); assertEquals(200L, loader.lastAppliedOffset());
loadEmptySnapshot(loader, 300); loadEmptySnapshot(loader, 300);
@ -396,10 +404,11 @@ public class MetadataLoaderTest {
try (MetadataLoader loader = new MetadataLoader.Builder(). try (MetadataLoader loader = new MetadataLoader.Builder().
setFaultHandler(faultHandler). setFaultHandler(faultHandler).
setTime(time). setTime(time).
setHighWaterMarkAccessor(() -> OptionalLong.of(0L)). setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
build()) { build()) {
loader.installPublishers(publishers).get(); loader.installPublishers(publishers).get();
loadTestSnapshot(loader, 200); loadTestSnapshot(loader, 200);
publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
MockBatchReader batchReader = new MockBatchReader(300, asList( MockBatchReader batchReader = new MockBatchReader(300, asList(
Batch.control(300, 100, 4000, 10, 400))). Batch.control(300, 100, 4000, 10, 400))).
setTime(time); setTime(time);
@ -427,7 +436,7 @@ public class MetadataLoaderTest {
new MockPublisher("b")); new MockPublisher("b"));
try (MetadataLoader loader = new MetadataLoader.Builder(). try (MetadataLoader loader = new MetadataLoader.Builder().
setFaultHandler(faultHandler). setFaultHandler(faultHandler).
setHighWaterMarkAccessor(() -> OptionalLong.of(0L)). setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).
build()) { build()) {
loader.installPublishers(publishers).get(); loader.installPublishers(publishers).get();
loader.handleSnapshot(MockSnapshotReader.fromRecordLists( loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
@ -439,6 +448,9 @@ public class MetadataLoaderTest {
setName("foo"). setName("foo").
setTopicId(Uuid.fromString("Uum7sfhHQP-obSvfywmNUA")), (short) 0)) setTopicId(Uuid.fromString("Uum7sfhHQP-obSvfywmNUA")), (short) 0))
))); )));
for (MockPublisher publisher : publishers) {
publisher.firstPublish.get(1, TimeUnit.MINUTES);
}
loader.waitForAllEventsToBeHandled(); loader.waitForAllEventsToBeHandled();
assertEquals(200L, loader.lastAppliedOffset()); assertEquals(200L, loader.lastAppliedOffset());
loader.handleCommit(new MockBatchReader(201, asList( loader.handleCommit(new MockBatchReader(201, asList(
@ -461,6 +473,7 @@ public class MetadataLoaderTest {
* Test that we do not leave the catchingUp state state until we have loaded up to the high * Test that we do not leave the catchingUp state state until we have loaded up to the high
* water mark. * water mark.
*/ */
@Test
public void testCatchingUpState() throws Exception { public void testCatchingUpState() throws Exception {
MockFaultHandler faultHandler = new MockFaultHandler("testLastAppliedOffset"); MockFaultHandler faultHandler = new MockFaultHandler("testLastAppliedOffset");
List<MockPublisher> publishers = asList(new MockPublisher("a"), List<MockPublisher> publishers = asList(new MockPublisher("a"),
@ -476,22 +489,18 @@ public class MetadataLoaderTest {
// We don't update lastAppliedOffset because we're still in catchingUp state due to // We don't update lastAppliedOffset because we're still in catchingUp state due to
// highWaterMark being OptionalLong.empty (aka unknown). // highWaterMark being OptionalLong.empty (aka unknown).
assertEquals(-1L, loader.lastAppliedOffset()); assertEquals(-1L, loader.lastAppliedOffset());
assertFalse(publishers.get(0).firstPublish.isDone());
// Setting the high water mark here doesn't do anything because we only check it when // This still doesn't advance lastAppliedOffset since the high water mark at 221
// we're publishing an update. This is OK because we know that we'll get updates
// frequently. If there is no other activity, there will at least be NoOpRecords.
highWaterMark.set(OptionalLong.of(0));
assertEquals(-1L, loader.lastAppliedOffset());
// This still doesn't advance lastAppliedOffset since the high water mark at 220
// is greater than our snapshot at 210. // is greater than our snapshot at 210.
highWaterMark.set(OptionalLong.of(220)); highWaterMark.set(OptionalLong.of(221));
loadTestSnapshot(loader, 210); loadTestSnapshot(loader, 210);
assertEquals(-1L, loader.lastAppliedOffset()); assertEquals(-1L, loader.lastAppliedOffset());
// Loading a test snapshot at 220 allows us to leave catchUp state. // Loading a test snapshot at 220 allows us to leave catchUp state.
loadTestSnapshot(loader, 220); loadTestSnapshot(loader, 220);
assertEquals(220L, loader.lastAppliedOffset()); assertEquals(220L, loader.lastAppliedOffset());
publishers.get(0).firstPublish.get(1, TimeUnit.MINUTES);
} }
faultHandler.maybeRethrowFirstException(); faultHandler.maybeRethrowFirstException();
} }

View File

@ -34,6 +34,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -46,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
public class SnapshotGeneratorTest { public class SnapshotGeneratorTest {
static class MockEmitter implements SnapshotGenerator.Emitter { static class MockEmitter implements SnapshotGenerator.Emitter {
private final CountDownLatch latch = new CountDownLatch(1); private final CountDownLatch latch = new CountDownLatch(1);
private final List<MetadataImage> images = new ArrayList<>(); private final List<MetadataImage> images = new CopyOnWriteArrayList<>();
private RuntimeException problem = null; private RuntimeException problem = null;
MockEmitter setReady() { MockEmitter setReady() {
@ -66,14 +67,14 @@ public class SnapshotGeneratorTest {
throw currentProblem; throw currentProblem;
} }
try { try {
latch.await(); latch.await(30, TimeUnit.SECONDS);
} catch (Throwable e) { } catch (Throwable e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
images.add(image); images.add(image);
} }
synchronized List<MetadataImage> images() { List<MetadataImage> images() {
return new ArrayList<>(images); return new ArrayList<>(images);
} }
} }

View File

@ -460,6 +460,10 @@ public final class KafkaEventQueue implements EventQueue {
this.eventHandlerThread.start(); this.eventHandlerThread.start();
} }
public Time time() {
return time;
}
@Override @Override
public void enqueue(EventInsertionType insertionType, public void enqueue(EventInsertionType insertionType,
String tag, String tag,