diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 60d2c0516da..c42eb474a6c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -394,16 +394,22 @@ public class Metadata implements Closeable { if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) { int newEpoch = partitionMetadata.leaderEpoch.get(); Integer currentEpoch = lastSeenLeaderEpochs.get(tp); - if (topicId != null && !topicId.equals(oldTopicId)) { + if (currentEpoch == null) { + // We have no previous info, so we can just insert the new epoch info + log.debug("Setting the last seen epoch of partition {} to {} since the last known epoch was undefined.", + tp, newEpoch); + lastSeenLeaderEpochs.put(tp, newEpoch); + return Optional.of(partitionMetadata); + } else if (topicId != null && !topicId.equals(oldTopicId)) { // If the new topic ID is valid and different from the last seen topic ID, update the metadata. // Between the time that a topic is deleted and re-created, the client may lose track of the // corresponding topicId (i.e. `oldTopicId` will be null). In this case, when we discover the new // topicId, we allow the corresponding leader epoch to override the last seen value. log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}", - tp, newEpoch, oldTopicId, topicId); + tp, newEpoch, oldTopicId, topicId); lastSeenLeaderEpochs.put(tp, newEpoch); return Optional.of(partitionMetadata); - } else if (currentEpoch == null || newEpoch >= currentEpoch) { + } else if (newEpoch >= currentEpoch) { // If the received leader epoch is at least the same as the previous one, update the metadata log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch); lastSeenLeaderEpochs.put(tp, newEpoch);