mirror of https://github.com/apache/kafka.git
KAFKA-18014: Add duration based offset reset option for ShareConsumer (#18096)
Kafka consumer supports auto.offset.reset config option, which is used when there is no initial offset in Kafka (or) if the current offset does not exist any more on the server. This config currently supports earliest/latest/none options. Currently consumer resets might force applications to reprocess large amounts of data from earlier offsets. With infinite storage, its beneficial to have a duration based offset reset strategy. This will allow applications to consume/initialise from a fixed duration when there is no initial offset in Kafka. As part of KIP-932, we are adding support for share consumer groups. Share consumer groups supports dynamic group configuration property share.auto.offset.reset. This is used to set the initial Share-Partition Start Offset (SPSO) based on the share.auto.offset.reset configuration. Currently share.auto.offset.reset supports earliest and latest options to automatically reset the offset Similar to the Kafka Consumer, we will add support for by_duration: config value for share.auto.offset.reset. Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
parent
ca7b1dcb00
commit
220c578521
|
@ -161,6 +161,20 @@ public class ShareFetchUtils {
|
|||
return timestampAndOffset.get().offset;
|
||||
}
|
||||
|
||||
/**
|
||||
* The method is used to get the offset for the given timestamp for the topic-partition.
|
||||
*
|
||||
* @return The offset for the given timestamp.
|
||||
*/
|
||||
static long offsetForTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager, long timestampToSearch, int leaderEpoch) {
|
||||
Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
|
||||
topicIdPartition.topicPartition(), timestampToSearch, new Some<>(IsolationLevel.READ_UNCOMMITTED), Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
|
||||
if (timestampAndOffset.isEmpty()) {
|
||||
throw new OffsetNotAvailableException("Offset for timestamp " + timestampToSearch + " not found for topic partition: " + topicIdPartition);
|
||||
}
|
||||
return timestampAndOffset.get().offset;
|
||||
}
|
||||
|
||||
static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
|
||||
return partition(replicaManager, tp).getLeaderEpoch();
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.kafka.common.record.RecordBatch;
|
|||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.coordinator.group.GroupConfig;
|
||||
import org.apache.kafka.coordinator.group.GroupConfigManager;
|
||||
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
|
||||
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
|
||||
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
|
||||
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
|
||||
|
@ -76,6 +77,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||
|
||||
import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
|
||||
import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
|
||||
import static kafka.server.share.ShareFetchUtils.offsetForTimestamp;
|
||||
|
||||
/**
|
||||
* The SharePartition is used to track the state of a partition that is shared between multiple
|
||||
|
@ -2093,16 +2095,21 @@ public class SharePartition {
|
|||
if (partitionDataStartOffset != PartitionFactory.UNINITIALIZED_START_OFFSET) {
|
||||
return partitionDataStartOffset;
|
||||
}
|
||||
GroupConfig.ShareGroupAutoOffsetReset offsetResetStrategy;
|
||||
ShareGroupAutoOffsetResetStrategy offsetResetStrategy;
|
||||
if (groupConfigManager.groupConfig(groupId).isPresent()) {
|
||||
offsetResetStrategy = groupConfigManager.groupConfig(groupId).get().shareAutoOffsetReset();
|
||||
} else {
|
||||
offsetResetStrategy = GroupConfig.defaultShareAutoOffsetReset();
|
||||
}
|
||||
|
||||
if (offsetResetStrategy == GroupConfig.ShareGroupAutoOffsetReset.EARLIEST)
|
||||
return offsetForEarliestTimestamp(topicIdPartition, replicaManager, leaderEpoch);
|
||||
if (offsetResetStrategy.type() == ShareGroupAutoOffsetResetStrategy.StrategyType.LATEST) {
|
||||
return offsetForLatestTimestamp(topicIdPartition, replicaManager, leaderEpoch);
|
||||
} else if (offsetResetStrategy.type() == ShareGroupAutoOffsetResetStrategy.StrategyType.EARLIEST) {
|
||||
return offsetForEarliestTimestamp(topicIdPartition, replicaManager, leaderEpoch);
|
||||
} else {
|
||||
// offsetResetStrategy type is BY_DURATION
|
||||
return offsetForTimestamp(topicIdPartition, replicaManager, offsetResetStrategy.timestamp(), leaderEpoch);
|
||||
}
|
||||
}
|
||||
|
||||
// Visible for testing. Should only be used for testing purposes.
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.kafka.common.utils.MockTime;
|
|||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.coordinator.group.GroupConfig;
|
||||
import org.apache.kafka.coordinator.group.GroupConfigManager;
|
||||
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
|
||||
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
|
||||
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
|
||||
import org.apache.kafka.server.share.persister.NoOpShareStatePersister;
|
||||
|
@ -215,7 +216,7 @@ public class SharePartitionTest {
|
|||
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
|
||||
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
|
||||
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
|
||||
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST);
|
||||
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST);
|
||||
|
||||
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
||||
|
||||
|
@ -265,7 +266,7 @@ public class SharePartitionTest {
|
|||
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
|
||||
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
|
||||
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
|
||||
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.LATEST);
|
||||
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.LATEST);
|
||||
|
||||
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
||||
|
||||
|
@ -298,6 +299,64 @@ public class SharePartitionTest {
|
|||
assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsByDuration() {
|
||||
Persister persister = Mockito.mock(Persister.class);
|
||||
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
|
||||
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
|
||||
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
|
||||
PartitionFactory.newPartitionAllData(
|
||||
0, PartitionFactory.DEFAULT_STATE_EPOCH,
|
||||
PartitionFactory.UNINITIALIZED_START_OFFSET,
|
||||
PartitionFactory.DEFAULT_ERROR_CODE,
|
||||
PartitionFactory.DEFAULT_ERR_MESSAGE,
|
||||
Collections.emptyList())))));
|
||||
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
|
||||
|
||||
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
|
||||
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
|
||||
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
|
||||
|
||||
// Since the timestamp() of duration based strategy is not deterministic, we need to mock the ShareGroupAutoOffsetResetStrategy.
|
||||
// mock: final ShareGroupAutoOffsetResetStrategy resetStrategy = ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H");
|
||||
final ShareGroupAutoOffsetResetStrategy resetStrategy = Mockito.mock(ShareGroupAutoOffsetResetStrategy.class);
|
||||
final long expectedTimestamp = MOCK_TIME.milliseconds() - TimeUnit.HOURS.toMillis(1);
|
||||
Mockito.when(resetStrategy.type()).thenReturn(ShareGroupAutoOffsetResetStrategy.StrategyType.BY_DURATION);
|
||||
Mockito.when(resetStrategy.timestamp()).thenReturn(expectedTimestamp);
|
||||
|
||||
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(resetStrategy);
|
||||
|
||||
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
||||
|
||||
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(MOCK_TIME.milliseconds() - TimeUnit.HOURS.toMillis(1), 15L, Optional.empty());
|
||||
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
|
||||
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
|
||||
|
||||
SharePartition sharePartition = SharePartitionBuilder.builder()
|
||||
.withPersister(persister)
|
||||
.withGroupConfigManager(groupConfigManager)
|
||||
.withReplicaManager(replicaManager)
|
||||
.build();
|
||||
|
||||
CompletableFuture<Void> result = sharePartition.maybeInitialize();
|
||||
assertTrue(result.isDone());
|
||||
assertFalse(result.isCompletedExceptionally());
|
||||
|
||||
// replicaManager.fetchOffsetForTimestamp should be called with the (current time - 1 hour)
|
||||
Mockito.verify(replicaManager).fetchOffsetForTimestamp(
|
||||
Mockito.any(TopicPartition.class),
|
||||
Mockito.eq(expectedTimestamp),
|
||||
Mockito.any(),
|
||||
Mockito.any(),
|
||||
Mockito.anyBoolean()
|
||||
);
|
||||
|
||||
assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
|
||||
assertEquals(15, sharePartition.startOffset());
|
||||
assertEquals(15, sharePartition.endOffset());
|
||||
assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaybeInitializeDefaultStartEpochGroupConfigNotPresent() {
|
||||
Persister persister = Mockito.mock(Persister.class);
|
||||
|
@ -407,7 +466,7 @@ public class SharePartitionTest {
|
|||
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
|
||||
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
|
||||
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
|
||||
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST);
|
||||
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST);
|
||||
|
||||
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
||||
|
||||
|
@ -436,6 +495,59 @@ public class SharePartitionTest {
|
|||
assertEquals(SharePartitionState.FAILED, sharePartition.partitionState());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaybeInitializeFetchOffsetForByDurationThrowsError() {
|
||||
Persister persister = Mockito.mock(Persister.class);
|
||||
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
|
||||
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
|
||||
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
|
||||
PartitionFactory.newPartitionAllData(
|
||||
0, PartitionFactory.DEFAULT_STATE_EPOCH,
|
||||
PartitionFactory.UNINITIALIZED_START_OFFSET,
|
||||
PartitionFactory.DEFAULT_ERROR_CODE,
|
||||
PartitionFactory.DEFAULT_ERR_MESSAGE,
|
||||
Collections.emptyList())))));
|
||||
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
|
||||
|
||||
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
|
||||
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
|
||||
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
|
||||
|
||||
// We need to mock the ShareGroupAutoOffsetResetStrategy as the timestamp() of duration based strategy is not deterministic.
|
||||
// final ShareGroupAutoOffsetResetStrategy resetStrategy = ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H");
|
||||
final ShareGroupAutoOffsetResetStrategy resetStrategy = Mockito.mock(ShareGroupAutoOffsetResetStrategy.class);
|
||||
final long expectedTimestamp = MOCK_TIME.milliseconds() - TimeUnit.HOURS.toMillis(1);
|
||||
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(resetStrategy);
|
||||
|
||||
Mockito.when(resetStrategy.type()).thenReturn(ShareGroupAutoOffsetResetStrategy.StrategyType.BY_DURATION);
|
||||
Mockito.when(resetStrategy.timestamp()).thenReturn(expectedTimestamp);
|
||||
|
||||
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
||||
|
||||
Mockito.when(replicaManager.fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()))
|
||||
.thenThrow(new RuntimeException("fetch offsets exception"));
|
||||
|
||||
SharePartition sharePartition = SharePartitionBuilder.builder()
|
||||
.withPersister(persister)
|
||||
.withGroupConfigManager(groupConfigManager)
|
||||
.withReplicaManager(replicaManager)
|
||||
.build();
|
||||
|
||||
CompletableFuture<Void> result = sharePartition.maybeInitialize();
|
||||
assertTrue(result.isDone());
|
||||
assertTrue(result.isCompletedExceptionally());
|
||||
|
||||
Mockito.verify(replicaManager).fetchOffsetForTimestamp(
|
||||
Mockito.any(TopicPartition.class),
|
||||
Mockito.eq(expectedTimestamp),
|
||||
Mockito.any(),
|
||||
Mockito.any(),
|
||||
Mockito.anyBoolean()
|
||||
);
|
||||
|
||||
assertEquals(SharePartitionState.FAILED, sharePartition.partitionState());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaybeInitializeSharePartitionAgain() {
|
||||
Persister persister = Mockito.mock(Persister.class);
|
||||
|
|
|
@ -585,7 +585,7 @@ class KafkaApisTest extends Logging {
|
|||
cgConfigs.put(SHARE_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
|
||||
cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
|
||||
cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
|
||||
cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, GroupConfig.defaultShareAutoOffsetReset.toString)
|
||||
cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT)
|
||||
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
|
||||
|
||||
val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()
|
||||
|
|
|
@ -21,10 +21,8 @@ import org.apache.kafka.common.config.AbstractConfig;
|
|||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigDef.Type;
|
||||
import org.apache.kafka.common.errors.InvalidConfigurationException;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
|
||||
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
|
@ -34,7 +32,6 @@ import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
|
|||
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
|
||||
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
|
||||
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
|
||||
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
|
||||
|
||||
/**
|
||||
* Group configuration related parameters and supporting methods like validation, etc. are
|
||||
|
@ -53,8 +50,14 @@ public final class GroupConfig extends AbstractConfig {
|
|||
public static final String SHARE_RECORD_LOCK_DURATION_MS_CONFIG = "share.record.lock.duration.ms";
|
||||
|
||||
public static final String SHARE_AUTO_OFFSET_RESET_CONFIG = "share.auto.offset.reset";
|
||||
public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = ShareGroupAutoOffsetReset.LATEST.toString();
|
||||
public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to initialize the share-partition start offset.";
|
||||
public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = ShareGroupAutoOffsetResetStrategy.LATEST.name();
|
||||
public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to initialize the share-partition start offset. " +
|
||||
"<ul><li>earliest: automatically reset the offset to the earliest offset</li>" +
|
||||
"<li>latest: automatically reset the offset to the latest offset</li>" +
|
||||
"<li>by_duration:<duration>: automatically reset the offset to a configured duration from the current timestamp. " +
|
||||
"<duration> must be specified in ISO8601 format (PnDTnHnMn.nS). " +
|
||||
"Negative duration is not allowed.</li>" +
|
||||
"<li>anything else: throw exception to the share consumer.</li></ul>";
|
||||
|
||||
public final int consumerSessionTimeoutMs;
|
||||
|
||||
|
@ -102,7 +105,7 @@ public final class GroupConfig extends AbstractConfig {
|
|||
.define(SHARE_AUTO_OFFSET_RESET_CONFIG,
|
||||
STRING,
|
||||
SHARE_AUTO_OFFSET_RESET_DEFAULT,
|
||||
in(Utils.enumOptions(ShareGroupAutoOffsetReset.class)),
|
||||
new ShareGroupAutoOffsetResetStrategy.Validator(),
|
||||
MEDIUM,
|
||||
SHARE_AUTO_OFFSET_RESET_DOC);
|
||||
|
||||
|
@ -223,8 +226,8 @@ public final class GroupConfig extends AbstractConfig {
|
|||
/**
|
||||
* The default share group auto offset reset strategy.
|
||||
*/
|
||||
public static ShareGroupAutoOffsetReset defaultShareAutoOffsetReset() {
|
||||
return ShareGroupAutoOffsetReset.valueOf(SHARE_AUTO_OFFSET_RESET_DEFAULT.toUpperCase(Locale.ROOT));
|
||||
public static ShareGroupAutoOffsetResetStrategy defaultShareAutoOffsetReset() {
|
||||
return ShareGroupAutoOffsetResetStrategy.fromString(SHARE_AUTO_OFFSET_RESET_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -265,16 +268,7 @@ public final class GroupConfig extends AbstractConfig {
|
|||
/**
|
||||
* The share group auto offset reset strategy.
|
||||
*/
|
||||
public ShareGroupAutoOffsetReset shareAutoOffsetReset() {
|
||||
return ShareGroupAutoOffsetReset.valueOf(shareAutoOffsetReset.toUpperCase(Locale.ROOT));
|
||||
}
|
||||
|
||||
public enum ShareGroupAutoOffsetReset {
|
||||
LATEST, EARLIEST;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return super.toString().toLowerCase(Locale.ROOT);
|
||||
}
|
||||
public ShareGroupAutoOffsetResetStrategy shareAutoOffsetReset() {
|
||||
return ShareGroupAutoOffsetResetStrategy.fromString(shareAutoOffsetReset);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,159 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.coordinator.group;
|
||||
|
||||
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigException;
|
||||
|
||||
import java.util.Locale;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Represents the strategy for resetting offsets in share consumer groups when no previous offset is found
|
||||
* for a partition or when an offset is out of range.
|
||||
*
|
||||
* Supports three strategies:
|
||||
* <ul>
|
||||
* <li>{@code EARLIEST} - Reset the offset to the earliest available offset
|
||||
* <li>{@code LATEST} - Reset the offset to the latest available offset
|
||||
* <li>{@code BY_DURATION} - Reset the offset to a timestamp that is the specified duration before the current time
|
||||
* </ul>
|
||||
*/
|
||||
public class ShareGroupAutoOffsetResetStrategy {
|
||||
|
||||
public static final ShareGroupAutoOffsetResetStrategy EARLIEST = new ShareGroupAutoOffsetResetStrategy(AutoOffsetResetStrategy.EARLIEST, StrategyType.EARLIEST);
|
||||
public static final ShareGroupAutoOffsetResetStrategy LATEST = new ShareGroupAutoOffsetResetStrategy(AutoOffsetResetStrategy.LATEST, StrategyType.LATEST);
|
||||
|
||||
public enum StrategyType {
|
||||
LATEST, EARLIEST, BY_DURATION;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return super.toString().toLowerCase(Locale.ROOT);
|
||||
}
|
||||
}
|
||||
|
||||
private final AutoOffsetResetStrategy delegate;
|
||||
private final StrategyType type;
|
||||
|
||||
private ShareGroupAutoOffsetResetStrategy(AutoOffsetResetStrategy delegate, StrategyType type) {
|
||||
this.delegate = delegate;
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method to create a ShareGroupAutoOffsetResetStrategy from a string representation.
|
||||
*/
|
||||
public static ShareGroupAutoOffsetResetStrategy fromString(String offsetStrategy) {
|
||||
AutoOffsetResetStrategy baseStrategy = AutoOffsetResetStrategy.fromString(offsetStrategy);
|
||||
AutoOffsetResetStrategy.StrategyType baseType = baseStrategy.type();
|
||||
|
||||
StrategyType shareGroupType;
|
||||
switch (baseType) {
|
||||
case EARLIEST:
|
||||
shareGroupType = StrategyType.EARLIEST;
|
||||
break;
|
||||
case LATEST:
|
||||
shareGroupType = StrategyType.LATEST;
|
||||
break;
|
||||
case BY_DURATION:
|
||||
shareGroupType = StrategyType.BY_DURATION;
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unsupported strategy for ShareGroup: " + baseType);
|
||||
}
|
||||
|
||||
return new ShareGroupAutoOffsetResetStrategy(baseStrategy, shareGroupType);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the share group strategy type.
|
||||
*/
|
||||
public StrategyType type() {
|
||||
return type;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the name of the share group offset reset strategy.
|
||||
*/
|
||||
public String name() {
|
||||
return type.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Delegates the timestamp calculation to the base strategy.
|
||||
* @return the timestamp for the OffsetResetStrategy,
|
||||
* if the strategy is EARLIEST or LATEST or duration is provided
|
||||
* else return Optional.empty()
|
||||
*/
|
||||
public Long timestamp() {
|
||||
return delegate.timestamp().get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
ShareGroupAutoOffsetResetStrategy that = (ShareGroupAutoOffsetResetStrategy) o;
|
||||
return type == that.type && Objects.equals(delegate, that.delegate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(delegate, type);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ShareGroupAutoOffsetResetStrategy{" +
|
||||
"type=" + type +
|
||||
", delegate=" + delegate +
|
||||
'}';
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method for creating EARLIEST strategy.
|
||||
*/
|
||||
public static ShareGroupAutoOffsetResetStrategy earliest() {
|
||||
return new ShareGroupAutoOffsetResetStrategy(AutoOffsetResetStrategy.EARLIEST, StrategyType.EARLIEST);
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method for creating LATEST strategy.
|
||||
*/
|
||||
public static ShareGroupAutoOffsetResetStrategy latest() {
|
||||
return new ShareGroupAutoOffsetResetStrategy(AutoOffsetResetStrategy.LATEST, StrategyType.LATEST);
|
||||
}
|
||||
|
||||
public static class Validator implements ConfigDef.Validator {
|
||||
@Override
|
||||
public void ensureValid(String name, Object value) {
|
||||
String offsetStrategy = (String) value;
|
||||
try {
|
||||
fromString(offsetStrategy);
|
||||
} catch (Exception e) {
|
||||
throw new ConfigException(name, value, "Invalid value `" + offsetStrategy + "` for configuration " +
|
||||
name + ". The value must be either 'earliest', 'latest' or of the format 'by_duration:<PnDTnHnMn.nS.>'.");
|
||||
}
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "[earliest, latest, by_duration:PnDTnHnMn.nS]";
|
||||
}
|
||||
}
|
||||
}
|
|
@ -88,6 +88,10 @@ public class GroupConfigTest {
|
|||
// Check for value "earliest"
|
||||
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
doTestValidProps(props);
|
||||
|
||||
// Check for value "by_duration"
|
||||
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:PT10S");
|
||||
doTestValidProps(props);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -148,6 +152,18 @@ public class GroupConfigTest {
|
|||
// Check for invalid shareAutoOffsetReset
|
||||
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "hello");
|
||||
doTestInvalidProps(props, ConfigException.class);
|
||||
|
||||
// Check for invalid shareAutoOffsetReset, by_duration without duration
|
||||
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration");
|
||||
doTestInvalidProps(props, ConfigException.class);
|
||||
|
||||
// Check for invalid shareAutoOffsetReset, by_duration with negative duration
|
||||
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:-PT10S");
|
||||
doTestInvalidProps(props, ConfigException.class);
|
||||
|
||||
// Check for invalid shareAutoOffsetReset, by_duration with invalid duration
|
||||
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:invalid");
|
||||
doTestInvalidProps(props, ConfigException.class);
|
||||
}
|
||||
|
||||
private void doTestInvalidProps(Properties props, Class<? extends Exception> exceptionClassName) {
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.coordinator.group;
|
||||
|
||||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.apache.kafka.common.requests.ListOffsetsRequest;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class ShareGroupAutoOffsetResetStrategyTest {
|
||||
|
||||
@Test
|
||||
public void testFromString() {
|
||||
assertEquals(ShareGroupAutoOffsetResetStrategy.EARLIEST, ShareGroupAutoOffsetResetStrategy.fromString("earliest"));
|
||||
assertEquals(ShareGroupAutoOffsetResetStrategy.LATEST, ShareGroupAutoOffsetResetStrategy.fromString("latest"));
|
||||
assertThrows(IllegalArgumentException.class, () -> ShareGroupAutoOffsetResetStrategy.fromString("invalid"));
|
||||
assertThrows(IllegalArgumentException.class, () -> ShareGroupAutoOffsetResetStrategy.fromString("by_duration:invalid"));
|
||||
assertThrows(IllegalArgumentException.class, () -> ShareGroupAutoOffsetResetStrategy.fromString("by_duration:-PT1H"));
|
||||
assertThrows(IllegalArgumentException.class, () -> ShareGroupAutoOffsetResetStrategy.fromString("by_duration:"));
|
||||
assertThrows(IllegalArgumentException.class, () -> ShareGroupAutoOffsetResetStrategy.fromString("by_duration"));
|
||||
assertThrows(IllegalArgumentException.class, () -> ShareGroupAutoOffsetResetStrategy.fromString("LATEST"));
|
||||
assertThrows(IllegalArgumentException.class, () -> ShareGroupAutoOffsetResetStrategy.fromString("EARLIEST"));
|
||||
assertThrows(IllegalArgumentException.class, () -> ShareGroupAutoOffsetResetStrategy.fromString("NONE"));
|
||||
assertThrows(IllegalArgumentException.class, () -> ShareGroupAutoOffsetResetStrategy.fromString(""));
|
||||
assertThrows(IllegalArgumentException.class, () -> ShareGroupAutoOffsetResetStrategy.fromString(null));
|
||||
|
||||
ShareGroupAutoOffsetResetStrategy strategy = ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H");
|
||||
assertEquals("by_duration", strategy.name());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidator() {
|
||||
ShareGroupAutoOffsetResetStrategy.Validator validator = new ShareGroupAutoOffsetResetStrategy.Validator();
|
||||
assertDoesNotThrow(() -> validator.ensureValid("test", "earliest"));
|
||||
assertDoesNotThrow(() -> validator.ensureValid("test", "latest"));
|
||||
assertDoesNotThrow(() -> validator.ensureValid("test", "by_duration:PT1H"));
|
||||
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "invalid"));
|
||||
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "by_duration:invalid"));
|
||||
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "by_duration:-PT1H"));
|
||||
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "by_duration:"));
|
||||
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "by_duration"));
|
||||
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "LATEST"));
|
||||
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "EARLIEST"));
|
||||
assertThrows(ConfigException.class, () -> validator.ensureValid("test", "NONE"));
|
||||
assertThrows(ConfigException.class, () -> validator.ensureValid("test", ""));
|
||||
assertThrows(ConfigException.class, () -> validator.ensureValid("test", null));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEqualsAndHashCode() {
|
||||
ShareGroupAutoOffsetResetStrategy earliest1 = ShareGroupAutoOffsetResetStrategy.fromString("earliest");
|
||||
ShareGroupAutoOffsetResetStrategy earliest2 = ShareGroupAutoOffsetResetStrategy.fromString("earliest");
|
||||
ShareGroupAutoOffsetResetStrategy latest1 = ShareGroupAutoOffsetResetStrategy.fromString("latest");
|
||||
|
||||
ShareGroupAutoOffsetResetStrategy duration1 = ShareGroupAutoOffsetResetStrategy.fromString("by_duration:P2D");
|
||||
ShareGroupAutoOffsetResetStrategy duration2 = ShareGroupAutoOffsetResetStrategy.fromString("by_duration:P2D");
|
||||
|
||||
assertEquals(earliest1, earliest2);
|
||||
assertNotEquals(earliest1, latest1);
|
||||
assertEquals(earliest1.hashCode(), earliest2.hashCode());
|
||||
assertNotEquals(earliest1.hashCode(), latest1.hashCode());
|
||||
|
||||
assertNotEquals(latest1, duration2);
|
||||
assertEquals(duration1, duration2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimestamp() {
|
||||
ShareGroupAutoOffsetResetStrategy earliest1 = ShareGroupAutoOffsetResetStrategy.fromString("earliest");
|
||||
ShareGroupAutoOffsetResetStrategy earliest2 = ShareGroupAutoOffsetResetStrategy.fromString("earliest");
|
||||
assertEquals(ListOffsetsRequest.EARLIEST_TIMESTAMP, earliest1.timestamp());
|
||||
assertEquals(earliest1, earliest2);
|
||||
|
||||
ShareGroupAutoOffsetResetStrategy latest1 = ShareGroupAutoOffsetResetStrategy.fromString("latest");
|
||||
ShareGroupAutoOffsetResetStrategy latest2 = ShareGroupAutoOffsetResetStrategy.fromString("latest");
|
||||
assertEquals(ListOffsetsRequest.LATEST_TIMESTAMP, latest1.timestamp());
|
||||
assertEquals(latest1, latest2);
|
||||
|
||||
ShareGroupAutoOffsetResetStrategy byDuration1 = ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H");
|
||||
Long timestamp = byDuration1.timestamp();
|
||||
assertTrue(timestamp <= Instant.now().toEpochMilli() - Duration.ofHours(1).toMillis());
|
||||
|
||||
ShareGroupAutoOffsetResetStrategy byDuration2 = ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H");
|
||||
ShareGroupAutoOffsetResetStrategy byDuration3 = ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT2H");
|
||||
|
||||
assertEquals(byDuration1, byDuration2);
|
||||
assertNotEquals(byDuration1, byDuration3);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue