MINOR: update leaderAndEpoch before initializing metadata publishers

This commit is contained in:
Colin P. McCabe 2024-02-13 16:47:43 -08:00
parent 0bf830fc9c
commit b31840f8d7
3 changed files with 30 additions and 4 deletions

View File

@ -285,6 +285,7 @@ class SharedServer(
setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
setFaultHandler(metadataLoaderFaultHandler).
setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()).
setLeaderAndEpochAccessor(() => _raftManager.client.leaderAndEpoch()).
setMetrics(metadataLoaderMetrics)
loader = loaderBuilder.build()
snapshotEmitter = new SnapshotEmitter.Builder().

View File

@ -75,6 +75,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
private FaultHandler faultHandler = (m, e) -> new FaultHandlerException(m, e);
private MetadataLoaderMetrics metrics = null;
private Supplier<OptionalLong> highWaterMarkAccessor = null;
private Supplier<LeaderAndEpoch> leaderAndEpochAccessor = () -> LeaderAndEpoch.UNKNOWN;
public Builder setNodeId(int nodeId) {
this.nodeId = nodeId;
@ -101,6 +102,11 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
return this;
}
public Builder setLeaderAndEpochAccessor(Supplier<LeaderAndEpoch> leaderAndEpochAccessor) {
this.leaderAndEpochAccessor = leaderAndEpochAccessor;
return this;
}
public Builder setMetrics(MetadataLoaderMetrics metrics) {
this.metrics = metrics;
return this;
@ -126,7 +132,8 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
threadNamePrefix,
faultHandler,
metrics,
highWaterMarkAccessor);
highWaterMarkAccessor,
leaderAndEpochAccessor);
}
}
@ -157,6 +164,11 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
*/
private final Supplier<OptionalLong> highWaterMarkAccessor;
/**
* A function which supplies the current leader and epoch.
*/
private final Supplier<LeaderAndEpoch> leaderAndEpochAccessor;
/**
* Publishers which haven't received any metadata yet.
*/
@ -197,13 +209,15 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
String threadNamePrefix,
FaultHandler faultHandler,
MetadataLoaderMetrics metrics,
Supplier<OptionalLong> highWaterMarkAccessor
Supplier<OptionalLong> highWaterMarkAccessor,
Supplier<LeaderAndEpoch> leaderAndEpochAccessor
) {
this.log = logContext.logger(MetadataLoader.class);
this.time = time;
this.faultHandler = faultHandler;
this.metrics = metrics;
this.highWaterMarkAccessor = highWaterMarkAccessor;
this.leaderAndEpochAccessor = leaderAndEpochAccessor;
this.uninitializedPublishers = new LinkedHashMap<>();
this.publishers = new LinkedHashMap<>();
this.image = MetadataImage.EMPTY;
@ -245,8 +259,9 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
offset + " and high water mark is {}", where, highWaterMark.getAsLong());
return true;
}
log.info("{}: The loader finished catching up to the current high water mark of {}",
where, highWaterMark.getAsLong());
currentLeaderAndEpoch = leaderAndEpochAccessor.get();
log.info("{}: The loader finished catching up to the current high water mark of {}, {}",
where, highWaterMark.getAsLong(), currentLeaderAndEpoch);
catchingUp = false;
return false;
}

View File

@ -55,6 +55,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@ -97,6 +98,7 @@ public class MetadataLoaderTest {
volatile LogDeltaManifest latestLogDeltaManifest = null;
volatile SnapshotManifest latestSnapshotManifest = null;
volatile boolean closed = false;
volatile LeaderAndEpoch currentLeaderAndEpoch = LeaderAndEpoch.UNKNOWN;
MockPublisher() {
this("MockPublisher");
@ -132,6 +134,11 @@ public class MetadataLoaderTest {
firstPublish.complete(null);
}
@Override
public void onControllerChange(LeaderAndEpoch newLeaderAndEpoch) {
currentLeaderAndEpoch = newLeaderAndEpoch;
}
@Override
public void close() throws Exception {
firstPublish.completeExceptionally(new RejectedExecutionException());
@ -549,6 +556,7 @@ public class MetadataLoaderTest {
try (MetadataLoader loader = new MetadataLoader.Builder().
setFaultHandler(faultHandler).
setHighWaterMarkAccessor(() -> highWaterMark.get()).
setLeaderAndEpochAccessor(() -> new LeaderAndEpoch(OptionalInt.of(123), 456)).
build()) {
loader.installPublishers(publishers).get();
loadTestSnapshot(loader, 200);
@ -568,6 +576,8 @@ public class MetadataLoaderTest {
loadTestSnapshot(loader, 220);
assertEquals(220L, loader.lastAppliedOffset());
publishers.get(0).firstPublish.get(1, TimeUnit.MINUTES);
assertEquals(new LeaderAndEpoch(OptionalInt.of(123), 456),
publishers.get(0).currentLeaderAndEpoch);
}
faultHandler.maybeRethrowFirstException();
}