diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index dfd461a93ea..ef01b4b8a27 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthenticationException; @@ -314,6 +315,8 @@ public class Metadata implements Closeable { if (metadata.isInternal()) internalTopics.add(metadata.topic()); for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) { + + // Even if the partition's metadata includes an error, we need to handle the update to catch new epochs updatePartitionInfo(metadata.topic(), partitionMetadata, partitionInfo -> { int epoch = partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH); partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfo, epoch)); @@ -358,8 +361,8 @@ public class Metadata implements Closeable { } } } else { - // Old cluster format (no epochs) - lastSeenLeaderEpochs.clear(); + // Handle old cluster formats as well as error responses where leader and epoch are missing + lastSeenLeaderEpochs.remove(tp); partitionInfoConsumer.accept(MetadataResponse.partitionMetaToInfo(topic, partitionMetadata)); } } @@ -444,4 +447,55 @@ public class Metadata implements Closeable { } } + public synchronized LeaderAndEpoch leaderAndEpoch(TopicPartition tp) { + return partitionInfoIfCurrent(tp) + .map(infoAndEpoch -> { + Node leader = infoAndEpoch.partitionInfo().leader(); + return new LeaderAndEpoch(leader == null ? 
Node.noNode() : leader, Optional.of(infoAndEpoch.epoch())); + }) + .orElse(new LeaderAndEpoch(Node.noNode(), lastSeenLeaderEpoch(tp))); + } + + public static class LeaderAndEpoch { + + public static final LeaderAndEpoch NO_LEADER_OR_EPOCH = new LeaderAndEpoch(Node.noNode(), Optional.empty()); + + public final Node leader; + public final Optional epoch; + + public LeaderAndEpoch(Node leader, Optional epoch) { + this.leader = Objects.requireNonNull(leader); + this.epoch = Objects.requireNonNull(epoch); + } + + public static LeaderAndEpoch noLeaderOrEpoch() { + return NO_LEADER_OR_EPOCH; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + LeaderAndEpoch that = (LeaderAndEpoch) o; + + if (!leader.equals(that.leader)) return false; + return epoch.equals(that.epoch); + } + + @Override + public int hashCode() { + int result = leader.hashCode(); + result = 31 * result + epoch.hashCode(); + return result; + } + + @Override + public String toString() { + return "LeaderAndEpoch{" + + "leader=" + leader + + ", epoch=" + epoch.map(Number::toString).orElse("absent") + + '}'; + } + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 4cee56a2395..839057cc8b2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors; @@ -69,6 +70,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -1508,7 +1510,20 @@ public class KafkaConsumer implements Consumer { */ @Override public void seek(TopicPartition partition, long offset) { - seek(partition, new OffsetAndMetadata(offset, null)); + if (offset < 0) + throw new IllegalArgumentException("seek offset must not be a negative number"); + + acquireAndEnsureOpen(); + try { + log.info("Seeking to offset {} for partition {}", offset, partition); + SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition( + offset, + Optional.empty(), // This will ensure we skip validation + this.metadata.leaderAndEpoch(partition)); + this.subscriptions.seek(partition, newPosition); + } finally { + release(); + } } /** @@ -1535,8 +1550,13 @@ public class KafkaConsumer implements Consumer { } else { log.info("Seeking to offset {} for partition {}", offset, partition); } + Metadata.LeaderAndEpoch currentLeaderAndEpoch = this.metadata.leaderAndEpoch(partition); + SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition( + offsetAndMetadata.offset(), + offsetAndMetadata.leaderEpoch(), + currentLeaderAndEpoch); this.updateLastSeenEpochIfNewer(partition, offsetAndMetadata); - this.subscriptions.seek(partition, offset); + this.subscriptions.seekAndValidate(partition, newPosition); } finally { release(); } @@ -1658,9 +1678,9 @@ public 
class KafkaConsumer<K, V> implements Consumer<K, V> {
         Timer timer = time.timer(timeout);
         do {
-            Long offset = this.subscriptions.position(partition);
-            if (offset != null)
-                return offset;
+            SubscriptionState.FetchPosition position = this.subscriptions.validPosition(partition);
+            if (position != null)
+                return position.offset;
 
             updateFetchPositions(timer);
             client.poll(timer);
@@ -2196,6 +2216,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return true iff the operation completed without timing out
      */
     private boolean updateFetchPositions(final Timer timer) {
+        // If any partitions have been truncated due to a leader change, we need to validate the offsets
+        fetcher.validateOffsetsIfNeeded();
+
         cachedSubscriptionHashAllFetchPositions = subscriptions.hasAllFetchPositions();
         if (cachedSubscriptionHashAllFetchPositions) return true;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
new file mode 100644
index 00000000000..f8af50d5c90
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * In the event of unclean leader election, the log will be truncated,
+ * previously committed data will be lost, and new data will be written
+ * over these offsets. When this happens, the consumer will detect the
+ * truncation and raise this exception (if no automatic reset policy
+ * has been defined) with the first offset to diverge from what the
+ * consumer read.
+ */
+public class LogTruncationException extends OffsetOutOfRangeException {
+
+    private final Map<TopicPartition, OffsetAndMetadata> divergentOffsets;
+
+    public LogTruncationException(Map<TopicPartition, OffsetAndMetadata> divergentOffsets) {
+        super(Utils.transformMap(divergentOffsets, Function.identity(), OffsetAndMetadata::offset));
+        this.divergentOffsets = Collections.unmodifiableMap(divergentOffsets);
+    }
+
+    /**
+     * Get the offsets for the partitions which were truncated. This is the first offset which is known to diverge
+     * from what the consumer read.
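+     *
+     * A minimal sketch of one way a caller might react (illustrative only; the consumer
+     * variable and the rewind strategy are assumptions, not part of this patch):
+     * <pre>{@code
+     * try {
+     *     ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+     * } catch (LogTruncationException e) {
+     *     // no automatic reset policy is configured, so choose a new position explicitly,
+     *     // e.g. the first divergent offset the broker reported for each partition
+     *     e.divergentOffsets().forEach((tp, divergent) -> consumer.seek(tp, divergent.offset()));
+     * }
+     * }</pre>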
+ */ + public Map divergentOffsets() { + return divergentOffsets; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 614ec9b4457..c8c2e72af10 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -16,11 +16,13 @@ */ package org.apache.kafka.clients.consumer; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; @@ -189,13 +191,17 @@ public class MockConsumer implements Consumer { if (!subscriptions.isPaused(entry.getKey())) { final List> recs = entry.getValue(); for (final ConsumerRecord rec : recs) { - if (beginningOffsets.get(entry.getKey()) != null && beginningOffsets.get(entry.getKey()) > subscriptions.position(entry.getKey())) { - throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), subscriptions.position(entry.getKey()))); + long position = subscriptions.position(entry.getKey()).offset; + + if (beginningOffsets.get(entry.getKey()) != null && beginningOffsets.get(entry.getKey()) > position) { + throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), position)); } - if (assignment().contains(entry.getKey()) && rec.offset() >= subscriptions.position(entry.getKey())) { + if (assignment().contains(entry.getKey()) && rec.offset() >= position) { results.computeIfAbsent(entry.getKey(), partition -> new ArrayList<>()).add(rec); - subscriptions.position(entry.getKey(), rec.offset() + 1); + SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition( + rec.offset() + 1, rec.leaderEpoch(), new Metadata.LeaderAndEpoch(Node.noNode(), rec.leaderEpoch())); + subscriptions.position(entry.getKey(), newPosition); } } } @@ -290,12 +296,12 @@ public class MockConsumer implements Consumer { ensureNotClosed(); if (!this.subscriptions.isAssigned(partition)) throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer."); - Long offset = this.subscriptions.position(partition); - if (offset == null) { + SubscriptionState.FetchPosition position = this.subscriptions.position(partition); + if (position == null) { updateFetchPosition(partition); - offset = this.subscriptions.position(partition); + position = this.subscriptions.position(partition); } - return offset; + return position.offset; } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncClient.java new file mode 100644 index 00000000000..8b35499a26e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncClient.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public abstract class AsyncClient<T1, Req extends AbstractRequest, Resp extends AbstractResponse, T2> {
+
+    private final Logger log;
+    private final ConsumerNetworkClient client;
+
+    AsyncClient(ConsumerNetworkClient client, LogContext logContext) {
+        this.client = client;
+        this.log = logContext.logger(getClass());
+    }
+
+    public RequestFuture<T2> sendAsyncRequest(Node node, T1 requestData) {
+        AbstractRequest.Builder<Req> requestBuilder = prepareRequest(node, requestData);
+
+        return client.send(node, requestBuilder).compose(new RequestFutureAdapter<ClientResponse, T2>() {
+            @Override
+            @SuppressWarnings("unchecked")
+            public void onSuccess(ClientResponse value, RequestFuture<T2> future) {
+                Resp resp;
+                try {
+                    resp = (Resp) value.responseBody();
+                } catch (ClassCastException cce) {
+                    log.error("Could not cast response body", cce);
+                    future.raise(cce);
+                    return;
+                }
+                log.trace("Received {} {} from broker {}", resp.getClass().getSimpleName(), resp, node);
+                try {
+                    future.complete(handleResponse(node, requestData, resp));
+                } catch (RuntimeException e) {
+                    if (!future.isDone()) {
+                        future.raise(e);
+                    }
+                }
+            }
+
+            @Override
+            public void onFailure(RuntimeException e, RequestFuture<T2> future1) {
+                future1.raise(e);
+            }
+        });
+    }
+
+    protected Logger logger() {
+        return log;
+    }
+
+    protected abstract AbstractRequest.Builder<Req> prepareRequest(Node node, T1 requestData);
+
+    protected abstract T2 handleResponse(Node node, T1 requestData, Resp response);
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b31bf44ae7f..64a97ab5d02 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -60,6 +60,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -507,10 +508,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
             final TopicPartition tp = entry.getKey();
-            final long offset = entry.getValue().offset();
-            log.info("Setting offset for partition {} to the committed offset {}", tp, offset);
+            final OffsetAndMetadata offsetAndMetadata = entry.getValue();
+            final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(tp);
+            final SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition( + offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(), + new ConsumerMetadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.empty())); + + log.info("Setting offset for partition {} to the committed offset {}", tp, position); entry.getValue().leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch)); - this.subscriptions.seek(tp, offset); + this.subscriptions.seekAndValidate(tp, position); } return true; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 93d476fa38a..870a8b7e201 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -18,10 +18,13 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.FetchSessionHandler; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MetadataCache; import org.apache.kafka.clients.StaleMetadataException; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.LogTruncationException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -133,6 +136,8 @@ public class Fetcher implements Closeable { private final IsolationLevel isolationLevel; private final Map sessionHandlers; private final AtomicReference cachedListOffsetsException = new AtomicReference<>(); + private final AtomicReference cachedOffsetForLeaderException = new AtomicReference<>(); + private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient; private PartitionRecords nextInLineRecords = null; @@ -174,17 +179,18 @@ public class Fetcher implements Closeable { this.requestTimeoutMs = requestTimeoutMs; this.isolationLevel = isolationLevel; this.sessionHandlers = new HashMap<>(); + this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext); } /** * Represents data about an offset returned by a broker. */ - private static class OffsetData { + private static class ListOffsetData { final long offset; final Long timestamp; // null if the broker does not support returning timestamps final Optional leaderEpoch; // empty if the leader epoch is not known - OffsetData(long offset, Long timestamp, Optional leaderEpoch) { + ListOffsetData(long offset, Long timestamp, Optional leaderEpoch) { this.offset = offset; this.timestamp = timestamp; this.leaderEpoch = leaderEpoch; @@ -411,22 +417,45 @@ public class Fetcher implements Closeable { resetOffsetsAsync(offsetResetTimestamps); } + /** + * Validate offsets for all assigned partitions for which a leader change has been detected. 
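+     *
+     * A rough outline of what the method below does (conceptual sketch of this patch's flow,
+     * not an additional API):
+     * <pre>{@code
+     * for (TopicPartition tp : subscriptions.assignedPartitions())
+     *     subscriptions.maybeValidatePosition(tp, metadata.leaderAndEpoch(tp));
+     * // then: collect positions in AWAIT_VALIDATION (honoring the retry backoff) and
+     * // send OffsetsForLeaderEpoch requests via validateOffsetsAsync(...)
+     * }</pre>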
+ */ + public void validateOffsetsIfNeeded() { + RuntimeException exception = cachedOffsetForLeaderException.getAndSet(null); + if (exception != null) + throw exception; + + // Validate each partition against the current leader and epoch + subscriptions.assignedPartitions().forEach(topicPartition -> { + ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(topicPartition); + subscriptions.maybeValidatePosition(topicPartition, leaderAndEpoch); + }); + + // Collect positions needing validation, with backoff + Map partitionsToValidate = subscriptions + .partitionsNeedingValidation(time.milliseconds()) + .stream() + .collect(Collectors.toMap(Function.identity(), subscriptions::position)); + + validateOffsetsAsync(partitionsToValidate); + } + public Map offsetsForTimes(Map timestampsToSearch, Timer timer) { metadata.addTransientTopics(topicsForPartitions(timestampsToSearch.keySet())); try { - Map fetchedOffsets = fetchOffsetsByTimes(timestampsToSearch, + Map fetchedOffsets = fetchOffsetsByTimes(timestampsToSearch, timer, true).fetchedOffsets; HashMap offsetsByTimes = new HashMap<>(timestampsToSearch.size()); for (Map.Entry entry : timestampsToSearch.entrySet()) offsetsByTimes.put(entry.getKey(), null); - for (Map.Entry entry : fetchedOffsets.entrySet()) { + for (Map.Entry entry : fetchedOffsets.entrySet()) { // 'entry.getValue().timestamp' will not be null since we are guaranteed // to work with a v1 (or later) ListOffset request - OffsetData offsetData = entry.getValue(); + ListOffsetData offsetData = entry.getValue(); offsetsByTimes.put(entry.getKey(), new OffsetAndTimestamp(offsetData.offset, offsetData.timestamp, offsetData.leaderEpoch)); } @@ -570,14 +599,19 @@ public class Fetcher implements Closeable { log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition); } else { - long position = subscriptions.position(partitionRecords.partition); - if (partitionRecords.nextFetchOffset == position) { + SubscriptionState.FetchPosition position = subscriptions.position(partitionRecords.partition); + if (partitionRecords.nextFetchOffset == position.offset) { List> partRecords = partitionRecords.fetchRecords(maxRecords); - long nextOffset = partitionRecords.nextFetchOffset; - log.trace("Returning fetched records at offset {} for assigned partition {} and update " + - "position to {}", position, partitionRecords.partition, nextOffset); - subscriptions.position(partitionRecords.partition, nextOffset); + if (partitionRecords.nextFetchOffset > position.offset) { + SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition( + partitionRecords.nextFetchOffset, + partitionRecords.lastEpoch, + position.currentLeader); + log.trace("Returning fetched records at offset {} for assigned partition {} and update " + + "position to {}", position, partitionRecords.partition, nextPosition); + subscriptions.position(partitionRecords.partition, nextPosition); + } Long partitionLag = subscriptions.partitionLag(partitionRecords.partition, isolationLevel); if (partitionLag != null) @@ -601,7 +635,7 @@ public class Fetcher implements Closeable { return emptyList(); } - private void resetOffsetIfNeeded(TopicPartition partition, Long requestedResetTimestamp, OffsetData offsetData) { + private void resetOffsetIfNeeded(TopicPartition partition, Long requestedResetTimestamp, ListOffsetData offsetData) { // we might lose the assignment while fetching the offset, or the user might seek to a different offset, // so 
verify it is still assigned and still in need of the requested reset if (!subscriptions.isAssigned(partition)) { @@ -611,9 +645,11 @@ public class Fetcher implements Closeable { } else if (!requestedResetTimestamp.equals(offsetResetStrategyTimestamp(partition))) { log.debug("Skipping reset of partition {} since an alternative reset has been requested", partition); } else { - log.info("Resetting offset for partition {} to offset {}.", partition, offsetData.offset); + SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition( + offsetData.offset, offsetData.leaderEpoch, metadata.leaderAndEpoch(partition)); + log.info("Resetting offset for partition {} to offset {}.", partition, position); offsetData.leaderEpoch.ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(partition, epoch)); - subscriptions.seek(partition, offsetData.offset); + subscriptions.seek(partition, position); } } @@ -623,20 +659,20 @@ public class Fetcher implements Closeable { for (Map.Entry> entry : timestampsToSearchByNode.entrySet()) { Node node = entry.getKey(); final Map resetTimestamps = entry.getValue(); - subscriptions.setResetPending(resetTimestamps.keySet(), time.milliseconds() + requestTimeoutMs); + subscriptions.setNextAllowedRetry(resetTimestamps.keySet(), time.milliseconds() + requestTimeoutMs); RequestFuture future = sendListOffsetRequest(node, resetTimestamps, false); future.addListener(new RequestFutureListener() { @Override public void onSuccess(ListOffsetResult result) { if (!result.partitionsToRetry.isEmpty()) { - subscriptions.resetFailed(result.partitionsToRetry, time.milliseconds() + retryBackoffMs); + subscriptions.requestFailed(result.partitionsToRetry, time.milliseconds() + retryBackoffMs); metadata.requestUpdate(); } - for (Map.Entry fetchedOffset : result.fetchedOffsets.entrySet()) { + for (Map.Entry fetchedOffset : result.fetchedOffsets.entrySet()) { TopicPartition partition = fetchedOffset.getKey(); - OffsetData offsetData = fetchedOffset.getValue(); + ListOffsetData offsetData = fetchedOffset.getValue(); ListOffsetRequest.PartitionData requestedReset = resetTimestamps.get(partition); resetOffsetIfNeeded(partition, requestedReset.timestamp, offsetData); } @@ -644,7 +680,7 @@ public class Fetcher implements Closeable { @Override public void onFailure(RuntimeException e) { - subscriptions.resetFailed(resetTimestamps.keySet(), time.milliseconds() + retryBackoffMs); + subscriptions.requestFailed(resetTimestamps.keySet(), time.milliseconds() + retryBackoffMs); metadata.requestUpdate(); if (!(e instanceof RetriableException) && !cachedListOffsetsException.compareAndSet(null, e)) @@ -654,6 +690,90 @@ public class Fetcher implements Closeable { } } + /** + * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition + * with the epoch less than or equal to the epoch the partition last saw. + * + * Requests are grouped by Node for efficiency. 
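+     *
+     * Truncation is then detected per partition by comparing the end offset the leader reports
+     * for the last fetched epoch against the current position; in sketch form (names abbreviated
+     * from the response handler below):
+     * <pre>{@code
+     * if (endOffset.endOffset() < currentPosition.offset) {
+     *     // truncation: reset with the default policy, or surface a LogTruncationException
+     * } else {
+     *     subscriptions.validate(tp); // position confirmed, resume fetching
+     * }
+     * }</pre>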
+ */ + private void validateOffsetsAsync(Map partitionsToValidate) { + final Map> regrouped = + regroupFetchPositionsByLeader(partitionsToValidate); + + regrouped.forEach((node, dataMap) -> { + if (node.isEmpty()) { + metadata.requestUpdate(); + return; + } + + subscriptions.setNextAllowedRetry(dataMap.keySet(), time.milliseconds() + requestTimeoutMs); + + final Map cachedLeaderAndEpochs = partitionsToValidate.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().currentLeader)); + + RequestFuture future = offsetsForLeaderEpochClient.sendAsyncRequest(node, partitionsToValidate); + future.addListener(new RequestFutureListener() { + @Override + public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) { + Map truncationWithoutResetPolicy = new HashMap<>(); + if (!offsetsResult.partitionsToRetry().isEmpty()) { + subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs); + metadata.requestUpdate(); + } + + // For each OffsetsForLeader response, check if the end-offset is lower than our current offset + // for the partition. If so, it means we have experienced log truncation and need to reposition + // that partition's offset. + offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> { + if (!subscriptions.isAssigned(respTopicPartition)) { + log.debug("Ignoring OffsetsForLeader response for partition {} which is not currently assigned.", respTopicPartition); + return; + } + + if (subscriptions.awaitingValidation(respTopicPartition)) { + SubscriptionState.FetchPosition currentPosition = subscriptions.position(respTopicPartition); + Metadata.LeaderAndEpoch currentLeader = currentPosition.currentLeader; + if (!currentLeader.equals(cachedLeaderAndEpochs.get(respTopicPartition))) { + return; + } + + if (respEndOffset.endOffset() < currentPosition.offset) { + if (subscriptions.hasDefaultOffsetResetPolicy()) { + SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition( + respEndOffset.endOffset(), Optional.of(respEndOffset.leaderEpoch()), currentLeader); + log.info("Truncation detected for partition {}, resetting offset to {}", respTopicPartition, newPosition); + subscriptions.seek(respTopicPartition, newPosition); + } else { + log.warn("Truncation detected for partition {}, but no reset policy is set", respTopicPartition); + truncationWithoutResetPolicy.put(respTopicPartition, new OffsetAndMetadata( + respEndOffset.endOffset(), Optional.of(respEndOffset.leaderEpoch()), null)); + } + } else { + // Offset is fine, clear the validation state + subscriptions.validate(respTopicPartition); + } + } + }); + + if (!truncationWithoutResetPolicy.isEmpty()) { + throw new LogTruncationException(truncationWithoutResetPolicy); + } + } + + @Override + public void onFailure(RuntimeException e) { + subscriptions.requestFailed(dataMap.keySet(), time.milliseconds() + retryBackoffMs); + metadata.requestUpdate(); + + if (!(e instanceof RetriableException) && !cachedOffsetForLeaderException.compareAndSet(null, e)) { + log.error("Discarding error in OffsetsForLeaderEpoch because another error is pending", e); + } + } + }); + }); + } + /** * Search the offsets by target times for the specified partitions. 
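+     *
+     * This is the machinery underneath {@code KafkaConsumer#offsetsForTimes}; e.g. (topic name
+     * and timestamp are hypothetical):
+     * <pre>{@code
+     * Map<TopicPartition, Long> query =
+     *     Collections.singletonMap(new TopicPartition("my-topic", 0), 1546300800000L);
+     * Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
+     * }</pre>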
* @@ -671,7 +791,7 @@ public class Fetcher implements Closeable { return RequestFuture.failure(new StaleMetadataException()); final RequestFuture listOffsetRequestsFuture = new RequestFuture<>(); - final Map fetchedTimestampOffsets = new HashMap<>(); + final Map fetchedTimestampOffsets = new HashMap<>(); final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size()); for (Map.Entry> entry : timestampsToSearchByNode.entrySet()) { @@ -712,8 +832,9 @@ public class Fetcher implements Closeable { * that need metadata update or re-connect to the leader. */ private Map> groupListOffsetRequests( - Map timestampsToSearch, Set partitionsToRetry) { - final Map> timestampsToSearchByNode = new HashMap<>(); + Map timestampsToSearch, + Set partitionsToRetry) { + final Map partitionDataMap = new HashMap<>(); for (Map.Entry entry: timestampsToSearch.entrySet()) { TopicPartition tp = entry.getKey(); Optional currentInfo = metadata.partitionInfoIfCurrent(tp); @@ -735,15 +856,11 @@ public class Fetcher implements Closeable { currentInfo.get().partitionInfo().leader(), tp); partitionsToRetry.add(tp); } else { - Node node = currentInfo.get().partitionInfo().leader(); - Map topicData = - timestampsToSearchByNode.computeIfAbsent(node, n -> new HashMap<>()); - ListOffsetRequest.PartitionData partitionData = new ListOffsetRequest.PartitionData( - entry.getValue(), Optional.of(currentInfo.get().epoch())); - topicData.put(entry.getKey(), partitionData); + partitionDataMap.put(tp, + new ListOffsetRequest.PartitionData(entry.getValue(), Optional.of(currentInfo.get().epoch()))); } } - return timestampsToSearchByNode; + return regroupPartitionMapByNode(partitionDataMap); } /** @@ -788,7 +905,7 @@ public class Fetcher implements Closeable { private void handleListOffsetResponse(Map timestampsToSearch, ListOffsetResponse listOffsetResponse, RequestFuture future) { - Map fetchedOffsets = new HashMap<>(); + Map fetchedOffsets = new HashMap<>(); Set partitionsToRetry = new HashSet<>(); Set unauthorizedTopics = new HashSet<>(); @@ -812,7 +929,7 @@ public class Fetcher implements Closeable { log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}", topicPartition, offset); if (offset != ListOffsetResponse.UNKNOWN_OFFSET) { - OffsetData offsetData = new OffsetData(offset, null, Optional.empty()); + ListOffsetData offsetData = new ListOffsetData(offset, null, Optional.empty()); fetchedOffsets.put(topicPartition, offsetData); } } else { @@ -820,7 +937,7 @@ public class Fetcher implements Closeable { log.debug("Handling ListOffsetResponse response for {}. 
Fetched offset {}, timestamp {}", topicPartition, partitionData.offset, partitionData.timestamp); if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) { - OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp, + ListOffsetData offsetData = new ListOffsetData(partitionData.offset, partitionData.timestamp, partitionData.leaderEpoch); fetchedOffsets.put(topicPartition, offsetData); } @@ -861,11 +978,11 @@ public class Fetcher implements Closeable { future.complete(new ListOffsetResult(fetchedOffsets, partitionsToRetry)); } - private static class ListOffsetResult { - private final Map fetchedOffsets; + static class ListOffsetResult { + private final Map fetchedOffsets; private final Set partitionsToRetry; - public ListOffsetResult(Map fetchedOffsets, Set partitionsNeedingRetry) { + public ListOffsetResult(Map fetchedOffsets, Set partitionsNeedingRetry) { this.fetchedOffsets = fetchedOffsets; this.partitionsToRetry = partitionsNeedingRetry; } @@ -878,15 +995,13 @@ public class Fetcher implements Closeable { private List fetchablePartitions() { Set exclude = new HashSet<>(); - List fetchable = subscriptions.fetchablePartitions(); if (nextInLineRecords != null && !nextInLineRecords.isFetched) { exclude.add(nextInLineRecords.partition); } for (CompletedFetch completedFetch : completedFetches) { exclude.add(completedFetch.partition); } - fetchable.removeAll(exclude); - return fetchable; + return subscriptions.fetchablePartitions(tp -> !exclude.contains(tp)); } /** @@ -895,12 +1010,17 @@ public class Fetcher implements Closeable { */ private Map prepareFetchRequests() { Map fetchable = new LinkedHashMap<>(); + + // Ensure the position has an up-to-date leader + subscriptions.assignedPartitions().forEach( + tp -> subscriptions.maybeValidatePosition(tp, metadata.leaderAndEpoch(tp))); + for (TopicPartition partition : fetchablePartitions()) { - Node node = metadata.partitionInfoIfCurrent(partition) - .map(MetadataCache.PartitionInfoAndEpoch::partitionInfo) - .map(PartitionInfo::leader) - .orElse(null); - if (node == null) { + SubscriptionState.FetchPosition position = this.subscriptions.position(partition); + Metadata.LeaderAndEpoch leaderAndEpoch = position.currentLeader; + Node node = leaderAndEpoch.leader; + + if (node == null || node.isEmpty()) { metadata.requestUpdate(); } else if (client.isUnavailable(node)) { client.maybeThrowAuthFailure(node); @@ -912,26 +1032,26 @@ public class Fetcher implements Closeable { log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node); } else { // if there is a leader and no in-flight requests, issue a new fetch - FetchSessionHandler.Builder builder = fetchable.get(node); + FetchSessionHandler.Builder builder = fetchable.get(leaderAndEpoch.leader); if (builder == null) { - FetchSessionHandler handler = sessionHandler(node.id()); + int id = leaderAndEpoch.leader.id(); + FetchSessionHandler handler = sessionHandler(id); if (handler == null) { - handler = new FetchSessionHandler(logContext, node.id()); - sessionHandlers.put(node.id(), handler); + handler = new FetchSessionHandler(logContext, id); + sessionHandlers.put(id, handler); } builder = handler.newBuilder(); - fetchable.put(node, builder); + fetchable.put(leaderAndEpoch.leader, builder); } - long position = this.subscriptions.position(partition); - Optional leaderEpoch = this.metadata.lastSeenLeaderEpoch(partition); - builder.add(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET, - 
this.fetchSize, leaderEpoch)); + builder.add(partition, new FetchRequest.PartitionData(position.offset, + FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize, leaderAndEpoch.epoch)); - log.debug("Added {} fetch request for partition {} at offset {} to node {}", isolationLevel, - partition, position, node); + log.debug("Added {} fetch request for partition {} at position {} to node {}", isolationLevel, + partition, position, leaderAndEpoch.leader); } } + Map reqs = new LinkedHashMap<>(); for (Map.Entry entry : fetchable.entrySet()) { reqs.put(entry.getKey(), entry.getValue().build()); @@ -939,6 +1059,21 @@ public class Fetcher implements Closeable { return reqs; } + private Map> regroupFetchPositionsByLeader( + Map partitionMap) { + return partitionMap.entrySet() + .stream() + .collect(Collectors.groupingBy(entry -> entry.getValue().currentLeader.leader, + Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + } + + private Map> regroupPartitionMapByNode(Map partitionMap) { + return partitionMap.entrySet() + .stream() + .collect(Collectors.groupingBy(entry -> metadata.fetch().leaderFor(entry.getKey()), + Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + } + /** * The callback for fetch completion */ @@ -957,8 +1092,8 @@ public class Fetcher implements Closeable { } else if (error == Errors.NONE) { // we are interested in this fetch only if the beginning offset matches the // current consumed position - Long position = subscriptions.position(tp); - if (position == null || position != fetchOffset) { + SubscriptionState.FetchPosition position = subscriptions.position(tp); + if (position == null || position.offset != fetchOffset) { log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " + "the expected offset {}", tp, fetchOffset, position); return null; @@ -1011,7 +1146,7 @@ public class Fetcher implements Closeable { log.warn("Received unknown topic or partition error in fetch for partition {}", tp); this.metadata.requestUpdate(); } else if (error == Errors.OFFSET_OUT_OF_RANGE) { - if (fetchOffset != subscriptions.position(tp)) { + if (fetchOffset != subscriptions.position(tp).offset) { log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " + "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp)); } else if (subscriptions.hasDefaultOffsetResetPolicy()) { @@ -1138,6 +1273,7 @@ public class Fetcher implements Closeable { private Record lastRecord; private CloseableIterator records; private long nextFetchOffset; + private Optional lastEpoch; private boolean isFetched = false; private Exception cachedRecordException = null; private boolean corruptLastRecord = false; @@ -1149,6 +1285,7 @@ public class Fetcher implements Closeable { this.completedFetch = completedFetch; this.batches = batches; this.nextFetchOffset = completedFetch.fetchedOffset; + this.lastEpoch = Optional.empty(); this.abortedProducerIds = new HashSet<>(); this.abortedTransactions = abortedTransactions(completedFetch.partitionData); } @@ -1214,6 +1351,9 @@ public class Fetcher implements Closeable { } currentBatch = batches.next(); + lastEpoch = currentBatch.partitionLeaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ? 
+ Optional.empty() : Optional.of(currentBatch.partitionLeaderEpoch()); + maybeEnsureValid(currentBatch); if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java new file mode 100644 index 00000000000..9ffedd1947b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.EpochEndOffset; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.utils.LogContext; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Convenience class for making asynchronous requests to the OffsetsForLeaderEpoch API + */ +public class OffsetsForLeaderEpochClient extends AsyncClient< + Map, + OffsetsForLeaderEpochRequest, + OffsetsForLeaderEpochResponse, + OffsetsForLeaderEpochClient.OffsetForEpochResult> { + + OffsetsForLeaderEpochClient(ConsumerNetworkClient client, LogContext logContext) { + super(client, logContext); + } + + @Override + protected AbstractRequest.Builder prepareRequest( + Node node, Map requestData) { + Map partitionData = new HashMap<>(requestData.size()); + requestData.forEach((topicPartition, fetchPosition) -> fetchPosition.offsetEpoch.ifPresent( + fetchEpoch -> partitionData.put(topicPartition, + new OffsetsForLeaderEpochRequest.PartitionData(fetchPosition.currentLeader.epoch, fetchEpoch)))); + + return new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), partitionData); + } + + @Override + protected OffsetForEpochResult handleResponse( + Node node, + Map requestData, + OffsetsForLeaderEpochResponse response) { + + Set partitionsToRetry = new HashSet<>(); + Set unauthorizedTopics = new HashSet<>(); + Map endOffsets = new HashMap<>(); + + for (TopicPartition topicPartition : requestData.keySet()) { + EpochEndOffset epochEndOffset = response.responses().get(topicPartition); + if (epochEndOffset == null) { + logger().warn("Missing partition {} from response, ignoring", 
topicPartition); + partitionsToRetry.add(topicPartition); + continue; + } + Errors error = epochEndOffset.error(); + if (error == Errors.NONE) { + logger().debug("Handling OffsetsForLeaderEpoch response for {}. Got offset {} for epoch {}", + topicPartition, epochEndOffset.endOffset(), epochEndOffset.leaderEpoch()); + endOffsets.put(topicPartition, epochEndOffset); + } else if (error == Errors.NOT_LEADER_FOR_PARTITION || + error == Errors.REPLICA_NOT_AVAILABLE || + error == Errors.KAFKA_STORAGE_ERROR || + error == Errors.OFFSET_NOT_AVAILABLE || + error == Errors.LEADER_NOT_AVAILABLE) { + logger().debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.", + topicPartition, error); + partitionsToRetry.add(topicPartition); + } else if (error == Errors.FENCED_LEADER_EPOCH || + error == Errors.UNKNOWN_LEADER_EPOCH) { + logger().debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.", + topicPartition, error); + partitionsToRetry.add(topicPartition); + } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { + logger().warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition); + partitionsToRetry.add(topicPartition); + } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { + unauthorizedTopics.add(topicPartition.topic()); + } else { + logger().warn("Attempt to fetch offsets for partition {} failed due to: {}, retrying.", topicPartition, error.message()); + partitionsToRetry.add(topicPartition); + } + } + + if (!unauthorizedTopics.isEmpty()) + throw new TopicAuthorizationException(unauthorizedTopics); + else + return new OffsetForEpochResult(endOffsets, partitionsToRetry); + } + + public static class OffsetForEpochResult { + private final Map endOffsets; + private final Set partitionsToRetry; + + OffsetForEpochResult(Map endOffsets, Set partitionsNeedingRetry) { + this.endOffsets = endOffsets; + this.partitionsToRetry = partitionsNeedingRetry; + } + + public Map endOffsets() { + return endOffsets; + } + + public Set partitionsToRetry() { + return partitionsToRetry; + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index c28ac87afb1..39094216852 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -16,22 +16,27 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.PartitionStates; import org.apache.kafka.common.requests.IsolationLevel; import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; @@ -45,7 +50,7 @@ import 
java.util.stream.Collectors; * or with {@link #assignFromSubscribed(Collection)} (automatic assignment from subscription). * * Once assigned, the partition is not considered "fetchable" until its initial position has - * been set with {@link #seek(TopicPartition, long)}. Fetchable partitions track a fetch + * been set with {@link #seek(TopicPartition, FetchPosition)}. Fetchable partitions track a fetch * position which is used to set the offset of the next fetch, and a consumed position * which is the last offset that has been returned to the user. You can suspend fetching * from a partition through {@link #pause(TopicPartition)} without affecting the fetched/consumed @@ -326,8 +331,16 @@ public class SubscriptionState { return state; } + public void seek(TopicPartition tp, FetchPosition position) { + assignedState(tp).seek(position); + } + + public void seekAndValidate(TopicPartition tp, FetchPosition position) { + assignedState(tp).seekAndValidate(position); + } + public void seek(TopicPartition tp, long offset) { - assignedState(tp).seek(offset); + seek(tp, new FetchPosition(offset, Optional.empty(), new Metadata.LeaderAndEpoch(Node.noNode(), Optional.empty()))); } /** @@ -345,33 +358,52 @@ public class SubscriptionState { return this.assignment.size(); } - public List fetchablePartitions() { - return collectPartitions(TopicPartitionState::isFetchable, Collectors.toList()); + public List fetchablePartitions(Predicate isAvailable) { + return assignment.stream() + .filter(tpState -> isAvailable.test(tpState.topicPartition()) && tpState.value().isFetchable()) + .map(PartitionStates.PartitionState::topicPartition) + .collect(Collectors.toList()); } public boolean partitionsAutoAssigned() { return this.subscriptionType == SubscriptionType.AUTO_TOPICS || this.subscriptionType == SubscriptionType.AUTO_PATTERN; } - public void position(TopicPartition tp, long offset) { - assignedState(tp).position(offset); + public void position(TopicPartition tp, FetchPosition position) { + assignedState(tp).position(position); } - public Long position(TopicPartition tp) { - return assignedState(tp).position; + public boolean maybeValidatePosition(TopicPartition tp, Metadata.LeaderAndEpoch leaderAndEpoch) { + return assignedState(tp).maybeValidatePosition(leaderAndEpoch); + } + + public boolean awaitingValidation(TopicPartition tp) { + return assignedState(tp).awaitingValidation(); + } + + public void validate(TopicPartition tp) { + assignedState(tp).validate(); + } + + public FetchPosition validPosition(TopicPartition tp) { + return assignedState(tp).validPosition(); + } + + public FetchPosition position(TopicPartition tp) { + return assignedState(tp).position(); } public Long partitionLag(TopicPartition tp, IsolationLevel isolationLevel) { TopicPartitionState topicPartitionState = assignedState(tp); if (isolationLevel == IsolationLevel.READ_COMMITTED) - return topicPartitionState.lastStableOffset == null ? null : topicPartitionState.lastStableOffset - topicPartitionState.position; + return topicPartitionState.lastStableOffset == null ? null : topicPartitionState.lastStableOffset - topicPartitionState.position.offset; else - return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position; + return topicPartitionState.highWatermark == null ? 
null : topicPartitionState.highWatermark - topicPartitionState.position.offset; } public Long partitionLead(TopicPartition tp) { TopicPartitionState topicPartitionState = assignedState(tp); - return topicPartitionState.logStartOffset == null ? null : topicPartitionState.position - topicPartitionState.logStartOffset; + return topicPartitionState.logStartOffset == null ? null : topicPartitionState.position.offset - topicPartitionState.logStartOffset; } public void updateHighWatermark(TopicPartition tp, long highWatermark) { @@ -389,8 +421,10 @@ public class SubscriptionState { public Map allConsumed() { Map allConsumed = new HashMap<>(); assignment.stream().forEach(state -> { - if (state.value().hasValidPosition()) - allConsumed.put(state.topicPartition(), new OffsetAndMetadata(state.value().position)); + TopicPartitionState partitionState = state.value(); + if (partitionState.hasValidPosition()) + allConsumed.put(state.topicPartition(), new OffsetAndMetadata(partitionState.position.offset, + partitionState.position.offsetEpoch, "")); }); return allConsumed; } @@ -403,9 +437,9 @@ public class SubscriptionState { requestOffsetReset(partition, defaultResetStrategy); } - public void setResetPending(Set partitions, long nextAllowResetTimeMs) { + public void setNextAllowedRetry(Set partitions, long nextAllowResetTimeMs) { for (TopicPartition partition : partitions) { - assignedState(partition).setResetPending(nextAllowResetTimeMs); + assignedState(partition).setNextAllowedRetry(nextAllowResetTimeMs); } } @@ -426,7 +460,7 @@ public class SubscriptionState { } public Set missingFetchPositions() { - return collectPartitions(TopicPartitionState::isMissingPosition, Collectors.toSet()); + return collectPartitions(state -> !state.hasPosition(), Collectors.toSet()); } private > T collectPartitions(Predicate filter, Collector collector) { @@ -436,12 +470,13 @@ public class SubscriptionState { .collect(collector); } + public void resetMissingPositions() { final Set partitionsWithNoOffsets = new HashSet<>(); assignment.stream().forEach(state -> { TopicPartition tp = state.topicPartition(); TopicPartitionState partitionState = state.value(); - if (partitionState.isMissingPosition()) { + if (!partitionState.hasPosition()) { if (defaultResetStrategy == OffsetResetStrategy.NONE) partitionsWithNoOffsets.add(tp); else @@ -454,7 +489,12 @@ public class SubscriptionState { } public Set partitionsNeedingReset(long nowMs) { - return collectPartitions(state -> state.awaitingReset() && state.isResetAllowed(nowMs), + return collectPartitions(state -> state.awaitingReset() && !state.awaitingRetryBackoff(nowMs), + Collectors.toSet()); + } + + public Set partitionsNeedingValidation(long nowMs) { + return collectPartitions(state -> state.awaitingValidation() && !state.awaitingRetryBackoff(nowMs), Collectors.toSet()); } @@ -482,9 +522,9 @@ public class SubscriptionState { assignedState(tp).resume(); } - public void resetFailed(Set partitions, long nextRetryTimeMs) { + public void requestFailed(Set partitions, long nextRetryTimeMs) { for (TopicPartition partition : partitions) - assignedState(partition).resetFailed(nextRetryTimeMs); + assignedState(partition).requestFailed(nextRetryTimeMs); } public void movePartitionToEnd(TopicPartition tp) { @@ -503,68 +543,148 @@ public class SubscriptionState { } private static class TopicPartitionState { - private Long position; // last consumed position + + private FetchState fetchState; + private FetchPosition position; // last consumed position + private Long highWatermark; // the 
high watermark from last fetch private Long logStartOffset; // the log start offset private Long lastStableOffset; private boolean paused; // whether this partition has been paused by the user private OffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting - private Long nextAllowedRetryTimeMs; + private Long nextRetryTimeMs; + TopicPartitionState() { this.paused = false; + this.fetchState = FetchStates.INITIALIZING; this.position = null; this.highWatermark = null; this.logStartOffset = null; this.lastStableOffset = null; this.resetStrategy = null; - this.nextAllowedRetryTimeMs = null; + this.nextRetryTimeMs = null; + } + + private void transitionState(FetchState newState, Runnable runIfTransitioned) { + FetchState nextState = this.fetchState.transitionTo(newState); + if (nextState.equals(newState)) { + this.fetchState = nextState; + runIfTransitioned.run(); + } } private void reset(OffsetResetStrategy strategy) { - this.resetStrategy = strategy; - this.position = null; - this.nextAllowedRetryTimeMs = null; + transitionState(FetchStates.AWAIT_RESET, () -> { + this.resetStrategy = strategy; + this.nextRetryTimeMs = null; + }); } - private boolean isResetAllowed(long nowMs) { - return nextAllowedRetryTimeMs == null || nowMs >= nextAllowedRetryTimeMs; + private boolean maybeValidatePosition(Metadata.LeaderAndEpoch currentLeaderAndEpoch) { + if (this.fetchState.equals(FetchStates.AWAIT_RESET)) { + return false; + } + + if (currentLeaderAndEpoch.equals(Metadata.LeaderAndEpoch.noLeaderOrEpoch())) { + // Ignore empty LeaderAndEpochs + return false; + } + + if (position != null && !position.safeToFetchFrom(currentLeaderAndEpoch)) { + FetchPosition newPosition = new FetchPosition(position.offset, position.offsetEpoch, currentLeaderAndEpoch); + validatePosition(newPosition); + } + return this.fetchState.equals(FetchStates.AWAIT_VALIDATION); + } + + private void validatePosition(FetchPosition position) { + if (position.offsetEpoch.isPresent()) { + transitionState(FetchStates.AWAIT_VALIDATION, () -> { + this.position = position; + this.nextRetryTimeMs = null; + }); + } else { + // If we have no epoch information for the current position, then we can skip validation + transitionState(FetchStates.FETCHING, () -> { + this.position = position; + this.nextRetryTimeMs = null; + }); + } + } + + /** + * Clear the awaiting validation state and enter fetching. 
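+         *
+         * In the happy path this is the AWAIT_VALIDATION -> FETCHING transition, triggered once
+         * the broker confirms no truncation for the position's epoch, e.g.:
+         * <pre>{@code
+         * subscriptions.validate(tp); // no-op unless the partition currently has a position
+         * }</pre>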
+         */
+        private void validate() {
+            if (hasPosition()) {
+                transitionState(FetchStates.FETCHING, () -> {
+                    this.nextRetryTimeMs = null;
+                });
+            }
+        }
+
+        private boolean awaitingValidation() {
+            return fetchState.equals(FetchStates.AWAIT_VALIDATION);
+        }
+
+        private boolean awaitingRetryBackoff(long nowMs) {
+            return nextRetryTimeMs != null && nowMs < nextRetryTimeMs;
        }
 
        private boolean awaitingReset() {
-            return resetStrategy != null;
+            return fetchState.equals(FetchStates.AWAIT_RESET);
        }
 
-        private void setResetPending(long nextAllowedRetryTimeMs) {
-            this.nextAllowedRetryTimeMs = nextAllowedRetryTimeMs;
+        private void setNextAllowedRetry(long nextAllowedRetryTimeMs) {
+            this.nextRetryTimeMs = nextAllowedRetryTimeMs;
        }
 
-        private void resetFailed(long nextAllowedRetryTimeMs) {
-            this.nextAllowedRetryTimeMs = nextAllowedRetryTimeMs;
+        private void requestFailed(long nextAllowedRetryTimeMs) {
+            this.nextRetryTimeMs = nextAllowedRetryTimeMs;
        }
 
        private boolean hasValidPosition() {
-            return position != null;
+            return fetchState.hasValidPosition();
        }
 
-        private boolean isMissingPosition() {
-            return !hasValidPosition() && !awaitingReset();
+        private boolean hasPosition() {
+            return fetchState.hasPosition();
        }
 
        private boolean isPaused() {
            return paused;
        }
 
-        private void seek(long offset) {
-            this.position = offset;
-            this.resetStrategy = null;
-            this.nextAllowedRetryTimeMs = null;
+        private void seek(FetchPosition position) {
+            transitionState(FetchStates.FETCHING, () -> {
+                this.position = position;
+                this.resetStrategy = null;
+                this.nextRetryTimeMs = null;
+            });
        }
 
-        private void position(long offset) {
+        private void seekAndValidate(FetchPosition fetchPosition) {
+            seek(fetchPosition);
+            validatePosition(fetchPosition);
+        }
+
+        private void position(FetchPosition position) {
            if (!hasValidPosition())
                throw new IllegalStateException("Cannot set a new position without a valid current position");
-            this.position = offset;
+            this.position = position;
+        }
+
+        private FetchPosition validPosition() {
+            if (hasValidPosition()) {
+                return position;
+            } else {
+                return null;
+            }
+        }
+
+        private FetchPosition position() {
+            return position;
        }
 
        private void pause() {
@@ -581,5 +701,149 @@
    }
 
+    /**
+     * The fetch state of a partition. This class is used to determine valid state transitions and expose some of
+     * the behavior of the current fetch state. Actual state variables are stored in the {@link TopicPartitionState}.
+     */
+    interface FetchState {
+        default FetchState transitionTo(FetchState newState) {
+            if (validTransitions().contains(newState)) {
+                return newState;
+            } else {
+                return this;
+            }
+        }
+
+        Collection<FetchState> validTransitions();
+
+        boolean hasPosition();
+
+        boolean hasValidPosition();
+    }
+
+    /**
+     * An enumeration of all the possible fetch states. The state transitions are encoded in the values returned by
+     * {@link FetchState#validTransitions}.
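+     * Summarised (derived from the {@code validTransitions} implementations below):
+     * <pre>
+     * INITIALIZING     -> FETCHING, AWAIT_RESET, AWAIT_VALIDATION
+     * FETCHING         -> FETCHING, AWAIT_RESET, AWAIT_VALIDATION
+     * AWAIT_RESET      -> FETCHING, AWAIT_RESET
+     * AWAIT_VALIDATION -> FETCHING, AWAIT_RESET, AWAIT_VALIDATION
+     * </pre>
+     */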
+
+    /**
+     * An enumeration of all the possible fetch states. The state transitions are encoded in the values returned by
+     * {@link FetchState#validTransitions}.
+     */
+    enum FetchStates implements FetchState {
+        INITIALIZING() {
+            @Override
+            public Collection<FetchState> validTransitions() {
+                return Arrays.asList(FetchStates.FETCHING, FetchStates.AWAIT_RESET, FetchStates.AWAIT_VALIDATION);
+            }
+
+            @Override
+            public boolean hasPosition() {
+                return false;
+            }
+
+            @Override
+            public boolean hasValidPosition() {
+                return false;
+            }
+        },
+
+        FETCHING() {
+            @Override
+            public Collection<FetchState> validTransitions() {
+                return Arrays.asList(FetchStates.FETCHING, FetchStates.AWAIT_RESET, FetchStates.AWAIT_VALIDATION);
+            }
+
+            @Override
+            public boolean hasPosition() {
+                return true;
+            }
+
+            @Override
+            public boolean hasValidPosition() {
+                return true;
+            }
+        },
+
+        AWAIT_RESET() {
+            @Override
+            public Collection<FetchState> validTransitions() {
+                return Arrays.asList(FetchStates.FETCHING, FetchStates.AWAIT_RESET);
+            }
+
+            @Override
+            public boolean hasPosition() {
+                return true;
+            }
+
+            @Override
+            public boolean hasValidPosition() {
+                return false;
+            }
+        },
+
+        AWAIT_VALIDATION() {
+            @Override
+            public Collection<FetchState> validTransitions() {
+                return Arrays.asList(FetchStates.FETCHING, FetchStates.AWAIT_RESET, FetchStates.AWAIT_VALIDATION);
+            }
+
+            @Override
+            public boolean hasPosition() {
+                return true;
+            }
+
+            @Override
+            public boolean hasValidPosition() {
+                return false;
+            }
+        }
+    }
+
+    /**
+     * Represents the position of a partition subscription.
+     *
+     * This includes the offset and epoch from the last record in
+     * the batch from a FetchResponse. It also includes the leader epoch at the time the batch was consumed.
+     *
+     * The last fetch epoch is used to detect log truncation: during validation it is sent to the leader in an
+     * OffsetsForLeaderEpoch request, and the position is only considered valid if it does not exceed the end
+     * offset the leader returns for that epoch.
+     */
+    public static class FetchPosition {
+        public final long offset;
+        public final Optional<Integer> offsetEpoch;
+        public final Metadata.LeaderAndEpoch currentLeader;
+
+        public FetchPosition(long offset, Optional<Integer> offsetEpoch, Metadata.LeaderAndEpoch currentLeader) {
+            this.offset = offset;
+            this.offsetEpoch = Objects.requireNonNull(offsetEpoch);
+            this.currentLeader = Objects.requireNonNull(currentLeader);
+        }
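+
+        // Note: offsetEpoch is the leader epoch of the record batch this offset was read from
+        // (or stored with a committed offset), while currentLeader is the leader and epoch known
+        // from metadata when the position was created; comparing the two against the broker's
+        // view is what detects log truncation after a leader change.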
+
+        /**
+         * Test if it is "safe" to fetch from a given leader and epoch. This effectively tests whether the
+         * {@link Metadata.LeaderAndEpoch} known to the subscription is equal to the one supplied by the caller.
+         */
+        public boolean safeToFetchFrom(Metadata.LeaderAndEpoch leaderAndEpoch) {
+            return !currentLeader.leader.isEmpty() && currentLeader.equals(leaderAndEpoch);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            FetchPosition that = (FetchPosition) o;
+            return offset == that.offset &&
+                    offsetEpoch.equals(that.offsetEpoch) &&
+                    currentLeader.equals(that.currentLeader);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(offset, offsetEpoch, currentLeader);
+        }
+
+        @Override
+        public String toString() {
+            return "FetchPosition{" +
+                    "offset=" + offset +
+                    ", offsetEpoch=" + offsetEpoch +
+                    ", currentLeader=" + currentLeader +
+                    '}';
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 5d2a5cf63b7..b5f7ab2b9c8 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -1038,4 +1038,16 @@ public final class Utils {
         }
         return result;
     }
+
+    // Transform both the keys and the values of a map into a new map.
+    public static <K1, V1, K2, V2> Map<K2, V2> transformMap(
+            Map<K1, V1> map,
+            Function<K1, K2> keyMapper,
+            Function<V1, V2> valueMapper) {
+        return map.entrySet().stream().collect(
+            Collectors.toMap(
+                entry -> keyMapper.apply(entry.getKey()),
+                entry -> valueMapper.apply(entry.getValue())
+            )
+        );
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 39e8c3d4aa7..0e3d1912bfb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -35,7 +35,6 @@
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -205,9 +204,7 @@ public class MetadataTest {

         // First epoch seen, accept it
         {
-            MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts,
-                (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) ->
-                    new MetadataResponse.PartitionMetadata(error, partition, leader, Optional.of(100), replicas, isr, offlineReplicas));
+            MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 100);
             metadata.update(metadataResponse, 10L);
             assertNotNull(metadata.fetch().partition(tp));
             assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
@@ -215,9 +212,9 @@

         // Fake an empty ISR, but with an older epoch, should reject it
         {
-            MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts,
+            MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 99,
                 (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) ->
-                    new MetadataResponse.PartitionMetadata(error, partition, leader, Optional.of(99), replicas, Collections.emptyList(), offlineReplicas));
+                    new MetadataResponse.PartitionMetadata(error, partition, leader, leaderEpoch, replicas, Collections.emptyList(), offlineReplicas));
             metadata.update(metadataResponse, 20L);
             assertEquals(metadata.fetch().partition(tp).inSyncReplicas().length, 1);
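            // the rejected update (epoch 99) must leave the last-seen epoch unchanged as well: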
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100); @@ -225,9 +222,9 @@ public class MetadataTest { // Fake an empty ISR, with same epoch, accept it { - MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, + MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 100, (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) -> - new MetadataResponse.PartitionMetadata(error, partition, leader, Optional.of(100), replicas, Collections.emptyList(), offlineReplicas)); + new MetadataResponse.PartitionMetadata(error, partition, leader, leaderEpoch, replicas, Collections.emptyList(), offlineReplicas)); metadata.update(metadataResponse, 20L); assertEquals(metadata.fetch().partition(tp).inSyncReplicas().length, 0); assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100); @@ -235,8 +232,7 @@ public class MetadataTest { // Empty metadata response, should not keep old partition but should keep the last-seen epoch { - MetadataResponse metadataResponse = TestUtils.metadataUpdateWith( - "dummy", 1, Collections.emptyMap(), Collections.emptyMap(), MetadataResponse.PartitionMetadata::new); + MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.emptyMap()); metadata.update(metadataResponse, 20L); assertNull(metadata.fetch().partition(tp)); assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100); @@ -244,9 +240,7 @@ public class MetadataTest { // Back in the metadata, with old epoch, should not get added { - MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, - (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) -> - new MetadataResponse.PartitionMetadata(error, partition, leader, Optional.of(99), replicas, isr, offlineReplicas)); + MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 99); metadata.update(metadataResponse, 10L); assertNull(metadata.fetch().partition(tp)); assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100); @@ -284,9 +278,7 @@ public class MetadataTest { assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 99)); // Update epoch to 100 - MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, - (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) -> - new MetadataResponse.PartitionMetadata(error, partition, leader, Optional.of(100), replicas, isr, offlineReplicas)); + MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 100); metadata.update(metadataResponse, 10L); assertNotNull(metadata.fetch().partition(tp)); assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100); @@ -308,9 +300,7 @@ public class MetadataTest { assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101); // Metadata with equal or newer epoch is accepted - metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, - (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) -> - new MetadataResponse.PartitionMetadata(error, partition, leader, Optional.of(101), replicas, isr, offlineReplicas)); + metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, 
Collections.emptyMap(), partitionCounts, _tp -> 101); metadata.update(metadataResponse, 30L); assertNotNull(metadata.fetch().partition(tp)); assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5); @@ -321,9 +311,7 @@ public class MetadataTest { @Test public void testNoEpoch() { metadata.update(emptyMetadataResponse(), 0L); - MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), - (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) -> - new MetadataResponse.PartitionMetadata(error, partition, leader, Optional.empty(), replicas, isr, offlineReplicas)); + MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1)); metadata.update(metadataResponse, 10L); TopicPartition tp = new TopicPartition("topic-1", 0); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index e17a6efcd9f..f31ca525312 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -638,6 +638,26 @@ public class KafkaConsumerTest { assertEquals(50L, consumer.position(tp0)); } + @Test + public void testOffsetIsValidAfterSeek() { + Time time = new MockTime(); + SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST); + ConsumerMetadata metadata = createMetadata(subscription); + MockClient client = new MockClient(time, metadata); + + initMetadata(client, Collections.singletonMap(topic, 1)); + Node node = metadata.fetch().nodes().get(0); + + PartitionAssignor assignor = new RoundRobinAssignor(); + + KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, + true, groupId); + consumer.assign(singletonList(tp0)); + consumer.seek(tp0, 20L); + consumer.poll(Duration.ZERO); + assertEquals(subscription.validPosition(tp0).offset, 20L); + } + @Test public void testCommitsFetchedDuringAssign() { long offset1 = 10000; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 49bbcec206b..3cdbbfae004 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1722,7 +1722,31 @@ public class ConsumerCoordinatorTest { assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions()); assertTrue(subscriptions.hasAllFetchPositions()); - assertEquals(100L, subscriptions.position(t1p).longValue()); + assertEquals(100L, subscriptions.position(t1p).offset); + } + + @Test + public void testRefreshOffsetWithValidation() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + subscriptions.assignFromUser(singleton(t1p)); + + // Initial leader epoch of 4 + MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("kafka-cluster", 1, + Collections.emptyMap(), singletonMap(topic1, 1), tp -> 4); + client.updateMetadata(metadataResponse); + + // Load offsets from previous epoch + client.prepareResponse(offsetFetchResponse(t1p, 
Errors.NONE, "", 100L, 3)); + coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE)); + + // Offset gets loaded, but requires validation + assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions()); + assertFalse(subscriptions.hasAllFetchPositions()); + assertTrue(subscriptions.awaitingValidation(t1p)); + assertEquals(subscriptions.position(t1p).offset, 100L); + assertNull(subscriptions.validPosition(t1p)); } @Test @@ -1756,7 +1780,7 @@ public class ConsumerCoordinatorTest { assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions()); assertTrue(subscriptions.hasAllFetchPositions()); - assertEquals(100L, subscriptions.position(t1p).longValue()); + assertEquals(100L, subscriptions.position(t1p).offset); } @Test @@ -1797,7 +1821,7 @@ public class ConsumerCoordinatorTest { assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions()); assertTrue(subscriptions.hasAllFetchPositions()); - assertEquals(100L, subscriptions.position(t1p).longValue()); + assertEquals(100L, subscriptions.position(t1p).offset); } @Test @@ -1825,7 +1849,7 @@ public class ConsumerCoordinatorTest { assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions()); assertTrue(subscriptions.hasAllFetchPositions()); - assertEquals(500L, subscriptions.position(t1p).longValue()); + assertEquals(500L, subscriptions.position(t1p).offset); assertTrue(coordinator.coordinatorUnknown()); } @@ -2023,7 +2047,7 @@ public class ConsumerCoordinatorTest { time.sleep(autoCommitIntervalMs); // sleep for a while to ensure auto commit does happen coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds()); assertFalse(coordinator.coordinatorUnknown()); - assertEquals(100L, subscriptions.position(t1p).longValue()); + assertEquals(100L, subscriptions.position(t1p).offset); } private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement, @@ -2208,6 +2232,12 @@ public class ConsumerCoordinatorTest { return new OffsetFetchResponse(Errors.NONE, singletonMap(tp, data)); } + private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors partitionLevelError, String metadata, long offset, int epoch) { + OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, + Optional.of(epoch), metadata, partitionLevelError); + return new OffsetFetchResponse(Errors.NONE, singletonMap(tp, data)); + } + private OffsetCommitCallback callback(final AtomicBoolean success) { return new OffsetCommitCallback() { @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 68a467f40d3..6a0a4f3c2ee 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.FetchSessionHandler; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.NodeApiVersions; @@ -66,6 +67,7 @@ import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.AbstractRequest; import 
org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.EpochEndOffset; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.IsolationLevel; @@ -73,6 +75,7 @@ import org.apache.kafka.common.requests.ListOffsetRequest; import org.apache.kafka.common.requests.ListOffsetResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Deserializer; @@ -115,6 +118,7 @@ import java.util.stream.Collectors; import static java.util.Collections.singleton; import static java.util.Collections.singletonMap; import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID; +import static org.apache.kafka.test.TestUtils.assertOptional; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -206,7 +210,7 @@ public class FetcherTest { List> records = partitionRecords.get(tp0); assertEquals(3, records.size()); - assertEquals(4L, subscriptions.position(tp0).longValue()); // this is the next fetching position + assertEquals(4L, subscriptions.position(tp0).offset); // this is the next fetching position long offset = 1; for (ConsumerRecord record : records) { assertEquals(offset, record.offset()); @@ -370,7 +374,7 @@ public class FetcherTest { List> records = partitionRecords.get(tp0); assertEquals(1, records.size()); - assertEquals(2L, subscriptions.position(tp0).longValue()); + assertEquals(2L, subscriptions.position(tp0).offset); ConsumerRecord record = records.get(0); assertArrayEquals("key".getBytes(), record.key()); @@ -438,7 +442,7 @@ public class FetcherTest { fail("fetchedRecords should have raised"); } catch (SerializationException e) { // the position should not advance since no data has been returned - assertEquals(1, subscriptions.position(tp0).longValue()); + assertEquals(1, subscriptions.position(tp0).offset); } } } @@ -496,7 +500,7 @@ public class FetcherTest { // the first fetchedRecords() should return the first valid message assertEquals(1, fetcher.fetchedRecords().get(tp0).size()); - assertEquals(1, subscriptions.position(tp0).longValue()); + assertEquals(1, subscriptions.position(tp0).offset); ensureBlockOnRecord(1L); seekAndConsumeRecord(buffer, 2L); @@ -518,7 +522,7 @@ public class FetcherTest { fetcher.fetchedRecords(); fail("fetchedRecords should have raised KafkaException"); } catch (KafkaException e) { - assertEquals(blockedOffset, subscriptions.position(tp0).longValue()); + assertEquals(blockedOffset, subscriptions.position(tp0).offset); } } } @@ -536,7 +540,7 @@ public class FetcherTest { List> records = recordsByPartition.get(tp0); assertEquals(1, records.size()); assertEquals(toOffset, records.get(0).offset()); - assertEquals(toOffset + 1, subscriptions.position(tp0).longValue()); + assertEquals(toOffset + 1, subscriptions.position(tp0).offset); } @Test @@ -574,7 +578,7 @@ public class FetcherTest { fetcher.fetchedRecords(); fail("fetchedRecords should have raised KafkaException"); } catch (KafkaException e) { - assertEquals(0, subscriptions.position(tp0).longValue()); + assertEquals(0, subscriptions.position(tp0).offset); } } } @@ -604,7 +608,7 @@ 
public class FetcherTest { fail("fetchedRecords should have raised"); } catch (KafkaException e) { // the position should not advance since no data has been returned - assertEquals(0, subscriptions.position(tp0).longValue()); + assertEquals(0, subscriptions.position(tp0).offset); } } @@ -669,7 +673,7 @@ public class FetcherTest { Map>> recordsByPartition = fetchedRecords(); records = recordsByPartition.get(tp0); assertEquals(2, records.size()); - assertEquals(3L, subscriptions.position(tp0).longValue()); + assertEquals(3L, subscriptions.position(tp0).offset); assertEquals(1, records.get(0).offset()); assertEquals(2, records.get(1).offset()); @@ -678,7 +682,7 @@ public class FetcherTest { recordsByPartition = fetchedRecords(); records = recordsByPartition.get(tp0); assertEquals(1, records.size()); - assertEquals(4L, subscriptions.position(tp0).longValue()); + assertEquals(4L, subscriptions.position(tp0).offset); assertEquals(3, records.get(0).offset()); assertTrue(fetcher.sendFetches() > 0); @@ -686,7 +690,7 @@ public class FetcherTest { recordsByPartition = fetchedRecords(); records = recordsByPartition.get(tp0); assertEquals(2, records.size()); - assertEquals(6L, subscriptions.position(tp0).longValue()); + assertEquals(6L, subscriptions.position(tp0).offset); assertEquals(4, records.get(0).offset()); assertEquals(5, records.get(1).offset()); } @@ -712,7 +716,7 @@ public class FetcherTest { Map>> recordsByPartition = fetchedRecords(); records = recordsByPartition.get(tp0); assertEquals(2, records.size()); - assertEquals(3L, subscriptions.position(tp0).longValue()); + assertEquals(3L, subscriptions.position(tp0).offset); assertEquals(1, records.get(0).offset()); assertEquals(2, records.get(1).offset()); @@ -726,7 +730,7 @@ public class FetcherTest { assertNull(fetchedRecords.get(tp0)); records = fetchedRecords.get(tp1); assertEquals(2, records.size()); - assertEquals(6L, subscriptions.position(tp1).longValue()); + assertEquals(6L, subscriptions.position(tp1).offset); assertEquals(4, records.get(0).offset()); assertEquals(5, records.get(1).offset()); } @@ -755,7 +759,7 @@ public class FetcherTest { Map>> recordsByPartition = fetchedRecords(); consumerRecords = recordsByPartition.get(tp0); assertEquals(3, consumerRecords.size()); - assertEquals(31L, subscriptions.position(tp0).longValue()); // this is the next fetching position + assertEquals(31L, subscriptions.position(tp0).offset); // this is the next fetching position assertEquals(15L, consumerRecords.get(0).offset()); assertEquals(20L, consumerRecords.get(1).offset()); @@ -780,7 +784,7 @@ public class FetcherTest { } catch (RecordTooLargeException e) { assertTrue(e.getMessage().startsWith("There are some messages at [Partition=Offset]: ")); // the position should not advance since no data has been returned - assertEquals(0, subscriptions.position(tp0).longValue()); + assertEquals(0, subscriptions.position(tp0).offset); } } finally { client.setNodeApiVersions(NodeApiVersions.create()); @@ -803,7 +807,7 @@ public class FetcherTest { } catch (KafkaException e) { assertTrue(e.getMessage().startsWith("Failed to make progress reading messages")); // the position should not advance since no data has been returned - assertEquals(0, subscriptions.position(tp0).longValue()); + assertEquals(0, subscriptions.position(tp0).offset); } } @@ -944,15 +948,11 @@ public class FetcherTest { public void testEpochSetInFetchRequest() { buildFetcher(); subscriptions.assignFromUser(singleton(tp0)); - client.updateMetadata(initialUpdateResponse); - - // Metadata 
update with leader epochs - MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap(topicName, 4), - (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) -> - new MetadataResponse.PartitionMetadata(error, partition, leader, Optional.of(99), replicas, Collections.emptyList(), offlineReplicas)); + MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, + Collections.emptyMap(), Collections.singletonMap(topicName, 4), tp -> 99); client.updateMetadata(metadataResponse); - subscriptions.seek(tp0, 0); + subscriptions.seek(tp0, 10); assertEquals(1, fetcher.sendFetches()); // Check for epoch in outgoing request @@ -961,7 +961,7 @@ public class FetcherTest { FetchRequest fetchRequest = (FetchRequest) body; fetchRequest.fetchData().values().forEach(partitionData -> { assertTrue("Expected Fetcher to set leader epoch in request", partitionData.currentLeaderEpoch.isPresent()); - assertEquals("Expected leader epoch to match epoch from metadata update", partitionData.currentLeaderEpoch.get().longValue(), 99); + assertEquals("Expected leader epoch to match epoch from metadata update", 99, partitionData.currentLeaderEpoch.get().longValue()); }); return true; } else { @@ -984,7 +984,8 @@ public class FetcherTest { consumerClient.poll(time.timer(0)); assertEquals(0, fetcher.fetchedRecords().size()); assertTrue(subscriptions.isOffsetResetNeeded(tp0)); - assertEquals(null, subscriptions.position(tp0)); + assertNull(subscriptions.validPosition(tp0)); + assertNotNull(subscriptions.position(tp0)); } @Test @@ -1001,7 +1002,7 @@ public class FetcherTest { consumerClient.poll(time.timer(0)); assertEquals(0, fetcher.fetchedRecords().size()); assertFalse(subscriptions.isOffsetResetNeeded(tp0)); - assertEquals(1, subscriptions.position(tp0).longValue()); + assertEquals(1, subscriptions.position(tp0).offset); } @Test @@ -1065,8 +1066,8 @@ public class FetcherTest { List> allFetchedRecords = new ArrayList<>(); fetchRecordsInto(allFetchedRecords); - assertEquals(1, subscriptions.position(tp0).longValue()); - assertEquals(4, subscriptions.position(tp1).longValue()); + assertEquals(1, subscriptions.position(tp0).offset); + assertEquals(4, subscriptions.position(tp1).offset); assertEquals(3, allFetchedRecords.size()); OffsetOutOfRangeException e = assertThrows(OffsetOutOfRangeException.class, () -> @@ -1075,8 +1076,8 @@ public class FetcherTest { assertEquals(singleton(tp0), e.offsetOutOfRangePartitions().keySet()); assertEquals(1L, e.offsetOutOfRangePartitions().get(tp0).longValue()); - assertEquals(1, subscriptions.position(tp0).longValue()); - assertEquals(4, subscriptions.position(tp1).longValue()); + assertEquals(1, subscriptions.position(tp0).offset); + assertEquals(4, subscriptions.position(tp1).offset); assertEquals(3, allFetchedRecords.size()); } @@ -1118,8 +1119,8 @@ public class FetcherTest { for (List> records : recordsByPartition.values()) fetchedRecords.addAll(records); - assertEquals(fetchedRecords.size(), subscriptions.position(tp1) - 1); - assertEquals(4, subscriptions.position(tp1).longValue()); + assertEquals(fetchedRecords.size(), subscriptions.position(tp1).offset - 1); + assertEquals(4, subscriptions.position(tp1).offset); assertEquals(3, fetchedRecords.size()); List oorExceptions = new ArrayList<>(); @@ -1142,7 +1143,7 @@ public class FetcherTest { fetchedRecords.addAll(records); // Should not have received an Exception for tp2. 
- assertEquals(6, subscriptions.position(tp2).longValue()); + assertEquals(6, subscriptions.position(tp2).offset); assertEquals(5, fetchedRecords.size()); int numExceptionsExpected = 3; @@ -1206,7 +1207,7 @@ public class FetcherTest { // disconnects should have no affect on subscription state assertFalse(subscriptions.isOffsetResetNeeded(tp0)); assertTrue(subscriptions.isFetchable(tp0)); - assertEquals(0, subscriptions.position(tp0).longValue()); + assertEquals(0, subscriptions.position(tp0).offset); } @Test @@ -1218,7 +1219,7 @@ public class FetcherTest { fetcher.resetOffsetsIfNeeded(); assertFalse(client.hasInFlightRequests()); assertTrue(subscriptions.isFetchable(tp0)); - assertEquals(5, subscriptions.position(tp0).longValue()); + assertEquals(5, subscriptions.position(tp0).offset); } @Test @@ -1233,7 +1234,7 @@ public class FetcherTest { consumerClient.pollNoWakeup(); assertFalse(subscriptions.isOffsetResetNeeded(tp0)); assertTrue(subscriptions.isFetchable(tp0)); - assertEquals(5, subscriptions.position(tp0).longValue()); + assertEquals(5, subscriptions.position(tp0).offset); } @Test @@ -1250,7 +1251,7 @@ public class FetcherTest { consumerClient.pollNoWakeup(); assertFalse(subscriptions.isOffsetResetNeeded(tp0)); assertTrue(subscriptions.isFetchable(tp0)); - assertEquals(5, subscriptions.position(tp0).longValue()); + assertEquals(5, subscriptions.position(tp0).offset); } /** @@ -1290,7 +1291,7 @@ public class FetcherTest { assertTrue(subscriptions.hasValidPosition(tp0)); assertFalse(subscriptions.isOffsetResetNeeded(tp0)); assertTrue(subscriptions.isFetchable(tp0)); - assertEquals(subscriptions.position(tp0).longValue(), 5L); + assertEquals(subscriptions.position(tp0).offset, 5L); } @Test @@ -1319,7 +1320,7 @@ public class FetcherTest { assertFalse(subscriptions.isOffsetResetNeeded(tp0)); assertTrue(subscriptions.isFetchable(tp0)); - assertEquals(5, subscriptions.position(tp0).longValue()); + assertEquals(5, subscriptions.position(tp0).offset); } @Test @@ -1346,7 +1347,7 @@ public class FetcherTest { assertFalse(subscriptions.isOffsetResetNeeded(tp0)); assertTrue(subscriptions.isFetchable(tp0)); - assertEquals(5, subscriptions.position(tp0).longValue()); + assertEquals(5, subscriptions.position(tp0).offset); } @Test @@ -1362,7 +1363,7 @@ public class FetcherTest { assertFalse(subscriptions.isOffsetResetNeeded(tp0)); assertTrue(subscriptions.isFetchable(tp0)); - assertEquals(5, subscriptions.position(tp0).longValue()); + assertEquals(5, subscriptions.position(tp0).offset); } @Test @@ -1392,7 +1393,7 @@ public class FetcherTest { assertFalse(subscriptions.isOffsetResetNeeded(tp0)); assertTrue(subscriptions.isFetchable(tp0)); - assertEquals(5, subscriptions.position(tp0).longValue()); + assertEquals(5, subscriptions.position(tp0).offset); } @Test @@ -1428,7 +1429,7 @@ public class FetcherTest { assertFalse(subscriptions.isOffsetResetNeeded(tp0)); assertTrue(subscriptions.isFetchable(tp0)); - assertEquals(5, subscriptions.position(tp0).longValue()); + assertEquals(5, subscriptions.position(tp0).offset); } @Test @@ -1476,7 +1477,7 @@ public class FetcherTest { assertFalse(client.hasPendingResponses()); assertFalse(client.hasInFlightRequests()); - assertEquals(237L, subscriptions.position(tp0).longValue()); + assertEquals(237L, subscriptions.position(tp0).offset); } @Test @@ -1524,7 +1525,7 @@ public class FetcherTest { assertFalse(client.hasInFlightRequests()); assertFalse(subscriptions.isOffsetResetNeeded(tp0)); - assertEquals(5L, subscriptions.position(tp0).longValue()); + 
assertEquals(5L, subscriptions.position(tp0).offset); } @Test @@ -1562,7 +1563,7 @@ public class FetcherTest { assertFalse(subscriptions.isOffsetResetNeeded(tp0)); assertTrue(subscriptions.isFetchable(tp0)); - assertEquals(5, subscriptions.position(tp0).longValue()); + assertEquals(5, subscriptions.position(tp0).offset); } @Test @@ -1580,7 +1581,7 @@ public class FetcherTest { assertFalse(subscriptions.isOffsetResetNeeded(tp0)); assertFalse(subscriptions.isFetchable(tp0)); // because tp is paused assertTrue(subscriptions.hasValidPosition(tp0)); - assertEquals(10, subscriptions.position(tp0).longValue()); + assertEquals(10, subscriptions.position(tp0).offset); } @Test @@ -1610,7 +1611,7 @@ public class FetcherTest { assertFalse(subscriptions.isOffsetResetNeeded(tp0)); assertFalse(subscriptions.isFetchable(tp0)); // because tp is paused assertTrue(subscriptions.hasValidPosition(tp0)); - assertEquals(10, subscriptions.position(tp0).longValue()); + assertEquals(10, subscriptions.position(tp0).offset); } @Test @@ -2197,9 +2198,8 @@ public class FetcherTest { client.updateMetadata(initialUpdateResponse); // Metadata update with leader epochs - MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap(topicName, 4), - (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) -> - new MetadataResponse.PartitionMetadata(error, partition, leader, Optional.of(99), replicas, Collections.emptyList(), offlineReplicas)); + MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, + Collections.emptyMap(), Collections.singletonMap(topicName, 4), tp -> 99); client.updateMetadata(metadataResponse); // Request latest offset @@ -2571,7 +2571,7 @@ public class FetcherTest { } // The next offset should point to the next batch - assertEquals(4L, subscriptions.position(tp0).longValue()); + assertEquals(4L, subscriptions.position(tp0).offset); } @Test @@ -2602,7 +2602,7 @@ public class FetcherTest { assertTrue(allFetchedRecords.isEmpty()); // The next offset should point to the next batch - assertEquals(lastOffset + 1, subscriptions.position(tp0).longValue()); + assertEquals(lastOffset + 1, subscriptions.position(tp0).offset); } @Test @@ -2734,7 +2734,7 @@ public class FetcherTest { // Ensure that we don't return any of the aborted records, but yet advance the consumer position. assertFalse(fetchedRecords.containsKey(tp0)); - assertEquals(currentOffset, (long) subscriptions.position(tp0)); + assertEquals(currentOffset, subscriptions.position(tp0).offset); } @Test @@ -2743,8 +2743,8 @@ public class FetcherTest { List> records; assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1))); - subscriptions.seek(tp0, 0); - subscriptions.seek(tp1, 1); + subscriptions.seek(tp0, new SubscriptionState.FetchPosition(0, Optional.empty(), metadata.leaderAndEpoch(tp0))); + subscriptions.seek(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.leaderAndEpoch(tp1))); // Fetch some records and establish an incremental fetch session. 
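        // (the seeks above attach the current leader and epoch from metadata to each FetchPosition)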
LinkedHashMap> partitions1 = new LinkedHashMap<>(); @@ -2762,8 +2762,8 @@ public class FetcherTest { assertFalse(fetchedRecords.containsKey(tp1)); records = fetchedRecords.get(tp0); assertEquals(2, records.size()); - assertEquals(3L, subscriptions.position(tp0).longValue()); - assertEquals(1L, subscriptions.position(tp1).longValue()); + assertEquals(3L, subscriptions.position(tp0).offset); + assertEquals(1L, subscriptions.position(tp1).offset); assertEquals(1, records.get(0).offset()); assertEquals(2, records.get(1).offset()); @@ -2774,7 +2774,7 @@ public class FetcherTest { records = fetchedRecords.get(tp0); assertEquals(1, records.size()); assertEquals(3, records.get(0).offset()); - assertEquals(4L, subscriptions.position(tp0).longValue()); + assertEquals(4L, subscriptions.position(tp0).offset); // The second response contains no new records. LinkedHashMap> partitions2 = new LinkedHashMap<>(); @@ -2784,8 +2784,8 @@ public class FetcherTest { consumerClient.poll(time.timer(0)); fetchedRecords = fetchedRecords(); assertTrue(fetchedRecords.isEmpty()); - assertEquals(4L, subscriptions.position(tp0).longValue()); - assertEquals(1L, subscriptions.position(tp1).longValue()); + assertEquals(4L, subscriptions.position(tp0).offset); + assertEquals(1L, subscriptions.position(tp1).offset); // The third response contains some new records for tp0. LinkedHashMap> partitions3 = new LinkedHashMap<>(); @@ -2799,8 +2799,8 @@ public class FetcherTest { assertFalse(fetchedRecords.containsKey(tp1)); records = fetchedRecords.get(tp0); assertEquals(2, records.size()); - assertEquals(6L, subscriptions.position(tp0).longValue()); - assertEquals(1L, subscriptions.position(tp1).longValue()); + assertEquals(6L, subscriptions.position(tp0).offset); + assertEquals(1L, subscriptions.position(tp1).offset); assertEquals(4, records.get(0).offset()); assertEquals(5, records.get(1).offset()); } @@ -3098,6 +3098,171 @@ public class FetcherTest { assertNull(offsetAndTimestampMap.get(tp0)); } + @Test + public void testSubscriptionPositionUpdatedWithEpoch() { + // Create some records that include a leader epoch (1) + MemoryRecordsBuilder builder = MemoryRecords.builder( + ByteBuffer.allocate(1024), + RecordBatch.CURRENT_MAGIC_VALUE, + CompressionType.NONE, + TimestampType.CREATE_TIME, + 0L, + RecordBatch.NO_TIMESTAMP, + RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, + RecordBatch.NO_SEQUENCE, + false, + 1 + ); + builder.appendWithOffset(0L, 0L, "key".getBytes(), "value-1".getBytes()); + builder.appendWithOffset(1L, 0L, "key".getBytes(), "value-2".getBytes()); + builder.appendWithOffset(2L, 0L, "key".getBytes(), "value-3".getBytes()); + MemoryRecords records = builder.build(); + + buildFetcher(); + assignFromUser(singleton(tp0)); + + // Initialize the epoch=1 + Map partitionCounts = new HashMap<>(); + partitionCounts.put(tp0.topic(), 4); + MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 1); + metadata.update(metadataResponse, 0L); + + // Seek + subscriptions.seek(tp0, 0); + + // Do a normal fetch + assertEquals(1, fetcher.sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + + client.prepareResponse(fullFetchResponse(tp0, records, Errors.NONE, 100L, 0)); + consumerClient.pollNoWakeup(); + assertTrue(fetcher.hasCompletedFetches()); + + Map>> partitionRecords = fetchedRecords(); + assertTrue(partitionRecords.containsKey(tp0)); + + assertEquals(subscriptions.position(tp0).offset, 3L); + 
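+        // the position's offset epoch comes from the fetched batch, which was built with leader epoch 1: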
assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(value.intValue(), 1)); + } + + @Test + public void testOffsetValidationFencing() { + buildFetcher(); + assignFromUser(singleton(tp0)); + + Map partitionCounts = new HashMap<>(); + partitionCounts.put(tp0.topic(), 4); + + final int epochOne = 1; + final int epochTwo = 2; + final int epochThree = 3; + + // Start with metadata, epoch=1 + metadata.update(TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> epochOne), 0L); + + // Seek with a position and leader+epoch + Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne)); + subscriptions.seek(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch)); + + // Update metadata to epoch=2, enter validation + metadata.update(TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> epochTwo), 0L); + fetcher.validateOffsetsIfNeeded(); + assertTrue(subscriptions.awaitingValidation(tp0)); + + // Update the position to epoch=3, as we would from a fetch + subscriptions.validate(tp0); + SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition( + 10, + Optional.of(epochTwo), + new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochTwo))); + subscriptions.position(tp0, nextPosition); + subscriptions.maybeValidatePosition(tp0, new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochThree))); + + // Prepare offset list response from async validation with epoch=2 + Map endOffsetMap = new HashMap<>(); + endOffsetMap.put(tp0, new EpochEndOffset(Errors.NONE, epochTwo, 10L)); + OffsetsForLeaderEpochResponse resp = new OffsetsForLeaderEpochResponse(endOffsetMap); + client.prepareResponse(resp); + consumerClient.pollNoWakeup(); + assertTrue("Expected validation to fail since leader epoch changed", subscriptions.awaitingValidation(tp0)); + + // Next round of validation, should succeed in validating the position + fetcher.validateOffsetsIfNeeded(); + endOffsetMap.clear(); + endOffsetMap.put(tp0, new EpochEndOffset(Errors.NONE, epochThree, 10L)); + resp = new OffsetsForLeaderEpochResponse(endOffsetMap); + client.prepareResponse(resp); + consumerClient.pollNoWakeup(); + assertFalse("Expected validation to succeed with latest epoch", subscriptions.awaitingValidation(tp0)); + } + + @Test + public void testTruncationDetected() { + // Create some records that include a leader epoch (1) + MemoryRecordsBuilder builder = MemoryRecords.builder( + ByteBuffer.allocate(1024), + RecordBatch.CURRENT_MAGIC_VALUE, + CompressionType.NONE, + TimestampType.CREATE_TIME, + 0L, + RecordBatch.NO_TIMESTAMP, + RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, + RecordBatch.NO_SEQUENCE, + false, + 1 // record epoch is earlier than the leader epoch on the client + ); + builder.appendWithOffset(0L, 0L, "key".getBytes(), "value-1".getBytes()); + builder.appendWithOffset(1L, 0L, "key".getBytes(), "value-2".getBytes()); + builder.appendWithOffset(2L, 0L, "key".getBytes(), "value-3".getBytes()); + MemoryRecords records = builder.build(); + + buildFetcher(); + assignFromUser(singleton(tp0)); + + // Initialize the epoch=2 + Map partitionCounts = new HashMap<>(); + partitionCounts.put(tp0.topic(), 4); + MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 2); + metadata.update(metadataResponse, 0L); + + // Seek + 
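+        // with an offset epoch (1) behind the leader epoch from the metadata update (2),
+        // which forces the position through validation before any fetch is sent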
Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, Optional.of(1)); + subscriptions.seek(tp0, new SubscriptionState.FetchPosition(0, Optional.of(1), leaderAndEpoch)); + + // Check for truncation, this should cause tp0 to go into validation + fetcher.validateOffsetsIfNeeded(); + + // No fetches sent since we entered validation + assertEquals(0, fetcher.sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + assertTrue(subscriptions.awaitingValidation(tp0)); + + // Prepare OffsetForEpoch response then check that we update the subscription position correctly. + Map endOffsetMap = new HashMap<>(); + endOffsetMap.put(tp0, new EpochEndOffset(Errors.NONE, 1, 10L)); + OffsetsForLeaderEpochResponse resp = new OffsetsForLeaderEpochResponse(endOffsetMap); + client.prepareResponse(resp); + consumerClient.pollNoWakeup(); + + assertFalse(subscriptions.awaitingValidation(tp0)); + + // Fetch again, now it works + assertEquals(1, fetcher.sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + + client.prepareResponse(fullFetchResponse(tp0, records, Errors.NONE, 100L, 0)); + consumerClient.pollNoWakeup(); + assertTrue(fetcher.hasCompletedFetches()); + + Map>> partitionRecords = fetchedRecords(); + assertTrue(partitionRecords.containsKey(tp0)); + + assertEquals(subscriptions.position(tp0).offset, 3L); + assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(value.intValue(), 1)); + } + private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) { // matches any list offset request with the provided timestamp return new MockClient.RequestMatcher() { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java new file mode 100644 index 00000000000..ee00e484206 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.EpochEndOffset; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class OffsetForLeaderEpochClientTest { + + private ConsumerNetworkClient consumerClient; + private SubscriptionState subscriptions; + private Metadata metadata; + private MockClient client; + private Time time; + + private TopicPartition tp0 = new TopicPartition("topic", 0); + + @Test + public void testEmptyResponse() { + OffsetsForLeaderEpochClient offsetClient = newOffsetClient(); + RequestFuture future = + offsetClient.sendAsyncRequest(Node.noNode(), Collections.emptyMap()); + + OffsetsForLeaderEpochResponse resp = new OffsetsForLeaderEpochResponse(Collections.emptyMap()); + client.prepareResponse(resp); + consumerClient.pollNoWakeup(); + + OffsetsForLeaderEpochClient.OffsetForEpochResult result = future.value(); + assertTrue(result.partitionsToRetry().isEmpty()); + assertTrue(result.endOffsets().isEmpty()); + } + + @Test + public void testUnexpectedEmptyResponse() { + Map positionMap = new HashMap<>(); + positionMap.put(tp0, new SubscriptionState.FetchPosition(0, Optional.of(1), + new Metadata.LeaderAndEpoch(Node.noNode(), Optional.of(1)))); + + OffsetsForLeaderEpochClient offsetClient = newOffsetClient(); + RequestFuture future = + offsetClient.sendAsyncRequest(Node.noNode(), positionMap); + + OffsetsForLeaderEpochResponse resp = new OffsetsForLeaderEpochResponse(Collections.emptyMap()); + client.prepareResponse(resp); + consumerClient.pollNoWakeup(); + + OffsetsForLeaderEpochClient.OffsetForEpochResult result = future.value(); + assertFalse(result.partitionsToRetry().isEmpty()); + assertTrue(result.endOffsets().isEmpty()); + } + + @Test + public void testOkResponse() { + Map positionMap = new HashMap<>(); + positionMap.put(tp0, new SubscriptionState.FetchPosition(0, Optional.of(1), + new Metadata.LeaderAndEpoch(Node.noNode(), Optional.of(1)))); + + OffsetsForLeaderEpochClient offsetClient = newOffsetClient(); + RequestFuture future = + offsetClient.sendAsyncRequest(Node.noNode(), positionMap); + + Map endOffsetMap = new HashMap<>(); + endOffsetMap.put(tp0, new EpochEndOffset(Errors.NONE, 1, 10L)); + client.prepareResponse(new OffsetsForLeaderEpochResponse(endOffsetMap)); + consumerClient.pollNoWakeup(); + + OffsetsForLeaderEpochClient.OffsetForEpochResult result = future.value(); + assertTrue(result.partitionsToRetry().isEmpty()); + assertTrue(result.endOffsets().containsKey(tp0)); + assertEquals(result.endOffsets().get(tp0).error(), Errors.NONE); + assertEquals(result.endOffsets().get(tp0).leaderEpoch(), 1); + assertEquals(result.endOffsets().get(tp0).endOffset(), 
10L); + } + + @Test + public void testUnauthorizedTopic() { + Map positionMap = new HashMap<>(); + positionMap.put(tp0, new SubscriptionState.FetchPosition(0, Optional.of(1), + new Metadata.LeaderAndEpoch(Node.noNode(), Optional.of(1)))); + + OffsetsForLeaderEpochClient offsetClient = newOffsetClient(); + RequestFuture future = + offsetClient.sendAsyncRequest(Node.noNode(), positionMap); + + Map endOffsetMap = new HashMap<>(); + endOffsetMap.put(tp0, new EpochEndOffset(Errors.TOPIC_AUTHORIZATION_FAILED, -1, -1)); + client.prepareResponse(new OffsetsForLeaderEpochResponse(endOffsetMap)); + consumerClient.pollNoWakeup(); + + assertTrue(future.failed()); + assertEquals(future.exception().getClass(), TopicAuthorizationException.class); + assertTrue(((TopicAuthorizationException) future.exception()).unauthorizedTopics().contains(tp0.topic())); + } + + @Test + public void testRetriableError() { + Map positionMap = new HashMap<>(); + positionMap.put(tp0, new SubscriptionState.FetchPosition(0, Optional.of(1), + new Metadata.LeaderAndEpoch(Node.noNode(), Optional.of(1)))); + + OffsetsForLeaderEpochClient offsetClient = newOffsetClient(); + RequestFuture future = + offsetClient.sendAsyncRequest(Node.noNode(), positionMap); + + Map endOffsetMap = new HashMap<>(); + endOffsetMap.put(tp0, new EpochEndOffset(Errors.LEADER_NOT_AVAILABLE, -1, -1)); + client.prepareResponse(new OffsetsForLeaderEpochResponse(endOffsetMap)); + consumerClient.pollNoWakeup(); + + assertFalse(future.failed()); + OffsetsForLeaderEpochClient.OffsetForEpochResult result = future.value(); + assertTrue(result.partitionsToRetry().contains(tp0)); + assertFalse(result.endOffsets().containsKey(tp0)); + } + + private OffsetsForLeaderEpochClient newOffsetClient() { + buildDependencies(OffsetResetStrategy.EARLIEST); + return new OffsetsForLeaderEpochClient(consumerClient, new LogContext()); + } + + private void buildDependencies(OffsetResetStrategy offsetResetStrategy) { + LogContext logContext = new LogContext(); + time = new MockTime(1); + subscriptions = new SubscriptionState(logContext, offsetResetStrategy); + metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false, + subscriptions, logContext, new ClusterResourceListeners()); + client = new MockClient(time, metadata); + consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time, + 100, 1000, Integer.MAX_VALUE); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 5d4d1137fee..701866d1539 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -16,8 +16,10 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; @@ -27,12 +29,14 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import java.util.regex.Pattern; import static java.util.Collections.singleton; import static org.junit.Assert.assertEquals; 
import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class SubscriptionStateTest { @@ -46,6 +50,7 @@ public class SubscriptionStateTest { private final TopicPartition tp1 = new TopicPartition(topic, 1); private final TopicPartition t1p0 = new TopicPartition(topic1, 0); private final MockRebalanceListener rebalanceListener = new MockRebalanceListener(); + private final Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Node.noNode(), Optional.empty()); @Test public void partitionAssignment() { @@ -55,7 +60,7 @@ public class SubscriptionStateTest { assertFalse(state.hasAllFetchPositions()); state.seek(tp0, 1); assertTrue(state.isFetchable(tp0)); - assertEquals(1L, state.position(tp0).longValue()); + assertEquals(1L, state.position(tp0).offset); state.assignFromUser(Collections.emptySet()); assertTrue(state.assignedPartitions().isEmpty()); assertEquals(0, state.numAssignedPartitions()); @@ -167,11 +172,11 @@ public class SubscriptionStateTest { public void partitionReset() { state.assignFromUser(singleton(tp0)); state.seek(tp0, 5); - assertEquals(5L, (long) state.position(tp0)); + assertEquals(5L, state.position(tp0).offset); state.requestOffsetReset(tp0); assertFalse(state.isFetchable(tp0)); assertTrue(state.isOffsetResetNeeded(tp0)); - assertEquals(null, state.position(tp0)); + assertNotNull(state.position(tp0)); // seek should clear the reset and make the partition fetchable state.seek(tp0, 0); @@ -188,7 +193,7 @@ public class SubscriptionStateTest { assertTrue(state.partitionsAutoAssigned()); assertTrue(state.assignFromSubscribed(singleton(tp0))); state.seek(tp0, 1); - assertEquals(1L, state.position(tp0).longValue()); + assertEquals(1L, state.position(tp0).offset); assertTrue(state.assignFromSubscribed(singleton(tp1))); assertTrue(state.isAssigned(tp1)); assertFalse(state.isAssigned(tp0)); @@ -212,7 +217,7 @@ public class SubscriptionStateTest { public void invalidPositionUpdate() { state.subscribe(singleton(topic), rebalanceListener); assertTrue(state.assignFromSubscribed(singleton(tp0))); - state.position(tp0, 0); + state.position(tp0, new SubscriptionState.FetchPosition(0, Optional.empty(), leaderAndEpoch)); } @Test @@ -230,7 +235,7 @@ public class SubscriptionStateTest { @Test(expected = IllegalStateException.class) public void cantChangePositionForNonAssignedPartition() { - state.position(tp0, 1); + state.position(tp0, new SubscriptionState.FetchPosition(1, Optional.empty(), leaderAndEpoch)); } @Test(expected = IllegalStateException.class) diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index f7a37baf4f3..58d900dba57 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.protocol.ApiKeys; @@ -52,6 +53,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; 
import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -113,20 +116,29 @@ public class TestUtils { public static MetadataResponse metadataUpdateWith(final String clusterId, final int numNodes, final Map topicPartitionCounts) { - return metadataUpdateWith(clusterId, numNodes, Collections.emptyMap(), topicPartitionCounts, MetadataResponse.PartitionMetadata::new); + return metadataUpdateWith(clusterId, numNodes, Collections.emptyMap(), topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new); } public static MetadataResponse metadataUpdateWith(final String clusterId, final int numNodes, final Map topicErrors, final Map topicPartitionCounts) { - return metadataUpdateWith(clusterId, numNodes, topicErrors, topicPartitionCounts, MetadataResponse.PartitionMetadata::new); + return metadataUpdateWith(clusterId, numNodes, topicErrors, topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new); } public static MetadataResponse metadataUpdateWith(final String clusterId, final int numNodes, final Map topicErrors, final Map topicPartitionCounts, + final Function epochSupplier) { + return metadataUpdateWith(clusterId, numNodes, topicErrors, topicPartitionCounts, epochSupplier, MetadataResponse.PartitionMetadata::new); + } + + public static MetadataResponse metadataUpdateWith(final String clusterId, + final int numNodes, + final Map topicErrors, + final Map topicPartitionCounts, + final Function epochSupplier, final PartitionMetadataSupplier partitionSupplier) { final List nodes = new ArrayList<>(numNodes); for (int i = 0; i < numNodes; i++) @@ -139,10 +151,11 @@ public class TestUtils { List partitionMetadata = new ArrayList<>(numPartitions); for (int i = 0; i < numPartitions; i++) { + TopicPartition tp = new TopicPartition(topic, i); Node leader = nodes.get(i % nodes.size()); List replicas = Collections.singletonList(leader); partitionMetadata.add(partitionSupplier.supply( - Errors.NONE, i, leader, Optional.empty(), replicas, replicas, replicas)); + Errors.NONE, i, leader, Optional.ofNullable(epochSupplier.apply(tp)), replicas, replicas, replicas)); } topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic, @@ -443,4 +456,12 @@ public class TestUtils { public static ApiKeys apiKeyFrom(NetworkReceive networkReceive) { return RequestHeader.parse(networkReceive.payload().duplicate()).apiKey(); } + + public static void assertOptional(Optional optional, Consumer assertion) { + if (optional.isPresent()) { + assertion.accept(optional.get()); + } else { + fail("Missing value from Optional"); + } + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java index 7b896ecf847..6426168a647 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java @@ -21,9 +21,11 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.tests.SmokeTestClient; import org.apache.kafka.streams.tests.SmokeTestDriver; +import org.apache.kafka.test.IntegrationTest; import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; +import org.junit.experimental.categories.Category; import java.time.Duration; import java.util.ArrayList; @@ -34,6 +36,7 @@ 
+        if (optional.isPresent()) {
+            assertion.accept(optional.get());
+        } else {
+            fail("Missing value from Optional");
+        }
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 7b896ecf847..6426168a647 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -21,9 +21,11 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.tests.SmokeTestClient;
 import org.apache.kafka.streams.tests.SmokeTestDriver;
+import org.apache.kafka.test.IntegrationTest;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -34,6 +36,7 @@ import java.util.Set;
 
 import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
 import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
 
+@Category(IntegrationTest.class)
 public class SmokeTestDriverIntegrationTest {
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index dfbec9f83da..d85c9dcf447 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -62,7 +62,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
                  client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
                  enable_systest_events=False, stop_timeout_sec=35, print_timestamp=False,
                  isolation_level="read_uncommitted", jaas_override_variables=None,
-                 kafka_opts_override="", client_prop_file_override=""):
+                 kafka_opts_override="", client_prop_file_override="", consumer_properties=None):
         """
         Args:
             context:                    standard context
@@ -120,6 +120,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         self.jaas_override_variables = jaas_override_variables or {}
         self.kafka_opts_override = kafka_opts_override
         self.client_prop_file_override = client_prop_file_override
+        # default to None rather than a mutable {} so instances don't share state
+        self.consumer_properties = consumer_properties or {}
 
     def prop_file(self, node):
 
@@ -205,6 +206,10 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
             assert node.version >= V_0_10_0_0
             cmd += " --enable-systest-events"
 
+        # Pass any extra client configs through to the console consumer
+        for k, v in self.consumer_properties.items():
+            cmd += " --consumer-property %s=%s" % (k, v)
+
         cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
         return cmd
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 72d84a0ee73..6c0920e13a2 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -405,6 +405,18 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.logger.info("Running alter message format command...\n%s" % cmd)
         node.account.ssh(cmd)
 
+    def set_unclean_leader_election(self, topic, value=True, node=None):
+        if node is None:
+            node = self.nodes[0]
+        if value is True:
+            self.logger.info("Enabling unclean leader election for topic %s", topic)
+        else:
+            self.logger.info("Disabling unclean leader election for topic %s", topic)
+        cmd = "%s --zookeeper %s --entity-name %s --entity-type topics --alter --add-config unclean.leader.election.enable=%s" % \
+              (self.path.script("kafka-configs.sh", node), self.zk_connect_setting(), topic, str(value).lower())
+        self.logger.info("Running alter unclean leader command...\n%s" % cmd)
+        node.account.ssh(cmd)
+
     def parse_describe_topic(self, topic_description):
         """Parse output of kafka-topics.sh --describe (or describe_topic() method above), which is a string of form
         PartitionCount:2\tReplicationFactor:2\tConfigs:
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 6e5d50e2f4b..b9aa6a7af02 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -15,7 +15,6 @@
 
 import json
 import os
-import signal
 
 from ducktape.services.background_thread import BackgroundThreadService
 
@@ -34,7 +33,7 @@ class ConsumerState:
 
 
 class ConsumerEventHandler(object):
 
-    def __init__(self, node):
+    def __init__(self, node, verify_offsets):
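+        # When verify_offsets is False, unexpected consumed offsets are logged as
+        # warnings instead of failing the test; truncation tests rely on this since
+        # an unclean leader election can legitimately rewind the consumer's position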
         self.node = node
         self.state = ConsumerState.Dead
         self.revoked_count = 0
@@ -43,6 +42,7 @@ class ConsumerEventHandler(object):
         self.position = {}
         self.committed = {}
         self.total_consumed = 0
+        self.verify_offsets = verify_offsets
 
     def handle_shutdown_complete(self):
         self.state = ConsumerState.Dead
@@ -72,7 +72,7 @@ class ConsumerEventHandler(object):
                 (offset, self.position[tp], str(tp))
             self.committed[tp] = offset
 
-    def handle_records_consumed(self, event):
+    def handle_records_consumed(self, event, logger):
         assert self.state == ConsumerState.Joined, \
             "Consumed records should only be received when joined (current state: %s)" % str(self.state)
 
@@ -85,12 +85,18 @@ class ConsumerEventHandler(object):
             assert tp in self.assignment, \
                 "Consumed records for partition %s which is not assigned (current assignment: %s)" % \
                 (str(tp), str(self.assignment))
-            assert tp not in self.position or self.position[tp] == min_offset, \
-                "Consumed from an unexpected offset (%d, %d) for partition %s" % \
-                (self.position[tp], min_offset, str(tp))
-            self.position[tp] = max_offset + 1
-
-        self.total_consumed += event["count"]
+            if tp not in self.position or self.position[tp] == min_offset:
+                self.position[tp] = max_offset + 1
+            else:
+                msg = "Consumed from an unexpected offset (%d, %d) for partition %s" % \
+                    (self.position[tp], min_offset, str(tp))
+                if self.verify_offsets:
+                    raise AssertionError(msg)
+                else:
+                    # tp is always tracked in this branch, so just advance the
+                    # position past the batch and log the discontinuity
+                    self.position[tp] = max_offset + 1
+                    logger.warn(msg)
+        self.total_consumed += event["count"]
 
     def handle_partitions_revoked(self, event):
         self.revoked_count += 1
@@ -162,7 +168,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
                  max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
                  assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor",
                  version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", jaas_override_variables=None,
-                 on_record_consumed=None):
+                 on_record_consumed=None, reset_policy="latest", verify_offsets=True):
         """
         :param jaas_override_variables: A dict of variables to be used in the jaas.conf template file
         """
@@ -172,6 +178,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         self.kafka = kafka
         self.topic = topic
         self.group_id = group_id
+        self.reset_policy = reset_policy
         self.max_messages = max_messages
         self.session_timeout_sec = session_timeout_sec
         self.enable_autocommit = enable_autocommit
@@ -179,6 +186,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         self.prop_file = ""
         self.stop_timeout_sec = stop_timeout_sec
         self.on_record_consumed = on_record_consumed
+        self.verify_offsets = verify_offsets
 
         self.event_handlers = {}
         self.global_position = {}
@@ -194,7 +202,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
     def _worker(self, idx, node):
         with self.lock:
             if node not in self.event_handlers:
-                self.event_handlers[node] = ConsumerEventHandler(node)
+                self.event_handlers[node] = ConsumerEventHandler(node, self.verify_offsets)
             handler = self.event_handlers[node]
 
         node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
@@ -228,7 +236,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
                         handler.handle_offsets_committed(event, node, self.logger)
                         self._update_global_committed(event)
                     elif name == "records_consumed":
-                        handler.handle_records_consumed(event)
+                        handler.handle_records_consumed(event, self.logger)
                         self._update_global_position(event, node)
                     elif name == "record_data" and self.on_record_consumed:
                         self.on_record_consumed(event, node)
@@ -244,9 +252,13 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
             tp = TopicPartition(consumed_partition["topic"], consumed_partition["partition"])
             if tp in self.global_committed:
                 # verify that the position never gets behind the current commit.
-                assert self.global_committed[tp] <= consumed_partition["minOffset"], \
-                    "Consumed position %d is behind the current committed offset %d for partition %s" % \
-                    (consumed_partition["minOffset"], self.global_committed[tp], str(tp))
+                if self.global_committed[tp] > consumed_partition["minOffset"]:
+                    msg = "Consumed position %d is behind the current committed offset %d for partition %s" % \
+                        (consumed_partition["minOffset"], self.global_committed[tp], str(tp))
+                    if self.verify_offsets:
+                        raise AssertionError(msg)
+                    else:
+                        self.logger.warn(msg)
 
             # the consumer cannot generally guarantee that the position increases monotonically
             # without gaps in the face of hard failures, so we only log a warning when this happens
@@ -274,8 +286,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         cmd += self.impl.exec_cmd(node)
         if self.on_record_consumed:
            cmd += " --verbose"
-        cmd += " --group-id %s --topic %s --broker-list %s --session-timeout %s --assignment-strategy %s %s" % \
-               (self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol),
+        cmd += " --reset-policy %s --group-id %s --topic %s --broker-list %s --session-timeout %s --assignment-strategy %s %s" % \
+               (self.reset_policy, self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol),
                 self.session_timeout_sec*1000, self.assignment_strategy,
                 "--enable-autocommit" if self.enable_autocommit else "")
 
         if self.max_messages > 0:
diff --git a/tests/kafkatest/tests/client/truncation_test.py b/tests/kafkatest/tests/client/truncation_test.py
new file mode 100644
index 00000000000..8269de7f22e
--- /dev/null
+++ b/tests/kafkatest/tests/client/truncation_test.py
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
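+
+# Exercises consumer handling of log truncation: after an unclean leader election,
+# the newly elected leader may be missing records that the consumer has already
+# read, leaving the consumer's position beyond the new log end offset.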
+
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
+
+
+class TruncationTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 1
+    TOPICS = {
+        TOPIC: {
+            'partitions': NUM_PARTITIONS,
+            'replication-factor': 2
+        }
+    }
+    GROUP_ID = "truncation-test"
+
+    def __init__(self, test_context):
+        super(TruncationTest, self).__init__(test_context, num_consumers=1, num_producers=1,
+                                             num_zk=1, num_brokers=3, topics=self.TOPICS)
+        self.last_total = 0
+        self.all_offsets_consumed = []
+        self.all_values_consumed = []
+
+    def setup_consumer(self, topic, **kwargs):
+        consumer = super(TruncationTest, self).setup_consumer(topic, **kwargs)
+        self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
+
+        def print_record(event, node):
+            self.all_offsets_consumed.append(event['offset'])
+            self.all_values_consumed.append(event['value'])
+        consumer.on_record_consumed = print_record
+
+        return consumer
+
+    @cluster(num_nodes=7)
+    def test_offset_truncate(self):
+        """
+        Verify correct consumer behavior when the partition leader's log is truncated
+        by an unclean leader election.
+
+        Setup: single Kafka cluster with one producer writing messages to a single topic
+        with one partition, and a consumer in the same group reading from the same topic.
+
+        - Start a producer which continues producing new messages throughout the test.
+        - Start the consumer and wait until it has joined the group.
+        - Shrink the ISR to a single broker, let the consumer read past the offline
+          replica's log end offset, then kill the last ISR member as well.
+        - Enable unclean leader election so the out-of-sync replica becomes leader
+          with a log that ends below the consumer's position.
+        - Verify that the consumer detects the truncation without resetting to the
+          beginning of the log and without consuming duplicate records.
+        """
+        tp = TopicPartition(self.TOPIC, 0)
+
+        producer = self.setup_producer(self.TOPIC, throughput=10)
+        producer.start()
+        self.await_produced_messages(producer, min_messages=10)
+
+        consumer = self.setup_consumer(self.TOPIC, reset_policy="earliest", verify_offsets=False)
+        consumer.start()
+        self.await_all_members(consumer)
+
+        # Reduce ISR to one node
+        isr = self.kafka.isr_idx_list(self.TOPIC, 0)
+        node1 = self.kafka.get_node(isr[0])
+        self.kafka.stop_node(node1)
+        self.logger.info("Reduced ISR to one node, consumer is at %s", consumer.current_position(tp))
+
+        # Ensure remaining ISR member has a little bit of data
+        current_total = consumer.total_consumed()
+        wait_until(lambda: consumer.total_consumed() > current_total + 10,
+                   timeout_sec=30,
+                   err_msg="Timed out waiting for consumer to move ahead by 10 messages")
+
+        # Kill last ISR member
+        node2 = self.kafka.get_node(isr[1])
+        self.kafka.stop_node(node2)
+        self.logger.info("No members in ISR, consumer is at %s", consumer.current_position(tp))
+
+        # Keep consuming until we've caught up to HW
+        def none_consumed(this, consumer):
+            new_total = consumer.total_consumed()
+            if new_total == this.last_total:
+                return True
+            else:
+                this.last_total = new_total
+                return False
+
+        self.last_total = consumer.total_consumed()
+        wait_until(lambda: none_consumed(self, consumer),
+                   timeout_sec=30,
+                   err_msg="Timed out waiting for the consumer to catch up")
+
+        self.kafka.start_node(node1)
+        self.logger.info("Out of sync replica is online, but not electable. Consumer is at %s", consumer.current_position(tp))
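+
+        # Capture the consumer's position before the unclean election; the new
+        # leader's log may end below this point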
+        pre_truncation_pos = consumer.current_position(tp)
+
+        self.kafka.set_unclean_leader_election(self.TOPIC)
+        self.logger.info("New unclean leader, consumer is at %s", consumer.current_position(tp))
+
+        # Wait for truncation to be detected
+        self.kafka.start_node(node2)
+        wait_until(lambda: consumer.current_position(tp) >= pre_truncation_pos,
+                   timeout_sec=30,
+                   err_msg="Timed out waiting for truncation")
+
+        # Make sure we didn't reset to beginning of log
+        total_records_consumed = len(self.all_values_consumed)
+        assert total_records_consumed == len(set(self.all_values_consumed)), "Received duplicate records"
+
+        consumer.stop()
+        producer.stop()
+
+        # Re-consume all the records
+        consumer2 = VerifiableConsumer(self.test_context, 1, self.kafka, self.TOPIC, group_id="group2",
+                                       reset_policy="earliest", verify_offsets=True)
+
+        consumer2.start()
+        self.await_all_members(consumer2)
+
+        wait_until(lambda: consumer2.total_consumed() > 0,
+                   timeout_sec=30,
+                   err_msg="Timed out waiting for the second consumer to consume any messages")
+
+        self.last_total = consumer2.total_consumed()
+        wait_until(lambda: none_consumed(self, consumer2),
+                   timeout_sec=30,
+                   err_msg="Timed out waiting for the consumer to fully consume data")
+
+        second_total_consumed = consumer2.total_consumed()
+        assert second_total_consumed < total_records_consumed, "Expected fewer records with new consumer since we truncated"
+        self.logger.info("Second consumer saw only %s, meaning %s were truncated",
+                         second_total_consumed, total_records_consumed - second_total_consumed)
\ No newline at end of file
diff --git a/tests/kafkatest/tests/verifiable_consumer_test.py b/tests/kafkatest/tests/verifiable_consumer_test.py
index 2ba2a619994..539a0f32f02 100644
--- a/tests/kafkatest/tests/verifiable_consumer_test.py
+++ b/tests/kafkatest/tests/verifiable_consumer_test.py
@@ -16,8 +16,6 @@ from ducktape.utils.util import wait_until
 
 from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.verifiable_consumer import VerifiableConsumer
 from kafkatest.services.kafka import TopicPartition
@@ -55,15 +53,16 @@ class VerifiableConsumerTest(KafkaTest):
         """Override this since we're adding services outside of the constructor"""
         return super(VerifiableConsumerTest, self).min_cluster_size() + self.num_consumers + self.num_producers
 
-    def setup_consumer(self, topic, enable_autocommit=False, assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor"):
+    def setup_consumer(self, topic, enable_autocommit=False,
+                       assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor", **kwargs):
         return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,
                                   topic, self.group_id, session_timeout_sec=self.session_timeout_sec,
                                   assignment_strategy=assignment_strategy,
                                   enable_autocommit=enable_autocommit,
-                                  log_level="TRACE")
+                                  log_level="TRACE", **kwargs)
 
-    def setup_producer(self, topic, max_messages=-1):
+    def setup_producer(self, topic, max_messages=-1, throughput=500):
         return VerifiableProducer(self.test_context, self.num_producers, self.kafka, topic,
-                                  max_messages=max_messages, throughput=500,
+                                  max_messages=max_messages, throughput=throughput,
                                   request_timeout_sec=self.PRODUCER_REQUEST_TIMEOUT_SEC,
                                   log_level="DEBUG")