KAFKA-18013: Add support for duration based offset reset strategy to Kafka Consumer (#17972)

Update AutoOffsetResetStrategy.java to support duration based reset strategy
Update OffsetFetcher related classes and unit tests

Reviewers: Andrew Schofield <aschofield@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This commit is contained in:
Manikumar Reddy 2024-11-29 22:38:57 +05:30 committed by GitHub
parent 6237325fb1
commit ae3c5dec99
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 251 additions and 82 deletions

View File

@ -173,6 +173,8 @@ public class ConsumerConfig extends AbstractConfig {
"(e.g. because that data has been deleted): " +
"<ul><li>earliest: automatically reset the offset to the earliest offset" +
"<li>latest: automatically reset the offset to the latest offset</li>" +
"<li>by_duration:<duration>: automatically reset the offset to a configured <duration> from the current timestamp. <duration> must be specified in ISO8601 format (PnDTnHnMn.nS). " +
"Negative duration is not allowed.</li>" +
"<li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li>" +
"<li>anything else: throw exception to the consumer.</li></ul>" +
"<p>Note that altering partition numbers while setting this config to latest may cause message delivery loss since " +

View File

@ -62,6 +62,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private final SubscriptionState subscriptions;
private final Map<TopicPartition, Long> beginningOffsets;
private final Map<TopicPartition, Long> endOffsets;
private final Map<TopicPartition, Long> durationResetOffsets;
private final Map<TopicPartition, OffsetAndMetadata> committed;
private final Queue<Runnable> pollTasks;
private final Set<TopicPartition> paused;
@ -104,6 +105,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
this.closed = false;
this.beginningOffsets = new HashMap<>();
this.endOffsets = new HashMap<>();
this.durationResetOffsets = new HashMap<>();
this.pollTasks = new LinkedList<>();
this.pollException = null;
this.wakeup = new AtomicBoolean(false);
@ -433,6 +435,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
endOffsets.putAll(newOffsets);
}
public synchronized void updateDurationOffsets(final Map<TopicPartition, Long> newOffsets) {
durationResetOffsets.putAll(newOffsets);
}
public void disableTelemetry() {
telemetryDisabled = true;
}
@ -610,6 +616,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
offset = endOffsets.get(tp);
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end");
} else if (strategy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
offset = durationResetOffsets.get(tp);
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have duration offset specified, but tried to seek to timestamp");
} else {
throw new NoOffsetForPartitionException(tp);
}

View File

@ -18,15 +18,19 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.utils.Utils;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
public class AutoOffsetResetStrategy {
private enum StrategyType {
LATEST, EARLIEST, NONE;
public enum StrategyType {
LATEST, EARLIEST, NONE, BY_DURATION;
@Override
public String toString() {
@ -39,30 +43,65 @@ public class AutoOffsetResetStrategy {
public static final AutoOffsetResetStrategy NONE = new AutoOffsetResetStrategy(StrategyType.NONE);
private final StrategyType type;
private final Optional<Duration> duration;
private AutoOffsetResetStrategy(StrategyType type) {
this.type = type;
this.duration = Optional.empty();
}
public static boolean isValid(String offsetStrategy) {
return Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy);
private AutoOffsetResetStrategy(Duration duration) {
this.type = StrategyType.BY_DURATION;
this.duration = Optional.of(duration);
}
/**
* Returns the AutoOffsetResetStrategy from the given string.
*/
public static AutoOffsetResetStrategy fromString(String offsetStrategy) {
if (offsetStrategy == null || !isValid(offsetStrategy)) {
throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
if (offsetStrategy == null) {
throw new IllegalArgumentException("Auto offset reset strategy is null");
}
StrategyType type = StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT));
switch (type) {
case EARLIEST:
return EARLIEST;
case LATEST:
return LATEST;
case NONE:
return NONE;
default:
throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
if (StrategyType.BY_DURATION.toString().equals(offsetStrategy)) {
throw new IllegalArgumentException("<:duration> part is missing in by_duration auto offset reset strategy.");
}
if (Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy)) {
StrategyType type = StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT));
switch (type) {
case EARLIEST:
return EARLIEST;
case LATEST:
return LATEST;
case NONE:
return NONE;
default:
throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
}
}
if (offsetStrategy.startsWith(StrategyType.BY_DURATION + ":")) {
String isoDuration = offsetStrategy.substring(StrategyType.BY_DURATION.toString().length() + 1);
try {
Duration duration = Duration.parse(isoDuration);
if (duration.isNegative()) {
throw new IllegalArgumentException("Negative duration is not supported in by_duration offset reset strategy.");
}
return new AutoOffsetResetStrategy(duration);
} catch (Exception e) {
throw new IllegalArgumentException("Unable to parse duration string in by_duration offset reset strategy.", e);
}
}
throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
}
/**
* Returns the offset reset strategy type.
*/
public StrategyType type() {
return type;
}
/**
@ -72,33 +111,54 @@ public class AutoOffsetResetStrategy {
return type.toString();
}
/**
* Return the timestamp to be used for the ListOffsetsRequest.
* @return the timestamp for the OffsetResetStrategy,
* if the strategy is EARLIEST or LATEST or duration is provided
* else return Optional.empty()
*/
public Optional<Long> timestamp() {
if (type == StrategyType.EARLIEST)
return Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP);
else if (type == StrategyType.LATEST)
return Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP);
else if (type == StrategyType.BY_DURATION && duration.isPresent()) {
Instant now = Instant.now();
return Optional.of(now.minus(duration.get()).toEpochMilli());
} else
return Optional.empty();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AutoOffsetResetStrategy that = (AutoOffsetResetStrategy) o;
return Objects.equals(type, that.type);
return type == that.type && Objects.equals(duration, that.duration);
}
@Override
public int hashCode() {
return Objects.hashCode(type);
return Objects.hash(type, duration);
}
@Override
public String toString() {
return "AutoOffsetResetStrategy{" +
"type='" + type + '\'' +
"type=" + type +
(duration.map(value -> ", duration=" + value).orElse("")) +
'}';
}
public static class Validator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object value) {
String strategy = (String) value;
if (!AutoOffsetResetStrategy.isValid(strategy)) {
throw new ConfigException(name, value, "Invalid value " + strategy + " for configuration " +
name + ": the value must be either 'earliest', 'latest', or 'none'.");
String offsetStrategy = (String) value;
try {
fromString(offsetStrategy);
} catch (Exception e) {
throw new ConfigException(name, value, "Invalid value `" + offsetStrategy + "` for configuration " +
name + ". The value must be either 'earliest', 'latest', 'none' or of the format 'by_duration:<PnDTnHnMn.nS.>'.");
}
}
}

View File

@ -101,12 +101,13 @@ public class OffsetFetcher {
* and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd().
*/
public void resetPositionsIfNeeded() {
Map<TopicPartition, Long> offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap =
offsetFetcherUtils.getOffsetResetStrategyForPartitions();
if (offsetResetTimestamps.isEmpty())
if (partitionAutoOffsetResetStrategyMap.isEmpty())
return;
resetPositionsAsync(offsetResetTimestamps);
resetPositionsAsync(partitionAutoOffsetResetStrategyMap);
}
/**
@ -209,7 +210,9 @@ public class OffsetFetcher {
}
}
private void resetPositionsAsync(Map<TopicPartition, Long> partitionResetTimestamps) {
private void resetPositionsAsync(Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
Map<TopicPartition, Long> partitionResetTimestamps = partitionAutoOffsetResetStrategyMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().timestamp().get()));
Map<Node, Map<TopicPartition, ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
@ -221,7 +224,7 @@ public class OffsetFetcher {
future.addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(ListOffsetResult result) {
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps, result);
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(result, partitionAutoOffsetResetStrategyMap);
}
@Override

View File

@ -32,7 +32,6 @@ import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.utils.LogContext;
@ -224,19 +223,22 @@ class OffsetFetcherUtils {
}
}
Map<TopicPartition, Long> getOffsetResetTimestamp() {
/**
* get OffsetResetStrategy for all assigned partitions
*/
Map<TopicPartition, AutoOffsetResetStrategy> getOffsetResetStrategyForPartitions() {
// Raise exception from previous offset fetch if there is one
RuntimeException exception = cachedResetPositionsException.getAndSet(null);
if (exception != null)
throw exception;
Set<TopicPartition> partitions = subscriptionState.partitionsNeedingReset(time.milliseconds());
final Map<TopicPartition, Long> offsetResetTimestamps = new HashMap<>();
final Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap = new HashMap<>();
for (final TopicPartition partition : partitions) {
offsetResetTimestamps.put(partition, offsetResetStrategyTimestamp(partition));
partitionAutoOffsetResetStrategyMap.put(partition, offsetResetStrategyWithValidTimestamp(partition));
}
return offsetResetTimestamps;
return partitionAutoOffsetResetStrategyMap;
}
static Map<TopicPartition, OffsetAndTimestamp> buildListOffsetsResult(
@ -283,14 +285,13 @@ class OffsetFetcherUtils {
return offsetsResults;
}
private long offsetResetStrategyTimestamp(final TopicPartition partition) {
private AutoOffsetResetStrategy offsetResetStrategyWithValidTimestamp(final TopicPartition partition) {
AutoOffsetResetStrategy strategy = subscriptionState.resetStrategy(partition);
if (strategy == AutoOffsetResetStrategy.EARLIEST)
return ListOffsetsRequest.EARLIEST_TIMESTAMP;
else if (strategy == AutoOffsetResetStrategy.LATEST)
return ListOffsetsRequest.LATEST_TIMESTAMP;
else
if (strategy.timestamp().isPresent()) {
return strategy;
} else {
throw new NoOffsetForPartitionException(partition);
}
}
static Set<String> topicsForPartitions(Collection<TopicPartition> partitions) {
@ -319,18 +320,9 @@ class OffsetFetcherUtils {
}
}
static AutoOffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {
if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP)
return AutoOffsetResetStrategy.EARLIEST;
else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
return AutoOffsetResetStrategy.LATEST;
else
return null;
}
void onSuccessfulResponseForResettingPositions(
final Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps,
final ListOffsetResult result) {
final ListOffsetResult result,
final Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
if (!result.partitionsToRetry.isEmpty()) {
subscriptionState.requestFailed(result.partitionsToRetry, time.milliseconds() + retryBackoffMs);
metadata.requestUpdate(false);
@ -339,10 +331,9 @@ class OffsetFetcherUtils {
for (Map.Entry<TopicPartition, ListOffsetData> fetchedOffset : result.fetchedOffsets.entrySet()) {
TopicPartition partition = fetchedOffset.getKey();
ListOffsetData offsetData = fetchedOffset.getValue();
ListOffsetsRequestData.ListOffsetsPartition requestedReset = resetTimestamps.get(partition);
resetPositionIfNeeded(
partition,
timestampToOffsetResetStrategy(requestedReset.timestamp()),
partitionAutoOffsetResetStrategyMap.get(partition),
offsetData);
}
}

View File

@ -472,20 +472,20 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou
* this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException})
*/
CompletableFuture<Void> resetPositionsIfNeeded() {
Map<TopicPartition, Long> offsetResetTimestamps;
Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap;
try {
offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
partitionAutoOffsetResetStrategyMap = offsetFetcherUtils.getOffsetResetStrategyForPartitions();
} catch (Exception e) {
CompletableFuture<Void> result = new CompletableFuture<>();
result.completeExceptionally(e);
return result;
}
if (offsetResetTimestamps.isEmpty())
if (partitionAutoOffsetResetStrategyMap.isEmpty())
return CompletableFuture.completedFuture(null);
return sendListOffsetsRequestsAndResetPositions(offsetResetTimestamps);
return sendListOffsetsRequestsAndResetPositions(partitionAutoOffsetResetStrategyMap);
}
/**
@ -652,12 +652,14 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou
* partitions. Use the retrieved offsets to reset positions in the subscription state.
* This also adds the request to the list of unsentRequests.
*
* @param timestampsToSearch the mapping between partitions and target time
* @param partitionAutoOffsetResetStrategyMap the mapping between partitions and AutoOffsetResetStrategy
* @return A {@link CompletableFuture} which completes when the requests are
* complete.
*/
private CompletableFuture<Void> sendListOffsetsRequestsAndResetPositions(
final Map<TopicPartition, Long> timestampsToSearch) {
final Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
Map<TopicPartition, Long> timestampsToSearch = partitionAutoOffsetResetStrategyMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().timestamp().get()));
Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(timestampsToSearch, Optional.empty());
@ -677,8 +679,8 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou
partialResult.whenComplete((result, error) -> {
if (error == null) {
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps,
result);
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(result,
partitionAutoOffsetResetStrategyMap);
} else {
RuntimeException e;
if (error instanceof RuntimeException) {

View File

@ -441,7 +441,7 @@ public class SubscriptionState {
log.debug("Skipping reset of partition {} since it is no longer assigned", tp);
} else if (!state.awaitingReset()) {
log.debug("Skipping reset of partition {} since reset is no longer needed", tp);
} else if (requestedResetStrategy != state.resetStrategy) {
} else if (requestedResetStrategy != null && !requestedResetStrategy.equals(state.resetStrategy)) {
log.debug("Skipping reset of partition {} since an alternative reset has been requested", tp);
} else {
log.info("Resetting offset for partition {} to position {}.", tp, position);

View File

@ -1082,7 +1082,20 @@ public class KafkaConsumerTest {
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testResetUsingAutoResetPolicy(GroupProtocol groupProtocol) {
SubscriptionState subscription = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.LATEST);
setUpConsumerWithAutoResetPolicy(groupProtocol, AutoOffsetResetStrategy.LATEST);
assertEquals(50L, consumer.position(tp0));
}
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testResetUsingDurationBasedAutoResetPolicy(GroupProtocol groupProtocol) {
AutoOffsetResetStrategy durationStrategy = AutoOffsetResetStrategy.fromString("by_duration:PT1H");
setUpConsumerWithAutoResetPolicy(groupProtocol, durationStrategy);
assertEquals(50L, consumer.position(tp0));
}
private void setUpConsumerWithAutoResetPolicy(GroupProtocol groupProtocol, AutoOffsetResetStrategy strategy) {
SubscriptionState subscription = new SubscriptionState(new LogContext(), strategy);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@ -1100,8 +1113,6 @@ public class KafkaConsumerTest {
client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L)));
consumer.poll(Duration.ZERO);
assertEquals(50L, consumer.position(tp0));
}
@ParameterizedTest

View File

@ -117,6 +117,28 @@ public class MockConsumerTest {
assertEquals(11L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition));
}
@Test
public void testDurationBasedOffsetReset() {
MockConsumer<String, String> consumer = new MockConsumer<>("by_duration:PT1H");
consumer.subscribe(Collections.singleton("test"));
consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1)));
HashMap<TopicPartition, Long> durationBasedOffsets = new HashMap<>();
durationBasedOffsets.put(new TopicPartition("test", 0), 10L);
durationBasedOffsets.put(new TopicPartition("test", 1), 11L);
consumer.updateDurationOffsets(durationBasedOffsets);
ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 10L, 0L, TimestampType.CREATE_TIME,
0, 0, "key1", "value1", new RecordHeaders(), Optional.empty());
ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 11L, 0L, TimestampType.CREATE_TIME,
0, 0, "key2", "value2", new RecordHeaders(), Optional.empty());
consumer.addRecord(rec1);
consumer.addRecord(rec2);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
Iterator<ConsumerRecord<String, String>> iter = records.iterator();
assertEquals(rec1, iter.next());
assertEquals(rec2, iter.next());
assertFalse(iter.hasNext());
}
@Test
public void testRebalanceListener() {
final List<TopicPartition> revoked = new ArrayList<>();

View File

@ -17,9 +17,14 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -29,26 +34,24 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class AutoOffsetResetStrategyTest {
@Test
public void testIsValid() {
assertTrue(AutoOffsetResetStrategy.isValid("earliest"));
assertTrue(AutoOffsetResetStrategy.isValid("latest"));
assertTrue(AutoOffsetResetStrategy.isValid("none"));
assertFalse(AutoOffsetResetStrategy.isValid("invalid"));
assertFalse(AutoOffsetResetStrategy.isValid("LATEST"));
assertFalse(AutoOffsetResetStrategy.isValid(""));
assertFalse(AutoOffsetResetStrategy.isValid(null));
}
@Test
public void testFromString() {
assertEquals(AutoOffsetResetStrategy.EARLIEST, AutoOffsetResetStrategy.fromString("earliest"));
assertEquals(AutoOffsetResetStrategy.LATEST, AutoOffsetResetStrategy.fromString("latest"));
assertEquals(AutoOffsetResetStrategy.NONE, AutoOffsetResetStrategy.fromString("none"));
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("invalid"));
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("by_duration:invalid"));
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("by_duration:-PT1H"));
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("by_duration:"));
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("by_duration"));
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("LATEST"));
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("EARLIEST"));
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("NONE"));
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString(""));
assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString(null));
AutoOffsetResetStrategy strategy = AutoOffsetResetStrategy.fromString("by_duration:PT1H");
assertEquals("by_duration", strategy.name());
}
@Test
@ -57,21 +60,63 @@ public class AutoOffsetResetStrategyTest {
assertDoesNotThrow(() -> validator.ensureValid("test", "earliest"));
assertDoesNotThrow(() -> validator.ensureValid("test", "latest"));
assertDoesNotThrow(() -> validator.ensureValid("test", "none"));
assertDoesNotThrow(() -> validator.ensureValid("test", "by_duration:PT1H"));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "invalid"));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "by_duration:invalid"));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "by_duration:-PT1H"));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "by_duration:"));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "by_duration"));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "LATEST"));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "EARLIEST"));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "NONE"));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", ""));
assertThrows(ConfigException.class, () -> validator.ensureValid("test", null));
}
@Test
public void testEqualsAndHashCode() {
AutoOffsetResetStrategy strategy1 = AutoOffsetResetStrategy.fromString("earliest");
AutoOffsetResetStrategy strategy2 = AutoOffsetResetStrategy.fromString("earliest");
AutoOffsetResetStrategy strategy3 = AutoOffsetResetStrategy.fromString("latest");
AutoOffsetResetStrategy earliest1 = AutoOffsetResetStrategy.fromString("earliest");
AutoOffsetResetStrategy earliest2 = AutoOffsetResetStrategy.fromString("earliest");
AutoOffsetResetStrategy latest1 = AutoOffsetResetStrategy.fromString("latest");
assertEquals(strategy1, strategy2);
assertNotEquals(strategy1, strategy3);
assertEquals(strategy1.hashCode(), strategy2.hashCode());
assertNotEquals(strategy1.hashCode(), strategy3.hashCode());
AutoOffsetResetStrategy duration1 = AutoOffsetResetStrategy.fromString("by_duration:P2D");
AutoOffsetResetStrategy duration2 = AutoOffsetResetStrategy.fromString("by_duration:P2D");
assertEquals(earliest1, earliest2);
assertNotEquals(earliest1, latest1);
assertEquals(earliest1.hashCode(), earliest2.hashCode());
assertNotEquals(earliest1.hashCode(), latest1.hashCode());
assertNotEquals(latest1, duration2);
assertEquals(duration1, duration2);
}
@Test
public void testTimestamp() {
AutoOffsetResetStrategy earliest1 = AutoOffsetResetStrategy.fromString("earliest");
AutoOffsetResetStrategy earliest2 = AutoOffsetResetStrategy.fromString("earliest");
assertEquals(Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP), earliest1.timestamp());
assertEquals(earliest1, earliest2);
AutoOffsetResetStrategy latest1 = AutoOffsetResetStrategy.fromString("latest");
AutoOffsetResetStrategy latest2 = AutoOffsetResetStrategy.fromString("latest");
assertEquals(Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP), latest1.timestamp());
assertEquals(latest1, latest2);
AutoOffsetResetStrategy none1 = AutoOffsetResetStrategy.fromString("none");
AutoOffsetResetStrategy none2 = AutoOffsetResetStrategy.fromString("none");
assertFalse(none1.timestamp().isPresent());
assertEquals(none1, none2);
AutoOffsetResetStrategy byDuration1 = AutoOffsetResetStrategy.fromString("by_duration:PT1H");
Optional<Long> timestamp = byDuration1.timestamp();
assertTrue(timestamp.isPresent());
assertTrue(timestamp.get() <= Instant.now().toEpochMilli() - Duration.ofHours(1).toMillis());
AutoOffsetResetStrategy byDuration2 = AutoOffsetResetStrategy.fromString("by_duration:PT1H");
AutoOffsetResetStrategy byDuration3 = AutoOffsetResetStrategy.fromString("by_duration:PT2H");
assertEquals(byDuration1, byDuration2);
assertNotEquals(byDuration1, byDuration3);
}
}

View File

@ -65,6 +65,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -93,6 +94,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class OffsetFetcherTest {
@ -186,6 +189,26 @@ public class OffsetFetcherTest {
assertEquals(5, subscriptions.position(tp0).offset);
}
@Test
public void testUpdateFetchPositionResetToDurationOffset() {
long timestamp = Instant.now().toEpochMilli();
AutoOffsetResetStrategy durationStrategy = mock(AutoOffsetResetStrategy.class);
when(durationStrategy.timestamp()).thenReturn(Optional.of(timestamp));
buildFetcher(durationStrategy);
assignFromUser(singleton(tp0));
subscriptions.requestOffsetReset(tp0, durationStrategy);
client.updateMetadata(initialUpdateResponse);
client.prepareResponse(listOffsetRequestMatcher(timestamp),
listOffsetResponse(Errors.NONE, 1L, 5L));
offsetFetcher.resetPositionsIfNeeded();
consumerClient.pollNoWakeup();
assertFalse(subscriptions.isOffsetResetNeeded(tp0));
assertTrue(subscriptions.isFetchable(tp0));
assertEquals(5, subscriptions.position(tp0).offset);
}
/**
* Make sure the client behaves appropriately when receiving an exception for unavailable offsets
*/