diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 31b4957e3f5..4dac53ed292 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -285,6 +285,7 @@ class SharedServer( setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-"). setFaultHandler(metadataLoaderFaultHandler). setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()). + setLeaderAndEpochAccessor(() => _raftManager.client.leaderAndEpoch()). setMetrics(metadataLoaderMetrics) loader = loaderBuilder.build() snapshotEmitter = new SnapshotEmitter.Builder(). diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java index c6194f4e6dd..e967228a59e 100644 --- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java +++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java @@ -75,6 +75,7 @@ public class MetadataLoader implements RaftClient.Listener private FaultHandler faultHandler = (m, e) -> new FaultHandlerException(m, e); private MetadataLoaderMetrics metrics = null; private Supplier highWaterMarkAccessor = null; + private Supplier leaderAndEpochAccessor = () -> LeaderAndEpoch.UNKNOWN; public Builder setNodeId(int nodeId) { this.nodeId = nodeId; @@ -101,6 +102,11 @@ public class MetadataLoader implements RaftClient.Listener return this; } + public Builder setLeaderAndEpochAccessor(Supplier leaderAndEpochAccessor) { + this.leaderAndEpochAccessor = leaderAndEpochAccessor; + return this; + } + public Builder setMetrics(MetadataLoaderMetrics metrics) { this.metrics = metrics; return this; @@ -126,7 +132,8 @@ public class MetadataLoader implements RaftClient.Listener threadNamePrefix, faultHandler, metrics, - highWaterMarkAccessor); + highWaterMarkAccessor, + leaderAndEpochAccessor); } } @@ -157,6 +164,11 @@ public class MetadataLoader implements RaftClient.Listener */ private final Supplier highWaterMarkAccessor; + /** + * A function which supplies the current leader and epoch. + */ + private final Supplier leaderAndEpochAccessor; + /** * Publishers which haven't received any metadata yet. */ @@ -197,13 +209,15 @@ public class MetadataLoader implements RaftClient.Listener String threadNamePrefix, FaultHandler faultHandler, MetadataLoaderMetrics metrics, - Supplier highWaterMarkAccessor + Supplier highWaterMarkAccessor, + Supplier leaderAndEpochAccessor ) { this.log = logContext.logger(MetadataLoader.class); this.time = time; this.faultHandler = faultHandler; this.metrics = metrics; this.highWaterMarkAccessor = highWaterMarkAccessor; + this.leaderAndEpochAccessor = leaderAndEpochAccessor; this.uninitializedPublishers = new LinkedHashMap<>(); this.publishers = new LinkedHashMap<>(); this.image = MetadataImage.EMPTY; @@ -245,8 +259,9 @@ public class MetadataLoader implements RaftClient.Listener offset + " and high water mark is {}", where, highWaterMark.getAsLong()); return true; } - log.info("{}: The loader finished catching up to the current high water mark of {}", - where, highWaterMark.getAsLong()); + currentLeaderAndEpoch = leaderAndEpochAccessor.get(); + log.info("{}: The loader finished catching up to the current high water mark of {}, {}", + where, highWaterMark.getAsLong(), currentLeaderAndEpoch); catchingUp = false; return false; } diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java index 921c241a09a..ad771c7bfb3 100644 --- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java @@ -55,6 +55,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.OptionalInt; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -97,6 +98,7 @@ public class MetadataLoaderTest { volatile LogDeltaManifest latestLogDeltaManifest = null; volatile SnapshotManifest latestSnapshotManifest = null; volatile boolean closed = false; + volatile LeaderAndEpoch currentLeaderAndEpoch = LeaderAndEpoch.UNKNOWN; MockPublisher() { this("MockPublisher"); @@ -132,6 +134,11 @@ public class MetadataLoaderTest { firstPublish.complete(null); } + @Override + public void onControllerChange(LeaderAndEpoch newLeaderAndEpoch) { + currentLeaderAndEpoch = newLeaderAndEpoch; + } + @Override public void close() throws Exception { firstPublish.completeExceptionally(new RejectedExecutionException()); @@ -549,6 +556,7 @@ public class MetadataLoaderTest { try (MetadataLoader loader = new MetadataLoader.Builder(). setFaultHandler(faultHandler). setHighWaterMarkAccessor(() -> highWaterMark.get()). + setLeaderAndEpochAccessor(() -> new LeaderAndEpoch(OptionalInt.of(123), 456)). build()) { loader.installPublishers(publishers).get(); loadTestSnapshot(loader, 200); @@ -568,6 +576,8 @@ public class MetadataLoaderTest { loadTestSnapshot(loader, 220); assertEquals(220L, loader.lastAppliedOffset()); publishers.get(0).firstPublish.get(1, TimeUnit.MINUTES); + assertEquals(new LeaderAndEpoch(OptionalInt.of(123), 456), + publishers.get(0).currentLeaderAndEpoch); } faultHandler.maybeRethrowFirstException(); }