KAFKA-16726: Added share.auto.offset.reset dynamic config for share groups (#17573)

This PR adds another dynamic config share.auto.offset.reset fir share groups.


Reviewers:  Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>,  Abhinav Dixit <adixit@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
Chirag Wadhwa 2024-11-11 14:36:11 +05:30 committed by GitHub
parent 7e5ffd3403
commit 9db5ed00a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 889 additions and 164 deletions

View File

@ -19,9 +19,11 @@ package kafka.server.share;
import kafka.cluster.Partition;
import kafka.server.ReplicaManager;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.OffsetNotAvailableException;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileRecords;
@ -40,6 +42,7 @@ import java.util.Map;
import java.util.Optional;
import scala.Option;
import scala.Some;
/**
* Utility class for post-processing of share fetch operations.
@ -125,7 +128,26 @@ public class ShareFetchUtils {
Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(),
Optional.empty(), true).timestampAndOffsetOpt();
return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
if (timestampAndOffset.isEmpty()) {
throw new OffsetNotAvailableException("offset for Earliest timestamp not found for topic partition: " + topicIdPartition);
}
return timestampAndOffset.get().offset;
}
/**
* The method is used to get the offset for the latest timestamp for the topic-partition.
*
* @return The offset for the latest timestamp.
*/
static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager) {
// Isolation level is set to READ_UNCOMMITTED, matching with that used in share fetch requests
Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
topicIdPartition.topicPartition(), ListOffsetsRequest.LATEST_TIMESTAMP, new Some<>(IsolationLevel.READ_UNCOMMITTED),
Optional.empty(), true).timestampAndOffsetOpt();
if (timestampAndOffset.isEmpty()) {
throw new OffsetNotAvailableException("offset for Latest timestamp not found for topic partition: " + topicIdPartition);
}
return timestampAndOffset.get().offset;
}
static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {

View File

@ -34,6 +34,7 @@ import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
@ -72,6 +73,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
/**
* The SharePartition is used to track the state of a partition that is shared between multiple
* consumers. The class maintains the state of the records that have been fetched from the leader
@ -421,8 +425,12 @@ public class SharePartition {
return;
}
// Set the state epoch and end offset from the persisted state.
startOffset = partitionData.startOffset() != -1 ? partitionData.startOffset() : 0;
try {
startOffset = startOffsetDuringInitialization(partitionData.startOffset());
} catch (Exception e) {
completeInitializationWithException(future, e);
return;
}
stateEpoch = partitionData.stateEpoch();
List<PersisterStateBatch> stateBatches = partitionData.stateBatches();
@ -448,7 +456,7 @@ public class SharePartition {
// and start/end offsets.
maybeUpdateCachedStateAndOffsets();
} else {
updateEndOffsetAndResetFetchOffsetMetadata(partitionData.startOffset());
updateEndOffsetAndResetFetchOffsetMetadata(startOffset);
}
// Set the partition state to Active and complete the future.
partitionState = SharePartitionState.ACTIVE;
@ -2058,6 +2066,23 @@ public class SharePartition {
}
}
private long startOffsetDuringInitialization(long partitionDataStartOffset) throws Exception {
// Set the state epoch and end offset from the persisted state.
if (partitionDataStartOffset != PartitionFactory.UNINITIALIZED_START_OFFSET) {
return partitionDataStartOffset;
}
GroupConfig.ShareGroupAutoOffsetReset offsetResetStrategy;
if (groupConfigManager.groupConfig(groupId).isPresent()) {
offsetResetStrategy = groupConfigManager.groupConfig(groupId).get().shareAutoOffsetReset();
} else {
offsetResetStrategy = GroupConfig.defaultShareAutoOffsetReset();
}
if (offsetResetStrategy == GroupConfig.ShareGroupAutoOffsetReset.EARLIEST)
return offsetForEarliestTimestamp(topicIdPartition, replicaManager);
return offsetForLatestTimestamp(topicIdPartition, replicaManager);
}
// Visible for testing. Should only be used for testing purposes.
NavigableMap<Long, InFlightBatch> cachedState() {
return new ConcurrentSkipListMap<>(cachedState);

View File

@ -17,6 +17,7 @@
package kafka.server.share;
import kafka.cluster.Partition;
import kafka.log.OffsetResultHolder;
import kafka.server.LogReadResult;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
@ -42,6 +43,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.ShareFetchRequest;
@ -1040,6 +1042,8 @@ public class SharePartitionManagerTest {
partitionMaxBytes.put(tp5, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp6, PARTITION_MAX_BYTES);
mockFetchOffsetForTimestamp(mockReplicaManager);
Time time = mock(Time.class);
when(time.hiResClockMs()).thenReturn(0L).thenReturn(100L);
Metrics metrics = new Metrics();
@ -1109,6 +1113,9 @@ public class SharePartitionManagerTest {
partitionMaxBytes.put(tp3, PARTITION_MAX_BYTES);
final Time time = new MockTime(0, System.currentTimeMillis(), 0);
mockFetchOffsetForTimestamp(mockReplicaManager);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
@ -1233,6 +1240,8 @@ public class SharePartitionManagerTest {
TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
mockFetchOffsetForTimestamp(mockReplicaManager);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
@ -2482,6 +2491,12 @@ public class SharePartitionManagerTest {
});
}
private void mockFetchOffsetForTimestamp(ReplicaManager replicaManager) {
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
}
static Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(Set<TopicIdPartition> topicIdPartitions) {
List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new ArrayList<>();
topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(

View File

@ -16,12 +16,14 @@
*/
package kafka.server.share;
import kafka.log.OffsetResultHolder;
import kafka.server.ReplicaManager;
import kafka.server.share.SharePartition.InFlightState;
import kafka.server.share.SharePartition.RecordState;
import kafka.server.share.SharePartition.SharePartitionState;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
@ -34,9 +36,11 @@ import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfig;
@ -77,6 +81,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import scala.Option;
import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@ -191,6 +197,244 @@ public class SharePartitionTest {
assertNull(sharePartition.cachedState().get(11L).offsetState());
}
@Test
public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsEarliest() {
Persister persister = Mockito.mock(Persister.class);
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
PartitionFactory.newPartitionAllData(
0, PartitionFactory.DEFAULT_STATE_EPOCH,
PartitionFactory.UNINITIALIZED_START_OFFSET,
PartitionFactory.DEFAULT_ERROR_CODE,
PartitionFactory.DEFAULT_ERR_MESSAGE,
Collections.emptyList())))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST);
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
.withGroupConfigManager(groupConfigManager)
.withReplicaManager(replicaManager)
.build();
CompletableFuture<Void> result = sharePartition.maybeInitialize();
assertTrue(result.isDone());
assertFalse(result.isCompletedExceptionally());
// replicaManager.fetchOffsetForTimestamp should be called with "ListOffsetsRequest.EARLIEST_TIMESTAMP"
Mockito.verify(replicaManager).fetchOffsetForTimestamp(
Mockito.any(TopicPartition.class),
Mockito.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP),
Mockito.any(),
Mockito.any(),
Mockito.anyBoolean()
);
assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
assertEquals(0, sharePartition.startOffset());
assertEquals(0, sharePartition.endOffset());
assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch());
}
@Test
public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsLatest() {
Persister persister = Mockito.mock(Persister.class);
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
PartitionFactory.newPartitionAllData(
0, PartitionFactory.DEFAULT_STATE_EPOCH,
PartitionFactory.UNINITIALIZED_START_OFFSET,
PartitionFactory.DEFAULT_ERROR_CODE,
PartitionFactory.DEFAULT_ERR_MESSAGE,
Collections.emptyList())))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.LATEST);
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty());
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
.withGroupConfigManager(groupConfigManager)
.withReplicaManager(replicaManager)
.build();
CompletableFuture<Void> result = sharePartition.maybeInitialize();
assertTrue(result.isDone());
assertFalse(result.isCompletedExceptionally());
// replicaManager.fetchOffsetForTimestamp should be called with "ListOffsetsRequest.LATEST_TIMESTAMP"
Mockito.verify(replicaManager).fetchOffsetForTimestamp(
Mockito.any(TopicPartition.class),
Mockito.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
Mockito.any(),
Mockito.any(),
Mockito.anyBoolean()
);
assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
assertEquals(15, sharePartition.startOffset());
assertEquals(15, sharePartition.endOffset());
assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch());
}
@Test
public void testMaybeInitializeDefaultStartEpochGroupConfigNotPresent() {
Persister persister = Mockito.mock(Persister.class);
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
PartitionFactory.newPartitionAllData(
0, PartitionFactory.DEFAULT_STATE_EPOCH,
PartitionFactory.UNINITIALIZED_START_OFFSET,
PartitionFactory.DEFAULT_ERROR_CODE,
PartitionFactory.DEFAULT_ERR_MESSAGE,
Collections.emptyList())))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty());
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
.withGroupConfigManager(groupConfigManager)
.withReplicaManager(replicaManager)
.build();
CompletableFuture<Void> result = sharePartition.maybeInitialize();
assertTrue(result.isDone());
assertFalse(result.isCompletedExceptionally());
// replicaManager.fetchOffsetForTimestamp should be called with "ListOffsetsRequest.LATEST_TIMESTAMP"
Mockito.verify(replicaManager).fetchOffsetForTimestamp(
Mockito.any(TopicPartition.class),
Mockito.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
Mockito.any(),
Mockito.any(),
Mockito.anyBoolean()
);
assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
assertEquals(15, sharePartition.startOffset());
assertEquals(15, sharePartition.endOffset());
assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch());
}
@Test
public void testMaybeInitializeFetchOffsetForLatestTimestampThrowsError() {
Persister persister = Mockito.mock(Persister.class);
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
PartitionFactory.newPartitionAllData(
0, PartitionFactory.DEFAULT_STATE_EPOCH,
PartitionFactory.UNINITIALIZED_START_OFFSET,
PartitionFactory.DEFAULT_ERROR_CODE,
PartitionFactory.DEFAULT_ERR_MESSAGE,
Collections.emptyList())))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
Mockito.when(replicaManager.fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()))
.thenThrow(new RuntimeException("fetch offsets exception"));
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
.withGroupConfigManager(groupConfigManager)
.withReplicaManager(replicaManager)
.build();
CompletableFuture<Void> result = sharePartition.maybeInitialize();
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
// replicaManager.fetchOffsetForTimestamp should be called with "ListOffsetsRequest.LATEST_TIMESTAMP"
Mockito.verify(replicaManager).fetchOffsetForTimestamp(
Mockito.any(TopicPartition.class),
Mockito.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
Mockito.any(),
Mockito.any(),
Mockito.anyBoolean()
);
assertEquals(SharePartitionState.FAILED, sharePartition.partitionState());
}
@Test
public void testMaybeInitializeFetchOffsetForEarliestTimestampThrowsError() {
Persister persister = Mockito.mock(Persister.class);
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
PartitionFactory.newPartitionAllData(
0, PartitionFactory.DEFAULT_STATE_EPOCH,
PartitionFactory.UNINITIALIZED_START_OFFSET,
PartitionFactory.DEFAULT_ERROR_CODE,
PartitionFactory.DEFAULT_ERR_MESSAGE,
Collections.emptyList())))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST);
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
Mockito.when(replicaManager.fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()))
.thenThrow(new RuntimeException("fetch offsets exception"));
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
.withGroupConfigManager(groupConfigManager)
.withReplicaManager(replicaManager)
.build();
CompletableFuture<Void> result = sharePartition.maybeInitialize();
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
// replicaManager.fetchOffsetForTimestamp should be called with "ListOffsetsRequest.EARLIEST_TIMESTAMP"
Mockito.verify(replicaManager).fetchOffsetForTimestamp(
Mockito.any(TopicPartition.class),
Mockito.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP),
Mockito.any(),
Mockito.any(),
Mockito.anyBoolean()
);
assertEquals(SharePartitionState.FAILED, sharePartition.partitionState());
}
@Test
public void testMaybeInitializeSharePartitionAgain() {
Persister persister = Mockito.mock(Persister.class);
@ -460,7 +704,13 @@ public class SharePartitionTest {
@Test
public void testMaybeInitializeWithNoOpShareStatePersister() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
SharePartition sharePartition = SharePartitionBuilder.builder().withReplicaManager(replicaManager).build();
CompletableFuture<Void> result = sharePartition.maybeInitialize();
assertTrue(result.isDone());
assertFalse(result.isCompletedExceptionally());
@ -825,7 +1075,13 @@ public class SharePartitionTest {
@Test
public void testMaybeAcquireAndReleaseFetchLock() {
SharePartition sharePartition = SharePartitionBuilder.builder().build();
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
SharePartition sharePartition = SharePartitionBuilder.builder().withReplicaManager(replicaManager).build();
sharePartition.maybeInitialize();
assertTrue(sharePartition.maybeAcquireFetchLock());
// Lock cannot be acquired again, as already acquired.
@ -5233,7 +5489,7 @@ public class SharePartitionTest {
private int maxInflightMessages = MAX_IN_FLIGHT_MESSAGES;
private Persister persister = new NoOpShareStatePersister();
private final ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
private GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
private SharePartitionState state = SharePartitionState.EMPTY;
@ -5257,6 +5513,11 @@ public class SharePartitionTest {
return this;
}
private SharePartitionBuilder withReplicaManager(ReplicaManager replicaManager) {
this.replicaManager = replicaManager;
return this;
}
private SharePartitionBuilder withGroupConfigManager(GroupConfigManager groupConfigManager) {
this.groupConfigManager = groupConfigManager;
return this;

View File

@ -19,6 +19,9 @@ package kafka.test.api;
import kafka.api.BaseConsumerTest;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.AcknowledgeType;
@ -34,6 +37,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidTopicException;
@ -47,6 +51,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@ -60,6 +65,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -101,6 +107,8 @@ public class ShareConsumerTest {
private static final String DEFAULT_STATE_PERSISTER = "org.apache.kafka.server.share.persister.DefaultStatePersister";
private static final String NO_OP_PERSISTER = "org.apache.kafka.server.share.persister.NoOpShareStatePersister";
private Admin adminClient;
@BeforeEach
public void createCluster(TestInfo testInfo) throws Exception {
String persisterClassName = NO_OP_PERSISTER;
@ -131,11 +139,13 @@ public class ShareConsumerTest {
cluster.waitForReadyBrokers();
createTopic("topic");
createTopic("topic2");
adminClient = createAdminClient();
warmup();
}
@AfterEach
public void destroyCluster() throws Exception {
adminClient.close();
cluster.close();
}
@ -156,6 +166,7 @@ public class ShareConsumerTest {
Set<String> subscription = Collections.singleton(tp.topic());
shareConsumer.subscribe(subscription);
assertEquals(subscription, shareConsumer.subscription());
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500));
shareConsumer.close();
assertEquals(0, records.count());
@ -168,6 +179,7 @@ public class ShareConsumerTest {
Set<String> subscription = Collections.singleton(tp.topic());
shareConsumer.subscribe(subscription);
assertEquals(subscription, shareConsumer.subscription());
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500));
shareConsumer.unsubscribe();
assertEquals(Collections.emptySet(), shareConsumer.subscription());
@ -182,6 +194,7 @@ public class ShareConsumerTest {
Set<String> subscription = Collections.singleton(tp.topic());
shareConsumer.subscribe(subscription);
assertEquals(subscription, shareConsumer.subscription());
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500));
assertEquals(0, records.count());
shareConsumer.subscribe(subscription);
@ -198,6 +211,7 @@ public class ShareConsumerTest {
Set<String> subscription = Collections.singleton(tp.topic());
shareConsumer.subscribe(subscription);
assertEquals(subscription, shareConsumer.subscription());
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500));
shareConsumer.unsubscribe();
assertEquals(Collections.emptySet(), shareConsumer.subscription());
@ -214,6 +228,7 @@ public class ShareConsumerTest {
Set<String> subscription = Collections.singleton(tp.topic());
shareConsumer.subscribe(subscription);
assertEquals(subscription, shareConsumer.subscription());
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500));
shareConsumer.subscribe(Collections.emptySet());
assertEquals(Collections.emptySet(), shareConsumer.subscription());
@ -231,6 +246,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
shareConsumer.close();
@ -245,6 +261,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
producer.send(record);
@ -273,6 +290,8 @@ public class ShareConsumerTest {
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
@ -307,6 +326,8 @@ public class ShareConsumerTest {
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
// Now in the second poll, we implicitly acknowledge the record received in the first poll.
@ -334,6 +355,8 @@ public class ShareConsumerTest {
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
@ -361,6 +384,8 @@ public class ShareConsumerTest {
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
@ -420,6 +445,7 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords);
assertEquals(numRecords, records.size());
@ -442,6 +468,7 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(deserializer, new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords);
assertEquals(numRecords, records.size());
@ -468,6 +495,8 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1", Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxPollRecords)));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords);
long i = 0L;
for (ConsumerRecord<byte[], byte[]> record : records) {
@ -513,6 +542,8 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(4, records.count());
assertEquals(transactional1.offset(), records.records(tp).get(0).offset());
@ -538,6 +569,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
records.forEach(shareConsumer::acknowledge);
@ -556,6 +588,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
records.forEach(shareConsumer::acknowledge);
@ -583,6 +616,7 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
shareConsumer2.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
Map<TopicPartition, Exception> partitionExceptionMap1 = new HashMap<>();
@ -634,6 +668,7 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
@ -689,6 +724,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
@ -709,6 +745,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
@ -727,6 +764,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
@ -734,7 +772,6 @@ public class ShareConsumerTest {
producer.close();
}
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
public void testExplicitAcknowledgeThrowsNotInBatch(String persister) {
@ -743,6 +780,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0);
@ -762,6 +800,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0);
@ -780,6 +819,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
Map<TopicIdPartition, Optional<KafkaException>> result = shareConsumer.commitSync();
@ -805,6 +845,7 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
Map<TopicPartition, Exception> partitionExceptionMap1 = new HashMap<>();
@ -843,6 +884,8 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1", Collections.singletonMap(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(maxPartitionFetchBytes)));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
shareConsumer.close();
@ -857,9 +900,11 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
KafkaShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2");
shareConsumer2.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group2", "earliest");
// producing 3 records to the topic
producer.send(record);
@ -907,6 +952,7 @@ public class ShareConsumerTest {
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
KafkaShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer2.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
int totalMessages = 2000;
for (int i = 0; i < totalMessages; i++) {
@ -943,9 +989,16 @@ public class ShareConsumerTest {
int producerCount = 4;
int messagesPerProducer = 5000;
String groupId = "group1";
ExecutorService producerExecutorService = Executors.newFixedThreadPool(producerCount);
ExecutorService consumerExecutorService = Executors.newFixedThreadPool(consumerCount);
// This consumer is created to register the share group id with the groupCoordinator
// so that the config share.auto.offset.reset can be altered for this group
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId);
alterShareAutoOffsetReset(groupId, "earliest");
for (int i = 0; i < producerCount; i++) {
producerExecutorService.submit(() -> produceMessages(messagesPerProducer));
}
@ -957,7 +1010,7 @@ public class ShareConsumerTest {
consumerExecutorService.submit(() -> {
CompletableFuture<Integer> future = new CompletableFuture<>();
futures.add(future);
consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, "group1", consumerNumber, 30, true, future, Optional.of(maxBytes));
consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, groupId, consumerNumber, 30, true, future, Optional.of(maxBytes));
});
}
@ -990,6 +1043,19 @@ public class ShareConsumerTest {
int messagesPerProducer = 2000;
final int totalMessagesSent = producerCount * messagesPerProducer;
String groupId1 = "group1";
String groupId2 = "group2";
String groupId3 = "group3";
// These consumers are created to register the share group ids with the groupCoordinator
// so that the config share.auto.offset.reset can be altered for these groups
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId1);
alterShareAutoOffsetReset(groupId1, "earliest");
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId2);
alterShareAutoOffsetReset(groupId2, "earliest");
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId3);
alterShareAutoOffsetReset(groupId3, "earliest");
ExecutorService producerExecutorService = Executors.newFixedThreadPool(producerCount);
ExecutorService shareGroupExecutorService1 = Executors.newFixedThreadPool(consumerCount);
ExecutorService shareGroupExecutorService2 = Executors.newFixedThreadPool(consumerCount);
@ -1095,6 +1161,7 @@ public class ShareConsumerTest {
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
KafkaShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer2.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
int totalMessages = 1500;
for (int i = 0; i < totalMessages; i++) {
@ -1139,6 +1206,13 @@ public class ShareConsumerTest {
int producerCount = 4;
int messagesPerProducer = 5000;
String groupId = "group1";
// This consumer is created to register the share group id with the groupCoordinator
// so that the config share.auto.offset.reset can be altered for this group
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId);
alterShareAutoOffsetReset(groupId, "earliest");
ExecutorService consumerExecutorService = Executors.newFixedThreadPool(consumerCount);
ExecutorService producerExecutorService = Executors.newFixedThreadPool(producerCount);
@ -1157,7 +1231,7 @@ public class ShareConsumerTest {
// The "failing" consumer polls but immediately closes, which releases the records for the other consumers
CompletableFuture<Integer> future = new CompletableFuture<>();
AtomicInteger failedMessagesConsumed = new AtomicInteger(0);
consumeMessages(failedMessagesConsumed, producerCount * messagesPerProducer, "group1", 0, 1, false, future);
consumeMessages(failedMessagesConsumed, producerCount * messagesPerProducer, groupId, 0, 1, false, future);
startSignal.countDown();
});
@ -1174,7 +1248,7 @@ public class ShareConsumerTest {
consumerExecutorService.submit(() -> {
CompletableFuture<Integer> future = new CompletableFuture<>();
futuresSuccess.add(future);
consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, "group1", consumerNumber, 40, true, future, Optional.of(maxBytes));
consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, groupId, consumerNumber, 40, true, future, Optional.of(maxBytes));
});
}
producerExecutorService.shutdown();
@ -1203,6 +1277,7 @@ public class ShareConsumerTest {
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
producer.send(producerRecord1);
@ -1251,6 +1326,7 @@ public class ShareConsumerTest {
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
alterShareAutoOffsetReset("group1", "earliest");
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackWithShareConsumer<>(shareConsumer));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
@ -1292,6 +1368,7 @@ public class ShareConsumerTest {
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
alterShareAutoOffsetReset("group1", "earliest");
// The acknowledgment commit callback will try to call a method of KafkaShareConsumer
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackWakeup<>(shareConsumer));
@ -1331,6 +1408,7 @@ public class ShareConsumerTest {
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
alterShareAutoOffsetReset("group1", "earliest");
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackThrows<>());
shareConsumer.subscribe(Collections.singleton(tp.topic()));
@ -1363,6 +1441,7 @@ public class ShareConsumerTest {
public void testPollThrowsInterruptExceptionIfInterrupted(String persister) {
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
// interrupt the thread and call poll
try {
@ -1386,6 +1465,7 @@ public class ShareConsumerTest {
public void testSubscribeOnInvalidTopicThrowsInvalidTopicException(String persister) {
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton("topic abc"));
alterShareAutoOffsetReset("group1", "earliest");
// The exception depends upon a metadata response which arrives asynchronously. If the delay is
// too short, the poll might return before the error is known.
@ -1405,6 +1485,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
alterShareAutoOffsetReset("group1", "earliest");
shareConsumer.wakeup();
assertThrows(WakeupException.class, () -> shareConsumer.poll(Duration.ZERO));
@ -1423,6 +1504,7 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
String topic = "foo";
shareConsumer.subscribe(Collections.singleton(topic));
alterShareAutoOffsetReset("group1", "earliest");
// Topic is created post creation of share consumer and subscription
createTopic(topic);
@ -1458,6 +1540,7 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
// Consumer subscribes to the topics -> bar and baz.
shareConsumer.subscribe(Arrays.asList(topic1, topic2));
alterShareAutoOffsetReset("group1", "earliest");
producer.send(recordTopic1).get();
TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
@ -1490,6 +1573,13 @@ public class ShareConsumerTest {
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), 0, null, "key".getBytes(), "value".getBytes());
String groupId = "group1";
// This consumer is created to register the share group id with the groupCoordinator
// so that the config share.auto.offset.reset can be altered for this group
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId);
alterShareAutoOffsetReset(groupId, "earliest");
// We write 10 records to the topic, so they would be written from offsets 0-9 on the topic.
try {
for (int i = 0; i < 10; i++) {
@ -1499,13 +1589,12 @@ public class ShareConsumerTest {
fail("Failed to send records: " + e);
}
Admin adminClient = createAdminClient();
// We delete records before offset 5, so the LSO should move to 5.
adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(5L)));
AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
CompletableFuture<Integer> future = new CompletableFuture<>();
consumeMessages(totalMessagesConsumed, 5, "group1", 1, 10, true, future);
consumeMessages(totalMessagesConsumed, 5, groupId, 1, 10, true, future);
// The records returned belong to offsets 5-9.
assertEquals(5, totalMessagesConsumed.get());
try {
@ -1528,7 +1617,7 @@ public class ShareConsumerTest {
totalMessagesConsumed = new AtomicInteger(0);
future = new CompletableFuture<>();
consumeMessages(totalMessagesConsumed, 1, "group1", 1, 10, true, future);
consumeMessages(totalMessagesConsumed, 1, groupId, 1, 10, true, future);
// The record returned belong to offset 14.
assertEquals(1, totalMessagesConsumed.get());
try {
@ -1542,14 +1631,135 @@ public class ShareConsumerTest {
totalMessagesConsumed = new AtomicInteger(0);
future = new CompletableFuture<>();
consumeMessages(totalMessagesConsumed, 0, "group1", 1, 5, true, future);
consumeMessages(totalMessagesConsumed, 0, groupId, 1, 5, true, future);
assertEquals(0, totalMessagesConsumed.get());
try {
assertEquals(0, future.get());
} catch (Exception e) {
fail("Exception occurred : " + e.getMessage());
}
adminClient.close();
producer.close();
}
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
public void testShareAutoOffsetResetDefaultValue(String persister) {
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
// Producing a record.
producer.send(record);
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
// No records should be consumed because share.auto.offset.reset has a default of "latest". Since the record
// was produced before share partition was initialized (which happens after the first share fetch request
// in the poll method), the start offset would be the latest offset, i.e. 1 (the next offset after the already
// present 0th record)
assertEquals(0, records.count());
// Producing another record.
producer.send(record);
records = shareConsumer.poll(Duration.ofMillis(5000));
// Now the next record should be consumed successfully
assertEquals(1, records.count());
shareConsumer.close();
producer.close();
}
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
public void testShareAutoOffsetResetEarliest(String persister) {
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
// Changing the value of share.auto.offset.reset value to "earliest"
alterShareAutoOffsetReset("group1", "earliest");
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
// Producing a record.
producer.send(record);
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
// Since the value for share.auto.offset.reset has been altered to "earliest", the consumer should consume
// all messages present on the partition
assertEquals(1, records.count());
// Producing another record.
producer.send(record);
records = shareConsumer.poll(Duration.ofMillis(5000));
// The next records should also be consumed successfully
assertEquals(1, records.count());
shareConsumer.close();
producer.close();
}
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
public void testShareAutoOffsetResetEarliestAfterLsoMovement(String persister) throws Exception {
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
// Changing the value of share.auto.offset.reset value to "earliest"
alterShareAutoOffsetReset("group1", "earliest");
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
// We write 10 records to the topic, so they would be written from offsets 0-9 on the topic.
try {
for (int i = 0; i < 10; i++) {
producer.send(record).get();
}
} catch (Exception e) {
fail("Failed to send records: " + e);
}
// We delete records before offset 5, so the LSO should move to 5.
adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(5L)));
AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
CompletableFuture<Integer> future = new CompletableFuture<>();
consumeMessages(totalMessagesConsumed, 5, "group1", 1, 10, true, future);
// The records returned belong to offsets 5-9.
assertEquals(5, totalMessagesConsumed.get());
assertEquals(5, future.get());
shareConsumer.close();
producer.close();
}
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String persister) {
KafkaShareConsumer<byte[], byte[]> shareConsumerEarliest = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
shareConsumerEarliest.subscribe(Collections.singleton(tp.topic()));
// Changing the value of share.auto.offset.reset value to "earliest" for group1
alterShareAutoOffsetReset("group1", "earliest");
KafkaShareConsumer<byte[], byte[]> shareConsumerLatest = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2");
shareConsumerLatest.subscribe(Collections.singleton(tp.topic()));
// Changing the value of share.auto.offset.reset value to "latest" for group2
alterShareAutoOffsetReset("group2", "latest");
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
// Producing a record.
producer.send(record);
ConsumerRecords<byte[], byte[]> records1 = shareConsumerEarliest.poll(Duration.ofMillis(5000));
// Since the value for share.auto.offset.reset has been altered to "earliest", the consumer should consume
// all messages present on the partition
assertEquals(1, records1.count());
ConsumerRecords<byte[], byte[]> records2 = shareConsumerLatest.poll(Duration.ofMillis(5000));
// Since the value for share.auto.offset.reset has been altered to "latest", the consumer should not consume
// any message
assertEquals(0, records2.count());
// Producing another record.
producer.send(record);
records1 = shareConsumerEarliest.poll(Duration.ofMillis(5000));
// The next record should also be consumed successfully by group1
assertEquals(1, records1.count());
records2 = shareConsumerLatest.poll(Duration.ofMillis(5000));
// The next record should also be consumed successfully by group2
assertEquals(1, records2.count());
shareConsumerEarliest.close();
shareConsumerLatest.close();
producer.close();
}
@ -1729,6 +1939,7 @@ public class ShareConsumerTest {
try {
producer.send(record).get(15000, TimeUnit.MILLISECONDS);
shareConsumer.subscribe(subscription);
alterShareAutoOffsetReset("warmupgroup1", "earliest");
TestUtils.waitForCondition(
() -> shareConsumer.poll(Duration.ofMillis(5000)).count() == 1, 30000, 200L, () -> "warmup record not received");
} finally {
@ -1736,4 +1947,19 @@ public class ShareConsumerTest {
shareConsumer.close();
}
}
private void alterShareAutoOffsetReset(String groupId, String newValue) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new HashMap<>();
alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry(
GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, newValue), AlterConfigOp.OpType.SET)));
AlterConfigsOptions alterOptions = new AlterConfigsOptions();
try {
adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
.all()
.get(60, TimeUnit.SECONDS);
} catch (Exception e) {
fail("Exception was thrown: ", e);
}
}
}

View File

@ -74,9 +74,9 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAndEpoch, SecurityUtils, Utils}
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG}
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG}
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorConfigTest}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.LeaderAndIsr
@ -585,6 +585,7 @@ class KafkaApisTest extends Logging {
cgConfigs.put(SHARE_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, GroupConfig.defaultShareAutoOffsetReset.toString)
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()

View File

@ -170,14 +170,17 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the share fetch request to fetch the records produced above
val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
// Send the second share fetch request to fetch the records produced above
val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -235,16 +238,19 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicIdPartition2 = new TopicIdPartition(topicId, new TopicPartition(topic, 1))
val topicIdPartition3 = new TopicIdPartition(topicId, new TopicPartition(topic, 2))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2, topicIdPartition3)
// Send the first share fetch request to initialize the share partitions
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10 records to the topic partitions created above
produceData(topicIdPartition1, 10)
produceData(topicIdPartition2, 10)
produceData(topicIdPartition3, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2, topicIdPartition3)
// Send the share fetch request to fetch the records produced above
val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
// Send the second share fetch request to fetch the records produced above
val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -325,12 +331,6 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val leader2 = partitionToLeaders(topicIdPartition2)
val leader3 = partitionToLeaders(topicIdPartition3)
initProducer()
// Producing 10 records to the topic partitions created above
produceData(topicIdPartition1, 10)
produceData(topicIdPartition2, 10)
produceData(topicIdPartition3, 10)
val send1: Seq[TopicIdPartition] = Seq(topicIdPartition1)
val send2: Seq[TopicIdPartition] = Seq(topicIdPartition2)
val send3: Seq[TopicIdPartition] = Seq(topicIdPartition3)
@ -338,14 +338,31 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
// Crete different share fetch requests for different partitions as they may have leaders on separate brokers
val shareFetchRequest1 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap)
val shareFetchRequest2 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap)
val shareFetchRequest3 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap)
// Send the first share fetch request to initialize the share partitions
// Create different share fetch requests for different partitions as they may have leaders on separate brokers
var shareFetchRequest1 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap)
var shareFetchRequest2 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap)
var shareFetchRequest3 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap)
val shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
val shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
val shareFetchResponse3 = connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3)
var shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
var shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
var shareFetchResponse3 = connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3)
initProducer()
// Producing 10 records to the topic partitions created above
produceData(topicIdPartition1, 10)
produceData(topicIdPartition2, 10)
produceData(topicIdPartition3, 10)
// Send the second share fetch request to fetch the records produced above
// Create different share fetch requests for different partitions as they may have leaders on separate brokers
shareFetchRequest1 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap)
shareFetchRequest2 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap)
shareFetchRequest3 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap)
shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
shareFetchResponse3 = connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3)
val shareFetchResponseData1 = shareFetchResponse1.data()
assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode)
@ -427,15 +444,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize share partitions
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -482,7 +502,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Producing 10 more records to the topic
produceData(topicIdPartition, 10)
// Sending a second share fetch request to check if acknowledgements were done successfully
// Sending a third share fetch request to check if acknowledgements were done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@ -540,15 +560,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch: Int = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -571,7 +594,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Producing 10 more records to the topic created above
produceData(topicIdPartition, 10)
// Send a Share Fetch request with piggybacked acknowledgements
// Send the third Share Fetch request with piggybacked acknowledgements
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch()
@ -599,7 +622,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Producing 10 more records to the topic
produceData(topicIdPartition, 10)
// Sending a third share fetch request to confirm if acknowledgements were done successfully
// Sending a fourth share fetch request to confirm if acknowledgements were done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@ -657,15 +680,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize the share partiion
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -709,7 +735,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val acknowledgePartitionData = shareAcknowledgeResponseData.responses().get(0).partitions().get(0)
compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData, acknowledgePartitionData)
// Sending a second share fetch request to check if acknowledgements were done successfully
// Sending a third share fetch request to check if acknowledgements were done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@ -767,15 +793,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -798,7 +827,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Producing 10 more records to the topic created above
produceData(topicIdPartition, 10)
// Send a Share Fetch request with piggybacked acknowledgements
// Send a third Share Fetch request with piggybacked acknowledgements
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch()
@ -862,15 +891,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -917,7 +949,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Producing 10 more records to the topic
produceData(topicIdPartition, 10)
// Sending a second share fetch request to check if acknowledgements were done successfully
// Sending a third share fetch request to check if acknowledgements were done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@ -975,15 +1007,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -1006,7 +1041,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Producing 10 more records to the topic created above
produceData(topicIdPartition, 10)
// Send a Share Fetch request with piggybacked acknowledgements
// Send a third Share Fetch request with piggybacked acknowledgements
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch()
@ -1034,7 +1069,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Producing 10 more records to the topic
produceData(topicIdPartition, 10)
// Sending a third share fetch request to confirm if acknowledgements were done successfully
// Sending a fourth share fetch request to confirm if acknowledgements were done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@ -1094,15 +1129,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize the shar partition
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -1146,7 +1184,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var acknowledgePartitionData = shareAcknowledgeResponseData.responses().get(0).partitions().get(0)
compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData, acknowledgePartitionData)
// Sending a second share fetch request to check if acknowledgements were done successfully
// Sending a third share fetch request to check if acknowledgements were done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@ -1193,7 +1231,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Producing 10 new records to the topic
produceData(topicIdPartition, 10)
// Sending a third share fetch request to check if acknowledgements were done successfully
// Sending a fourth share fetch request to check if acknowledgements were done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@ -1251,6 +1289,11 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 3 large messages to the topic created above
produceData(topicIdPartition, 10)
@ -1258,10 +1301,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
produceData(topicIdPartition, "large message 2", new String(new Array[Byte](MAX_PARTITION_BYTES/3)))
produceData(topicIdPartition, "large message 3", new String(new Array[Byte](MAX_PARTITION_BYTES/3)))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the share fetch request to fetch the records produced above
val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
// Send the second share fetch request to fetch the records produced above
val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -1311,6 +1352,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
)
def testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers(): Unit = {
val groupId: String = "group"
val memberId = Uuid.randomUuid()
val memberId1 = Uuid.randomUuid()
val memberId2 = Uuid.randomUuid()
val memberId3 = Uuid.randomUuid()
@ -1323,12 +1366,15 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Sending a dummy share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10000 records to the topic created above
produceData(topicIdPartition, 10000)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Sending 3 share Fetch Requests with same groupId to the same topicPartition but with different memberIds,
// mocking the behaviour of multiple share consumers from the same share group
val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH)
@ -1418,23 +1464,28 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Sending 3 dummy share Fetch Requests with to inititlaize the share partitions for each share group\
sendFirstShareFetchRequest(memberId1, groupId1, send)
sendFirstShareFetchRequest(memberId2, groupId2, send)
sendFirstShareFetchRequest(memberId3, groupId3, send)
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Sending 3 share Fetch Requests with same groupId to the same topicPartition but with different memberIds,
// mocking the behaviour of multiple share consumers from the same share group
val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH)
// Sending 3 share Fetch Requests with different groupId and different memberIds to the same topicPartition,
// mocking the behaviour of 3 different share groups
val metadata1 = new ShareRequestMetadata(memberId1, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap1: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest1 = createShareFetchRequest(groupId1, metadata1, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap1)
val metadata2: ShareRequestMetadata = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH)
val metadata2 = new ShareRequestMetadata(memberId2, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap2: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest2 = createShareFetchRequest(groupId2, metadata2, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap2)
val metadata3: ShareRequestMetadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH)
val metadata3 = new ShareRequestMetadata(memberId3, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest3 = createShareFetchRequest(groupId3, metadata3, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap3)
@ -1509,15 +1560,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -1540,7 +1594,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Producing 10 more records to the topic created above
produceData(topicIdPartition, 10)
// Send a Share Fetch request with piggybacked acknowledgements
// Send a third Share Fetch request with piggybacked acknowledgements
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch()
@ -1616,15 +1670,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -1647,7 +1704,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Producing 10 more records to the topic created above
produceData(topicIdPartition, 10)
// Send a Share Fetch request with piggybacked acknowledgements
// Send a third Share Fetch request with piggybacked acknowledgements
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch()
@ -1804,12 +1861,28 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareAcknowledgeResponseData.errorCode)
}
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
new ClusterConfigProperty(key = "group.share.enable", value = "true"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
@ClusterTests(
Array(
new ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
new ClusterConfigProperty(key = "group.share.enable", value = "true"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
)
),
new ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
new ClusterConfigProperty(key = "group.share.enable", value = "true"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
new ClusterConfigProperty(key = "group.share.persister.class.name", value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
new ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"),
new ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
)
),
)
)
def testShareFetchRequestInvalidShareSessionEpoch(): Unit = {
@ -1824,14 +1897,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the share fetch request to fetch the records produced above
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -1850,8 +1927,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val partitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
compareFetchResponsePartitions(expectedPartitionData, partitionData)
// Sending Share Fetch request with invalid share session epoch
metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))
// Sending a thord Share Fetch request with invalid share session epoch
shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch))
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -1895,14 +1973,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the share fetch request to fetch the records produced above
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -1922,7 +2004,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
compareFetchResponsePartitions(expectedPartitionData, partitionData)
// Sending Share Acknowledge request with invalid share session epoch
metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))
shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch))
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]] =
Map(topicIdPartition -> List(new ShareAcknowledgeRequestData.AcknowledgementBatch()
.setFirstOffset(0)
@ -1972,14 +2055,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the share fetch request to fetch the records produced above
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -1998,8 +2085,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val partitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
compareFetchResponsePartitions(expectedPartitionData, partitionData)
// Sending a Share Fetch request with wrong member Id
metadata = new ShareRequestMetadata(wrongMemberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
// Sending a third Share Fetch request with wrong member Id
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -2044,14 +2132,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the share fetch request to fetch the records produced above
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -2071,7 +2163,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
compareFetchResponsePartitions(expectedPartitionData, partitionData)
// Sending a Share Acknowledge request with wrong member Id
metadata = new ShareRequestMetadata(wrongMemberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]] =
Map(topicIdPartition -> List(new ShareAcknowledgeRequestData.AcknowledgementBatch()
.setFirstOffset(0)
@ -2122,15 +2215,19 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicIdPartition1 = new TopicIdPartition(topicId, new TopicPartition(topic, partition1))
val topicIdPartition2 = new TopicIdPartition(topicId, new TopicPartition(topic, partition2))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2)
// Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send)
initProducer()
// Producing 10 records to the topic partitions created above
produceData(topicIdPartition1, 10)
produceData(topicIdPartition2, 10)
val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2)
// Send the share fetch request to fetch the records produced above
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
// Send the second share fetch request to fetch the records produced above
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -2145,8 +2242,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
produceData(topicIdPartition1, 10)
produceData(topicIdPartition2, 10)
// Send the share fetch request to with forget list populated with topicIdPartition2
metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
// Send another share fetch request with forget list populated with topicIdPartition2
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val forget: Seq[TopicIdPartition] = Seq(topicIdPartition1)
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, Seq.empty, forget, acknowledgementsMap)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
@ -2167,6 +2265,12 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
compareFetchResponsePartitions(expectedPartitionData, partitionData)
}
private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String, topicIdPartitions: Seq[TopicIdPartition]): Unit = {
val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, topicIdPartitions, Seq.empty, Map.empty)
connectAndReceive[ShareFetchResponse](shareFetchRequest)
}
private def expectedAcquiredRecords(firstOffsets: util.List[Long], lastOffsets: util.List[Long], deliveryCounts: util.List[Int]): util.List[AcquiredRecords] = {
val acquiredRecordsList: util.List[AcquiredRecords] = new util.ArrayList()
for (i <- firstOffsets.indices) {

View File

@ -21,8 +21,10 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@ -31,6 +33,8 @@ import java.util.Set;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
/**
* Group configuration related parameters and supporting methods like validation, etc. are
@ -48,6 +52,10 @@ public final class GroupConfig extends AbstractConfig {
public static final String SHARE_RECORD_LOCK_DURATION_MS_CONFIG = "share.record.lock.duration.ms";
public static final String SHARE_AUTO_OFFSET_RESET_CONFIG = "share.auto.offset.reset";
public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = ShareGroupAutoOffsetReset.LATEST.toString();
public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to initialize the share-partition start offset.";
public final int consumerSessionTimeoutMs;
public final int consumerHeartbeatIntervalMs;
@ -58,6 +66,8 @@ public final class GroupConfig extends AbstractConfig {
public final int shareRecordLockDurationMs;
public final String shareAutoOffsetReset;
private static final ConfigDef CONFIG = new ConfigDef()
.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
INT,
@ -88,7 +98,13 @@ public final class GroupConfig extends AbstractConfig {
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT,
atLeast(1000),
MEDIUM,
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC);
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_AUTO_OFFSET_RESET_CONFIG,
STRING,
SHARE_AUTO_OFFSET_RESET_DEFAULT,
in(Utils.enumOptions(ShareGroupAutoOffsetReset.class)),
MEDIUM,
SHARE_AUTO_OFFSET_RESET_DOC);
public GroupConfig(Map<?, ?> props) {
super(CONFIG, props, false);
@ -97,6 +113,7 @@ public final class GroupConfig extends AbstractConfig {
this.shareSessionTimeoutMs = getInt(SHARE_SESSION_TIMEOUT_MS_CONFIG);
this.shareHeartbeatIntervalMs = getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareRecordLockDurationMs = getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG);
}
public static ConfigDef configDef() {
@ -203,6 +220,13 @@ public final class GroupConfig extends AbstractConfig {
return new GroupConfig(props);
}
/**
* The default share group auto offset reset strategy.
*/
public static ShareGroupAutoOffsetReset defaultShareAutoOffsetReset() {
return ShareGroupAutoOffsetReset.valueOf(SHARE_AUTO_OFFSET_RESET_DEFAULT.toUpperCase(Locale.ROOT));
}
/**
* The consumer group session timeout in milliseconds.
*/
@ -237,4 +261,20 @@ public final class GroupConfig extends AbstractConfig {
public int shareRecordLockDurationMs() {
return shareRecordLockDurationMs;
}
/**
* The share group auto offset reset strategy.
*/
public ShareGroupAutoOffsetReset shareAutoOffsetReset() {
return ShareGroupAutoOffsetReset.valueOf(shareAutoOffsetReset.toUpperCase(Locale.ROOT));
}
public enum ShareGroupAutoOffsetReset {
LATEST, EARLIEST;
@Override
public String toString() {
return super.toString().toLowerCase(Locale.ROOT);
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigTest;
@ -27,6 +28,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -57,8 +59,10 @@ public class GroupConfigTest {
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
} else if (GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
} else if (GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG.equals(name)) {
assertPropertyInvalid(name, "hello", "1.0");
} else {
assertPropertyInvalid(name, "not_a_number", "-1");
assertPropertyInvalid(name, "not_a_number", "-0.1");
}
});
}
@ -71,6 +75,21 @@ public class GroupConfigTest {
}
}
@Test
public void testValidShareAutoOffsetResetValues() {
Properties props = createValidGroupConfig();
// Check for value "latest"
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
doTestValidProps(props);
props = createValidGroupConfig();
// Check for value "earliest"
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest");
doTestValidProps(props);
}
@Test
public void testInvalidProps() {
@ -78,56 +97,65 @@ public class GroupConfigTest {
// Check for invalid consumerSessionTimeoutMs, < MIN
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "1");
doTestInvalidProps(props);
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid consumerSessionTimeoutMs, > MAX
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "70000");
doTestInvalidProps(props);
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid consumerHeartbeatIntervalMs, < MIN
props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "1");
doTestInvalidProps(props);
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid consumerHeartbeatIntervalMs, > MAX
props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "70000");
doTestInvalidProps(props);
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid shareSessionTimeoutMs, < MIN
props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "1");
doTestInvalidProps(props);
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid shareSessionTimeoutMs, > MAX
props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "70000");
doTestInvalidProps(props);
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid shareHeartbeatIntervalMs, < MIN
props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "1");
doTestInvalidProps(props);
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid shareHeartbeatIntervalMs, > MAX
props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "70000");
doTestInvalidProps(props);
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid shareRecordLockDurationMs, < MIN
props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "10000");
doTestInvalidProps(props);
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid shareRecordLockDurationMs, > MAX
props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "70000");
doTestInvalidProps(props);
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid shareAutoOffsetReset
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "hello");
doTestInvalidProps(props, ConfigException.class);
}
private void doTestInvalidProps(Properties props) {
assertThrows(InvalidConfigurationException.class, () -> GroupConfig.validate(props, createGroupCoordinatorConfig(), createShareGroupConfig()));
private void doTestInvalidProps(Properties props, Class<? extends Exception> exceptionClassName) {
assertThrows(exceptionClassName, () -> GroupConfig.validate(props, createGroupCoordinatorConfig(), createShareGroupConfig()));
}
private void doTestValidProps(Properties props) {
assertDoesNotThrow(() -> GroupConfig.validate(props, createGroupCoordinatorConfig(), createShareGroupConfig()));
}
@Test
@ -138,6 +166,7 @@ public class GroupConfigTest {
defaultValue.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "10");
defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "2000");
defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
Properties props = new Properties();
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20");
@ -148,6 +177,7 @@ public class GroupConfigTest {
assertEquals(10, config.getInt(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG));
assertEquals(10, config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG));
assertEquals(2000, config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG));
assertEquals("latest", config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG));
}
@Test
@ -165,6 +195,7 @@ public class GroupConfigTest {
props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "45000");
props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "30000");
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
return props;
}

View File

@ -401,7 +401,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return ReadShareGroupStateResponse.toResponseData(
topicId,
partition,
PartitionFactory.DEFAULT_START_OFFSET,
PartitionFactory.UNINITIALIZED_START_OFFSET,
PartitionFactory.DEFAULT_STATE_EPOCH,
Collections.emptyList()
);

View File

@ -53,7 +53,7 @@ public class NoOpShareStatePersister implements Persister {
for (TopicData<PartitionIdLeaderEpochData> topicData : reqData.topicsData()) {
resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream().
map(partitionIdData -> PartitionFactory.newPartitionAllData(
partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.DEFAULT_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE, Collections.emptyList()))
partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE, Collections.emptyList()))
.collect(Collectors.toList())));
}
return CompletableFuture.completedFuture(new ReadShareGroupStateResult.Builder().setTopicsData(resultArgs).build());
@ -93,7 +93,7 @@ public class NoOpShareStatePersister implements Persister {
for (TopicData<PartitionIdLeaderEpochData> topicData : reqData.topicsData()) {
resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream().
map(partitionIdData -> PartitionFactory.newPartitionStateErrorData(
partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.DEFAULT_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
.collect(Collectors.toList())));
}
return CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResult.Builder().setTopicsData(resultArgs).build());

View File

@ -26,17 +26,17 @@ import java.util.List;
*/
public class PartitionFactory {
public static final int DEFAULT_STATE_EPOCH = 0;
public static final int DEFAULT_START_OFFSET = 0;
public static final int UNINITIALIZED_START_OFFSET = -1;
public static final short DEFAULT_ERROR_CODE = Errors.NONE.code();
public static final int DEFAULT_LEADER_EPOCH = 0;
public static final String DEFAULT_ERR_MESSAGE = Errors.NONE.message();
public static PartitionIdData newPartitionIdData(int partition) {
return new PartitionData(partition, DEFAULT_STATE_EPOCH, DEFAULT_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
}
public static PartitionIdLeaderEpochData newPartitionIdLeaderEpochData(int partition, int leaderEpoch) {
return new PartitionData(partition, DEFAULT_STATE_EPOCH, DEFAULT_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, null);
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, null);
}
public static PartitionStateData newPartitionStateData(int partition, int stateEpoch, long startOffset) {
@ -44,7 +44,7 @@ public class PartitionFactory {
}
public static PartitionErrorData newPartitionErrorData(int partition, short errorCode, String errorMessage) {
return new PartitionData(partition, DEFAULT_STATE_EPOCH, DEFAULT_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
}
public static PartitionStateErrorData newPartitionStateErrorData(int partition, int stateEpoch, long startOffset, short errorCode, String errorMessage) {