mirror of https://github.com/apache/kafka.git
KAFKA-16726: Added share.auto.offset.reset dynamic config for share groups (#17573)
This PR adds another dynamic config share.auto.offset.reset fir share groups. Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Abhinav Dixit <adixit@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
parent
7e5ffd3403
commit
9db5ed00a8
|
@ -19,9 +19,11 @@ package kafka.server.share;
|
|||
import kafka.cluster.Partition;
|
||||
import kafka.server.ReplicaManager;
|
||||
|
||||
import org.apache.kafka.common.IsolationLevel;
|
||||
import org.apache.kafka.common.TopicIdPartition;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
|
||||
import org.apache.kafka.common.errors.OffsetNotAvailableException;
|
||||
import org.apache.kafka.common.message.ShareFetchResponseData;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.record.FileRecords;
|
||||
|
@ -40,6 +42,7 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
|
||||
import scala.Option;
|
||||
import scala.Some;
|
||||
|
||||
/**
|
||||
* Utility class for post-processing of share fetch operations.
|
||||
|
@ -125,7 +128,26 @@ public class ShareFetchUtils {
|
|||
Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
|
||||
topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(),
|
||||
Optional.empty(), true).timestampAndOffsetOpt();
|
||||
return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
|
||||
if (timestampAndOffset.isEmpty()) {
|
||||
throw new OffsetNotAvailableException("offset for Earliest timestamp not found for topic partition: " + topicIdPartition);
|
||||
}
|
||||
return timestampAndOffset.get().offset;
|
||||
}
|
||||
|
||||
/**
|
||||
* The method is used to get the offset for the latest timestamp for the topic-partition.
|
||||
*
|
||||
* @return The offset for the latest timestamp.
|
||||
*/
|
||||
static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager) {
|
||||
// Isolation level is set to READ_UNCOMMITTED, matching with that used in share fetch requests
|
||||
Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
|
||||
topicIdPartition.topicPartition(), ListOffsetsRequest.LATEST_TIMESTAMP, new Some<>(IsolationLevel.READ_UNCOMMITTED),
|
||||
Optional.empty(), true).timestampAndOffsetOpt();
|
||||
if (timestampAndOffset.isEmpty()) {
|
||||
throw new OffsetNotAvailableException("offset for Latest timestamp not found for topic partition: " + topicIdPartition);
|
||||
}
|
||||
return timestampAndOffset.get().offset;
|
||||
}
|
||||
|
||||
static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
|
|||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.record.RecordBatch;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.coordinator.group.GroupConfig;
|
||||
import org.apache.kafka.coordinator.group.GroupConfigManager;
|
||||
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
|
||||
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
|
||||
|
@ -72,6 +73,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
|
||||
import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
|
||||
|
||||
/**
|
||||
* The SharePartition is used to track the state of a partition that is shared between multiple
|
||||
* consumers. The class maintains the state of the records that have been fetched from the leader
|
||||
|
@ -421,8 +425,12 @@ public class SharePartition {
|
|||
return;
|
||||
}
|
||||
|
||||
// Set the state epoch and end offset from the persisted state.
|
||||
startOffset = partitionData.startOffset() != -1 ? partitionData.startOffset() : 0;
|
||||
try {
|
||||
startOffset = startOffsetDuringInitialization(partitionData.startOffset());
|
||||
} catch (Exception e) {
|
||||
completeInitializationWithException(future, e);
|
||||
return;
|
||||
}
|
||||
stateEpoch = partitionData.stateEpoch();
|
||||
|
||||
List<PersisterStateBatch> stateBatches = partitionData.stateBatches();
|
||||
|
@ -448,7 +456,7 @@ public class SharePartition {
|
|||
// and start/end offsets.
|
||||
maybeUpdateCachedStateAndOffsets();
|
||||
} else {
|
||||
updateEndOffsetAndResetFetchOffsetMetadata(partitionData.startOffset());
|
||||
updateEndOffsetAndResetFetchOffsetMetadata(startOffset);
|
||||
}
|
||||
// Set the partition state to Active and complete the future.
|
||||
partitionState = SharePartitionState.ACTIVE;
|
||||
|
@ -2058,6 +2066,23 @@ public class SharePartition {
|
|||
}
|
||||
}
|
||||
|
||||
private long startOffsetDuringInitialization(long partitionDataStartOffset) throws Exception {
|
||||
// Set the state epoch and end offset from the persisted state.
|
||||
if (partitionDataStartOffset != PartitionFactory.UNINITIALIZED_START_OFFSET) {
|
||||
return partitionDataStartOffset;
|
||||
}
|
||||
GroupConfig.ShareGroupAutoOffsetReset offsetResetStrategy;
|
||||
if (groupConfigManager.groupConfig(groupId).isPresent()) {
|
||||
offsetResetStrategy = groupConfigManager.groupConfig(groupId).get().shareAutoOffsetReset();
|
||||
} else {
|
||||
offsetResetStrategy = GroupConfig.defaultShareAutoOffsetReset();
|
||||
}
|
||||
|
||||
if (offsetResetStrategy == GroupConfig.ShareGroupAutoOffsetReset.EARLIEST)
|
||||
return offsetForEarliestTimestamp(topicIdPartition, replicaManager);
|
||||
return offsetForLatestTimestamp(topicIdPartition, replicaManager);
|
||||
}
|
||||
|
||||
// Visible for testing. Should only be used for testing purposes.
|
||||
NavigableMap<Long, InFlightBatch> cachedState() {
|
||||
return new ConcurrentSkipListMap<>(cachedState);
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package kafka.server.share;
|
||||
|
||||
import kafka.cluster.Partition;
|
||||
import kafka.log.OffsetResultHolder;
|
||||
import kafka.server.LogReadResult;
|
||||
import kafka.server.ReplicaManager;
|
||||
import kafka.server.ReplicaQuota;
|
||||
|
@ -42,6 +43,7 @@ import org.apache.kafka.common.metrics.Metrics;
|
|||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.protocol.ObjectSerializationCache;
|
||||
import org.apache.kafka.common.record.FileRecords;
|
||||
import org.apache.kafka.common.record.MemoryRecords;
|
||||
import org.apache.kafka.common.requests.FetchRequest;
|
||||
import org.apache.kafka.common.requests.ShareFetchRequest;
|
||||
|
@ -1040,6 +1042,8 @@ public class SharePartitionManagerTest {
|
|||
partitionMaxBytes.put(tp5, PARTITION_MAX_BYTES);
|
||||
partitionMaxBytes.put(tp6, PARTITION_MAX_BYTES);
|
||||
|
||||
mockFetchOffsetForTimestamp(mockReplicaManager);
|
||||
|
||||
Time time = mock(Time.class);
|
||||
when(time.hiResClockMs()).thenReturn(0L).thenReturn(100L);
|
||||
Metrics metrics = new Metrics();
|
||||
|
@ -1109,6 +1113,9 @@ public class SharePartitionManagerTest {
|
|||
partitionMaxBytes.put(tp3, PARTITION_MAX_BYTES);
|
||||
|
||||
final Time time = new MockTime(0, System.currentTimeMillis(), 0);
|
||||
|
||||
mockFetchOffsetForTimestamp(mockReplicaManager);
|
||||
|
||||
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
|
||||
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
|
||||
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
|
||||
|
@ -1233,6 +1240,8 @@ public class SharePartitionManagerTest {
|
|||
TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
|
||||
Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
|
||||
|
||||
mockFetchOffsetForTimestamp(mockReplicaManager);
|
||||
|
||||
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
|
||||
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
|
||||
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
|
||||
|
@ -2482,6 +2491,12 @@ public class SharePartitionManagerTest {
|
|||
});
|
||||
}
|
||||
|
||||
private void mockFetchOffsetForTimestamp(ReplicaManager replicaManager) {
|
||||
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
|
||||
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
|
||||
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
|
||||
}
|
||||
|
||||
static Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(Set<TopicIdPartition> topicIdPartitions) {
|
||||
List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new ArrayList<>();
|
||||
topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(
|
||||
|
|
|
@ -16,12 +16,14 @@
|
|||
*/
|
||||
package kafka.server.share;
|
||||
|
||||
import kafka.log.OffsetResultHolder;
|
||||
import kafka.server.ReplicaManager;
|
||||
import kafka.server.share.SharePartition.InFlightState;
|
||||
import kafka.server.share.SharePartition.RecordState;
|
||||
import kafka.server.share.SharePartition.SharePartitionState;
|
||||
|
||||
import org.apache.kafka.common.TopicIdPartition;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.common.compress.Compression;
|
||||
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
|
||||
|
@ -34,9 +36,11 @@ import org.apache.kafka.common.errors.UnknownServerException;
|
|||
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
|
||||
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.record.FileRecords;
|
||||
import org.apache.kafka.common.record.MemoryRecords;
|
||||
import org.apache.kafka.common.record.MemoryRecordsBuilder;
|
||||
import org.apache.kafka.common.record.TimestampType;
|
||||
import org.apache.kafka.common.requests.ListOffsetsRequest;
|
||||
import org.apache.kafka.common.utils.MockTime;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.coordinator.group.GroupConfig;
|
||||
|
@ -77,6 +81,8 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import scala.Option;
|
||||
|
||||
import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
|
||||
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
|
@ -191,6 +197,244 @@ public class SharePartitionTest {
|
|||
assertNull(sharePartition.cachedState().get(11L).offsetState());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsEarliest() {
|
||||
Persister persister = Mockito.mock(Persister.class);
|
||||
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
|
||||
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
|
||||
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
|
||||
PartitionFactory.newPartitionAllData(
|
||||
0, PartitionFactory.DEFAULT_STATE_EPOCH,
|
||||
PartitionFactory.UNINITIALIZED_START_OFFSET,
|
||||
PartitionFactory.DEFAULT_ERROR_CODE,
|
||||
PartitionFactory.DEFAULT_ERR_MESSAGE,
|
||||
Collections.emptyList())))));
|
||||
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
|
||||
|
||||
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
|
||||
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
|
||||
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
|
||||
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST);
|
||||
|
||||
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
||||
|
||||
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
|
||||
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
|
||||
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
|
||||
|
||||
SharePartition sharePartition = SharePartitionBuilder.builder()
|
||||
.withPersister(persister)
|
||||
.withGroupConfigManager(groupConfigManager)
|
||||
.withReplicaManager(replicaManager)
|
||||
.build();
|
||||
|
||||
CompletableFuture<Void> result = sharePartition.maybeInitialize();
|
||||
assertTrue(result.isDone());
|
||||
assertFalse(result.isCompletedExceptionally());
|
||||
|
||||
// replicaManager.fetchOffsetForTimestamp should be called with "ListOffsetsRequest.EARLIEST_TIMESTAMP"
|
||||
Mockito.verify(replicaManager).fetchOffsetForTimestamp(
|
||||
Mockito.any(TopicPartition.class),
|
||||
Mockito.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP),
|
||||
Mockito.any(),
|
||||
Mockito.any(),
|
||||
Mockito.anyBoolean()
|
||||
);
|
||||
|
||||
assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
|
||||
assertEquals(0, sharePartition.startOffset());
|
||||
assertEquals(0, sharePartition.endOffset());
|
||||
assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsLatest() {
|
||||
Persister persister = Mockito.mock(Persister.class);
|
||||
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
|
||||
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
|
||||
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
|
||||
PartitionFactory.newPartitionAllData(
|
||||
0, PartitionFactory.DEFAULT_STATE_EPOCH,
|
||||
PartitionFactory.UNINITIALIZED_START_OFFSET,
|
||||
PartitionFactory.DEFAULT_ERROR_CODE,
|
||||
PartitionFactory.DEFAULT_ERR_MESSAGE,
|
||||
Collections.emptyList())))));
|
||||
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
|
||||
|
||||
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
|
||||
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
|
||||
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
|
||||
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.LATEST);
|
||||
|
||||
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
||||
|
||||
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty());
|
||||
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
|
||||
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
|
||||
|
||||
SharePartition sharePartition = SharePartitionBuilder.builder()
|
||||
.withPersister(persister)
|
||||
.withGroupConfigManager(groupConfigManager)
|
||||
.withReplicaManager(replicaManager)
|
||||
.build();
|
||||
|
||||
CompletableFuture<Void> result = sharePartition.maybeInitialize();
|
||||
assertTrue(result.isDone());
|
||||
assertFalse(result.isCompletedExceptionally());
|
||||
|
||||
// replicaManager.fetchOffsetForTimestamp should be called with "ListOffsetsRequest.LATEST_TIMESTAMP"
|
||||
Mockito.verify(replicaManager).fetchOffsetForTimestamp(
|
||||
Mockito.any(TopicPartition.class),
|
||||
Mockito.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
|
||||
Mockito.any(),
|
||||
Mockito.any(),
|
||||
Mockito.anyBoolean()
|
||||
);
|
||||
|
||||
assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
|
||||
assertEquals(15, sharePartition.startOffset());
|
||||
assertEquals(15, sharePartition.endOffset());
|
||||
assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaybeInitializeDefaultStartEpochGroupConfigNotPresent() {
|
||||
Persister persister = Mockito.mock(Persister.class);
|
||||
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
|
||||
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
|
||||
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
|
||||
PartitionFactory.newPartitionAllData(
|
||||
0, PartitionFactory.DEFAULT_STATE_EPOCH,
|
||||
PartitionFactory.UNINITIALIZED_START_OFFSET,
|
||||
PartitionFactory.DEFAULT_ERROR_CODE,
|
||||
PartitionFactory.DEFAULT_ERR_MESSAGE,
|
||||
Collections.emptyList())))));
|
||||
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
|
||||
|
||||
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
|
||||
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
|
||||
|
||||
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
||||
|
||||
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty());
|
||||
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
|
||||
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
|
||||
|
||||
SharePartition sharePartition = SharePartitionBuilder.builder()
|
||||
.withPersister(persister)
|
||||
.withGroupConfigManager(groupConfigManager)
|
||||
.withReplicaManager(replicaManager)
|
||||
.build();
|
||||
|
||||
CompletableFuture<Void> result = sharePartition.maybeInitialize();
|
||||
assertTrue(result.isDone());
|
||||
assertFalse(result.isCompletedExceptionally());
|
||||
|
||||
// replicaManager.fetchOffsetForTimestamp should be called with "ListOffsetsRequest.LATEST_TIMESTAMP"
|
||||
Mockito.verify(replicaManager).fetchOffsetForTimestamp(
|
||||
Mockito.any(TopicPartition.class),
|
||||
Mockito.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
|
||||
Mockito.any(),
|
||||
Mockito.any(),
|
||||
Mockito.anyBoolean()
|
||||
);
|
||||
|
||||
assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
|
||||
assertEquals(15, sharePartition.startOffset());
|
||||
assertEquals(15, sharePartition.endOffset());
|
||||
assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, sharePartition.stateEpoch());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaybeInitializeFetchOffsetForLatestTimestampThrowsError() {
|
||||
Persister persister = Mockito.mock(Persister.class);
|
||||
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
|
||||
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
|
||||
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
|
||||
PartitionFactory.newPartitionAllData(
|
||||
0, PartitionFactory.DEFAULT_STATE_EPOCH,
|
||||
PartitionFactory.UNINITIALIZED_START_OFFSET,
|
||||
PartitionFactory.DEFAULT_ERROR_CODE,
|
||||
PartitionFactory.DEFAULT_ERR_MESSAGE,
|
||||
Collections.emptyList())))));
|
||||
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
|
||||
|
||||
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
|
||||
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
|
||||
|
||||
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
||||
|
||||
Mockito.when(replicaManager.fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()))
|
||||
.thenThrow(new RuntimeException("fetch offsets exception"));
|
||||
|
||||
SharePartition sharePartition = SharePartitionBuilder.builder()
|
||||
.withPersister(persister)
|
||||
.withGroupConfigManager(groupConfigManager)
|
||||
.withReplicaManager(replicaManager)
|
||||
.build();
|
||||
|
||||
CompletableFuture<Void> result = sharePartition.maybeInitialize();
|
||||
assertTrue(result.isDone());
|
||||
assertTrue(result.isCompletedExceptionally());
|
||||
|
||||
// replicaManager.fetchOffsetForTimestamp should be called with "ListOffsetsRequest.LATEST_TIMESTAMP"
|
||||
Mockito.verify(replicaManager).fetchOffsetForTimestamp(
|
||||
Mockito.any(TopicPartition.class),
|
||||
Mockito.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
|
||||
Mockito.any(),
|
||||
Mockito.any(),
|
||||
Mockito.anyBoolean()
|
||||
);
|
||||
|
||||
assertEquals(SharePartitionState.FAILED, sharePartition.partitionState());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaybeInitializeFetchOffsetForEarliestTimestampThrowsError() {
|
||||
Persister persister = Mockito.mock(Persister.class);
|
||||
ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
|
||||
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
|
||||
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
|
||||
PartitionFactory.newPartitionAllData(
|
||||
0, PartitionFactory.DEFAULT_STATE_EPOCH,
|
||||
PartitionFactory.UNINITIALIZED_START_OFFSET,
|
||||
PartitionFactory.DEFAULT_ERROR_CODE,
|
||||
PartitionFactory.DEFAULT_ERR_MESSAGE,
|
||||
Collections.emptyList())))));
|
||||
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
|
||||
|
||||
GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
|
||||
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
|
||||
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
|
||||
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST);
|
||||
|
||||
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
||||
|
||||
Mockito.when(replicaManager.fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()))
|
||||
.thenThrow(new RuntimeException("fetch offsets exception"));
|
||||
|
||||
SharePartition sharePartition = SharePartitionBuilder.builder()
|
||||
.withPersister(persister)
|
||||
.withGroupConfigManager(groupConfigManager)
|
||||
.withReplicaManager(replicaManager)
|
||||
.build();
|
||||
|
||||
CompletableFuture<Void> result = sharePartition.maybeInitialize();
|
||||
assertTrue(result.isDone());
|
||||
assertTrue(result.isCompletedExceptionally());
|
||||
|
||||
// replicaManager.fetchOffsetForTimestamp should be called with "ListOffsetsRequest.EARLIEST_TIMESTAMP"
|
||||
Mockito.verify(replicaManager).fetchOffsetForTimestamp(
|
||||
Mockito.any(TopicPartition.class),
|
||||
Mockito.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP),
|
||||
Mockito.any(),
|
||||
Mockito.any(),
|
||||
Mockito.anyBoolean()
|
||||
);
|
||||
|
||||
assertEquals(SharePartitionState.FAILED, sharePartition.partitionState());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaybeInitializeSharePartitionAgain() {
|
||||
Persister persister = Mockito.mock(Persister.class);
|
||||
|
@ -460,7 +704,13 @@ public class SharePartitionTest {
|
|||
|
||||
@Test
|
||||
public void testMaybeInitializeWithNoOpShareStatePersister() {
|
||||
SharePartition sharePartition = SharePartitionBuilder.builder().build();
|
||||
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
||||
|
||||
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
|
||||
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
|
||||
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
|
||||
|
||||
SharePartition sharePartition = SharePartitionBuilder.builder().withReplicaManager(replicaManager).build();
|
||||
CompletableFuture<Void> result = sharePartition.maybeInitialize();
|
||||
assertTrue(result.isDone());
|
||||
assertFalse(result.isCompletedExceptionally());
|
||||
|
@ -825,7 +1075,13 @@ public class SharePartitionTest {
|
|||
|
||||
@Test
|
||||
public void testMaybeAcquireAndReleaseFetchLock() {
|
||||
SharePartition sharePartition = SharePartitionBuilder.builder().build();
|
||||
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
||||
|
||||
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
|
||||
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
|
||||
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
|
||||
|
||||
SharePartition sharePartition = SharePartitionBuilder.builder().withReplicaManager(replicaManager).build();
|
||||
sharePartition.maybeInitialize();
|
||||
assertTrue(sharePartition.maybeAcquireFetchLock());
|
||||
// Lock cannot be acquired again, as already acquired.
|
||||
|
@ -5233,7 +5489,7 @@ public class SharePartitionTest {
|
|||
private int maxInflightMessages = MAX_IN_FLIGHT_MESSAGES;
|
||||
|
||||
private Persister persister = new NoOpShareStatePersister();
|
||||
private final ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
||||
private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
||||
private GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
|
||||
private SharePartitionState state = SharePartitionState.EMPTY;
|
||||
|
||||
|
@ -5257,6 +5513,11 @@ public class SharePartitionTest {
|
|||
return this;
|
||||
}
|
||||
|
||||
private SharePartitionBuilder withReplicaManager(ReplicaManager replicaManager) {
|
||||
this.replicaManager = replicaManager;
|
||||
return this;
|
||||
}
|
||||
|
||||
private SharePartitionBuilder withGroupConfigManager(GroupConfigManager groupConfigManager) {
|
||||
this.groupConfigManager = groupConfigManager;
|
||||
return this;
|
||||
|
|
|
@ -19,6 +19,9 @@ package kafka.test.api;
|
|||
import kafka.api.BaseConsumerTest;
|
||||
|
||||
import org.apache.kafka.clients.admin.Admin;
|
||||
import org.apache.kafka.clients.admin.AlterConfigOp;
|
||||
import org.apache.kafka.clients.admin.AlterConfigsOptions;
|
||||
import org.apache.kafka.clients.admin.ConfigEntry;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.apache.kafka.clients.admin.RecordsToDelete;
|
||||
import org.apache.kafka.clients.consumer.AcknowledgeType;
|
||||
|
@ -34,6 +37,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
|
|||
import org.apache.kafka.common.KafkaException;
|
||||
import org.apache.kafka.common.TopicIdPartition;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.config.ConfigResource;
|
||||
import org.apache.kafka.common.errors.InterruptException;
|
||||
import org.apache.kafka.common.errors.InvalidRecordStateException;
|
||||
import org.apache.kafka.common.errors.InvalidTopicException;
|
||||
|
@ -47,6 +51,7 @@ import org.apache.kafka.common.serialization.Deserializer;
|
|||
import org.apache.kafka.common.serialization.Serializer;
|
||||
import org.apache.kafka.common.test.KafkaClusterTestKit;
|
||||
import org.apache.kafka.common.test.TestKitNodes;
|
||||
import org.apache.kafka.coordinator.group.GroupConfig;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
|
@ -60,6 +65,7 @@ import org.junit.jupiter.params.provider.ValueSource;
|
|||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -101,6 +107,8 @@ public class ShareConsumerTest {
|
|||
private static final String DEFAULT_STATE_PERSISTER = "org.apache.kafka.server.share.persister.DefaultStatePersister";
|
||||
private static final String NO_OP_PERSISTER = "org.apache.kafka.server.share.persister.NoOpShareStatePersister";
|
||||
|
||||
private Admin adminClient;
|
||||
|
||||
@BeforeEach
|
||||
public void createCluster(TestInfo testInfo) throws Exception {
|
||||
String persisterClassName = NO_OP_PERSISTER;
|
||||
|
@ -131,11 +139,13 @@ public class ShareConsumerTest {
|
|||
cluster.waitForReadyBrokers();
|
||||
createTopic("topic");
|
||||
createTopic("topic2");
|
||||
adminClient = createAdminClient();
|
||||
warmup();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void destroyCluster() throws Exception {
|
||||
adminClient.close();
|
||||
cluster.close();
|
||||
}
|
||||
|
||||
|
@ -156,6 +166,7 @@ public class ShareConsumerTest {
|
|||
Set<String> subscription = Collections.singleton(tp.topic());
|
||||
shareConsumer.subscribe(subscription);
|
||||
assertEquals(subscription, shareConsumer.subscription());
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500));
|
||||
shareConsumer.close();
|
||||
assertEquals(0, records.count());
|
||||
|
@ -168,6 +179,7 @@ public class ShareConsumerTest {
|
|||
Set<String> subscription = Collections.singleton(tp.topic());
|
||||
shareConsumer.subscribe(subscription);
|
||||
assertEquals(subscription, shareConsumer.subscription());
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500));
|
||||
shareConsumer.unsubscribe();
|
||||
assertEquals(Collections.emptySet(), shareConsumer.subscription());
|
||||
|
@ -182,6 +194,7 @@ public class ShareConsumerTest {
|
|||
Set<String> subscription = Collections.singleton(tp.topic());
|
||||
shareConsumer.subscribe(subscription);
|
||||
assertEquals(subscription, shareConsumer.subscription());
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500));
|
||||
assertEquals(0, records.count());
|
||||
shareConsumer.subscribe(subscription);
|
||||
|
@ -198,6 +211,7 @@ public class ShareConsumerTest {
|
|||
Set<String> subscription = Collections.singleton(tp.topic());
|
||||
shareConsumer.subscribe(subscription);
|
||||
assertEquals(subscription, shareConsumer.subscription());
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500));
|
||||
shareConsumer.unsubscribe();
|
||||
assertEquals(Collections.emptySet(), shareConsumer.subscription());
|
||||
|
@ -214,6 +228,7 @@ public class ShareConsumerTest {
|
|||
Set<String> subscription = Collections.singleton(tp.topic());
|
||||
shareConsumer.subscribe(subscription);
|
||||
assertEquals(subscription, shareConsumer.subscription());
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(500));
|
||||
shareConsumer.subscribe(Collections.emptySet());
|
||||
assertEquals(Collections.emptySet(), shareConsumer.subscription());
|
||||
|
@ -231,6 +246,7 @@ public class ShareConsumerTest {
|
|||
producer.send(record);
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
assertEquals(1, records.count());
|
||||
shareConsumer.close();
|
||||
|
@ -245,6 +261,7 @@ public class ShareConsumerTest {
|
|||
producer.send(record);
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
assertEquals(1, records.count());
|
||||
producer.send(record);
|
||||
|
@ -273,6 +290,8 @@ public class ShareConsumerTest {
|
|||
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
assertEquals(1, records.count());
|
||||
|
||||
|
@ -307,6 +326,8 @@ public class ShareConsumerTest {
|
|||
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
assertEquals(1, records.count());
|
||||
// Now in the second poll, we implicitly acknowledge the record received in the first poll.
|
||||
|
@ -334,6 +355,8 @@ public class ShareConsumerTest {
|
|||
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
assertEquals(1, records.count());
|
||||
|
||||
|
@ -361,6 +384,8 @@ public class ShareConsumerTest {
|
|||
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
assertEquals(1, records.count());
|
||||
|
||||
|
@ -420,6 +445,7 @@ public class ShareConsumerTest {
|
|||
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords);
|
||||
assertEquals(numRecords, records.size());
|
||||
|
@ -442,6 +468,7 @@ public class ShareConsumerTest {
|
|||
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(deserializer, new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords);
|
||||
assertEquals(numRecords, records.size());
|
||||
|
@ -468,6 +495,8 @@ public class ShareConsumerTest {
|
|||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
|
||||
"group1", Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxPollRecords)));
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords);
|
||||
long i = 0L;
|
||||
for (ConsumerRecord<byte[], byte[]> record : records) {
|
||||
|
@ -513,6 +542,8 @@ public class ShareConsumerTest {
|
|||
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
assertEquals(4, records.count());
|
||||
assertEquals(transactional1.offset(), records.records(tp).get(0).offset());
|
||||
|
@ -538,6 +569,7 @@ public class ShareConsumerTest {
|
|||
producer.send(record);
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
assertEquals(1, records.count());
|
||||
records.forEach(shareConsumer::acknowledge);
|
||||
|
@ -556,6 +588,7 @@ public class ShareConsumerTest {
|
|||
producer.send(record);
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
assertEquals(1, records.count());
|
||||
records.forEach(shareConsumer::acknowledge);
|
||||
|
@ -583,6 +616,7 @@ public class ShareConsumerTest {
|
|||
KafkaShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
|
||||
shareConsumer2.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
|
||||
Map<TopicPartition, Exception> partitionExceptionMap1 = new HashMap<>();
|
||||
|
@ -634,6 +668,7 @@ public class ShareConsumerTest {
|
|||
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
|
||||
Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
|
||||
|
@ -689,6 +724,7 @@ public class ShareConsumerTest {
|
|||
producer.send(record);
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
assertEquals(1, records.count());
|
||||
records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
|
||||
|
@ -709,6 +745,7 @@ public class ShareConsumerTest {
|
|||
producer.send(record);
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
assertEquals(1, records.count());
|
||||
records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
|
||||
|
@ -727,6 +764,7 @@ public class ShareConsumerTest {
|
|||
producer.send(record);
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
assertEquals(1, records.count());
|
||||
records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
|
||||
|
@ -734,7 +772,6 @@ public class ShareConsumerTest {
|
|||
producer.close();
|
||||
}
|
||||
|
||||
|
||||
@ParameterizedTest(name = "{displayName}.persister={0}")
|
||||
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
|
||||
public void testExplicitAcknowledgeThrowsNotInBatch(String persister) {
|
||||
|
@ -743,6 +780,7 @@ public class ShareConsumerTest {
|
|||
producer.send(record);
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
assertEquals(1, records.count());
|
||||
ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0);
|
||||
|
@ -762,6 +800,7 @@ public class ShareConsumerTest {
|
|||
producer.send(record);
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
assertEquals(1, records.count());
|
||||
ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0);
|
||||
|
@ -780,6 +819,7 @@ public class ShareConsumerTest {
|
|||
producer.send(record);
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
assertEquals(1, records.count());
|
||||
Map<TopicIdPartition, Optional<KafkaException>> result = shareConsumer.commitSync();
|
||||
|
@ -805,6 +845,7 @@ public class ShareConsumerTest {
|
|||
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
|
||||
Map<TopicPartition, Exception> partitionExceptionMap1 = new HashMap<>();
|
||||
|
@ -843,6 +884,8 @@ public class ShareConsumerTest {
|
|||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
|
||||
"group1", Collections.singletonMap(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(maxPartitionFetchBytes)));
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
assertEquals(1, records.count());
|
||||
shareConsumer.close();
|
||||
|
@ -857,9 +900,11 @@ public class ShareConsumerTest {
|
|||
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2");
|
||||
shareConsumer2.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group2", "earliest");
|
||||
|
||||
// producing 3 records to the topic
|
||||
producer.send(record);
|
||||
|
@ -907,6 +952,7 @@ public class ShareConsumerTest {
|
|||
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer2.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
int totalMessages = 2000;
|
||||
for (int i = 0; i < totalMessages; i++) {
|
||||
|
@ -943,9 +989,16 @@ public class ShareConsumerTest {
|
|||
int producerCount = 4;
|
||||
int messagesPerProducer = 5000;
|
||||
|
||||
String groupId = "group1";
|
||||
|
||||
ExecutorService producerExecutorService = Executors.newFixedThreadPool(producerCount);
|
||||
ExecutorService consumerExecutorService = Executors.newFixedThreadPool(consumerCount);
|
||||
|
||||
// This consumer is created to register the share group id with the groupCoordinator
|
||||
// so that the config share.auto.offset.reset can be altered for this group
|
||||
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId);
|
||||
alterShareAutoOffsetReset(groupId, "earliest");
|
||||
|
||||
for (int i = 0; i < producerCount; i++) {
|
||||
producerExecutorService.submit(() -> produceMessages(messagesPerProducer));
|
||||
}
|
||||
|
@ -957,7 +1010,7 @@ public class ShareConsumerTest {
|
|||
consumerExecutorService.submit(() -> {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
futures.add(future);
|
||||
consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, "group1", consumerNumber, 30, true, future, Optional.of(maxBytes));
|
||||
consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, groupId, consumerNumber, 30, true, future, Optional.of(maxBytes));
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -990,6 +1043,19 @@ public class ShareConsumerTest {
|
|||
int messagesPerProducer = 2000;
|
||||
final int totalMessagesSent = producerCount * messagesPerProducer;
|
||||
|
||||
String groupId1 = "group1";
|
||||
String groupId2 = "group2";
|
||||
String groupId3 = "group3";
|
||||
|
||||
// These consumers are created to register the share group ids with the groupCoordinator
|
||||
// so that the config share.auto.offset.reset can be altered for these groups
|
||||
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId1);
|
||||
alterShareAutoOffsetReset(groupId1, "earliest");
|
||||
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId2);
|
||||
alterShareAutoOffsetReset(groupId2, "earliest");
|
||||
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId3);
|
||||
alterShareAutoOffsetReset(groupId3, "earliest");
|
||||
|
||||
ExecutorService producerExecutorService = Executors.newFixedThreadPool(producerCount);
|
||||
ExecutorService shareGroupExecutorService1 = Executors.newFixedThreadPool(consumerCount);
|
||||
ExecutorService shareGroupExecutorService2 = Executors.newFixedThreadPool(consumerCount);
|
||||
|
@ -1095,6 +1161,7 @@ public class ShareConsumerTest {
|
|||
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer2.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
int totalMessages = 1500;
|
||||
for (int i = 0; i < totalMessages; i++) {
|
||||
|
@ -1139,6 +1206,13 @@ public class ShareConsumerTest {
|
|||
int producerCount = 4;
|
||||
int messagesPerProducer = 5000;
|
||||
|
||||
String groupId = "group1";
|
||||
|
||||
// This consumer is created to register the share group id with the groupCoordinator
|
||||
// so that the config share.auto.offset.reset can be altered for this group
|
||||
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId);
|
||||
alterShareAutoOffsetReset(groupId, "earliest");
|
||||
|
||||
ExecutorService consumerExecutorService = Executors.newFixedThreadPool(consumerCount);
|
||||
ExecutorService producerExecutorService = Executors.newFixedThreadPool(producerCount);
|
||||
|
||||
|
@ -1157,7 +1231,7 @@ public class ShareConsumerTest {
|
|||
// The "failing" consumer polls but immediately closes, which releases the records for the other consumers
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
AtomicInteger failedMessagesConsumed = new AtomicInteger(0);
|
||||
consumeMessages(failedMessagesConsumed, producerCount * messagesPerProducer, "group1", 0, 1, false, future);
|
||||
consumeMessages(failedMessagesConsumed, producerCount * messagesPerProducer, groupId, 0, 1, false, future);
|
||||
startSignal.countDown();
|
||||
});
|
||||
|
||||
|
@ -1174,7 +1248,7 @@ public class ShareConsumerTest {
|
|||
consumerExecutorService.submit(() -> {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
futuresSuccess.add(future);
|
||||
consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, "group1", consumerNumber, 40, true, future, Optional.of(maxBytes));
|
||||
consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, groupId, consumerNumber, 40, true, future, Optional.of(maxBytes));
|
||||
});
|
||||
}
|
||||
producerExecutorService.shutdown();
|
||||
|
@ -1203,6 +1277,7 @@ public class ShareConsumerTest {
|
|||
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
producer.send(producerRecord1);
|
||||
|
||||
|
@ -1251,6 +1326,7 @@ public class ShareConsumerTest {
|
|||
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
|
||||
producer.send(record);
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackWithShareConsumer<>(shareConsumer));
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
|
@ -1292,6 +1368,7 @@ public class ShareConsumerTest {
|
|||
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
|
||||
producer.send(record);
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
// The acknowledgment commit callback will try to call a method of KafkaShareConsumer
|
||||
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackWakeup<>(shareConsumer));
|
||||
|
@ -1331,6 +1408,7 @@ public class ShareConsumerTest {
|
|||
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
|
||||
producer.send(record);
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgeCommitCallbackThrows<>());
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
|
@ -1363,6 +1441,7 @@ public class ShareConsumerTest {
|
|||
public void testPollThrowsInterruptExceptionIfInterrupted(String persister) {
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
// interrupt the thread and call poll
|
||||
try {
|
||||
|
@ -1386,6 +1465,7 @@ public class ShareConsumerTest {
|
|||
public void testSubscribeOnInvalidTopicThrowsInvalidTopicException(String persister) {
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton("topic abc"));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
// The exception depends upon a metadata response which arrives asynchronously. If the delay is
|
||||
// too short, the poll might return before the error is known.
|
||||
|
@ -1405,6 +1485,7 @@ public class ShareConsumerTest {
|
|||
producer.send(record);
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
shareConsumer.wakeup();
|
||||
assertThrows(WakeupException.class, () -> shareConsumer.poll(Duration.ZERO));
|
||||
|
@ -1423,6 +1504,7 @@ public class ShareConsumerTest {
|
|||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
String topic = "foo";
|
||||
shareConsumer.subscribe(Collections.singleton(topic));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
// Topic is created post creation of share consumer and subscription
|
||||
createTopic(topic);
|
||||
|
@ -1458,6 +1540,7 @@ public class ShareConsumerTest {
|
|||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
// Consumer subscribes to the topics -> bar and baz.
|
||||
shareConsumer.subscribe(Arrays.asList(topic1, topic2));
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
producer.send(recordTopic1).get();
|
||||
TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
|
||||
|
@ -1490,6 +1573,13 @@ public class ShareConsumerTest {
|
|||
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
|
||||
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), 0, null, "key".getBytes(), "value".getBytes());
|
||||
|
||||
String groupId = "group1";
|
||||
|
||||
// This consumer is created to register the share group id with the groupCoordinator
|
||||
// so that the config share.auto.offset.reset can be altered for this group
|
||||
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId);
|
||||
alterShareAutoOffsetReset(groupId, "earliest");
|
||||
|
||||
// We write 10 records to the topic, so they would be written from offsets 0-9 on the topic.
|
||||
try {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
|
@ -1499,13 +1589,12 @@ public class ShareConsumerTest {
|
|||
fail("Failed to send records: " + e);
|
||||
}
|
||||
|
||||
Admin adminClient = createAdminClient();
|
||||
// We delete records before offset 5, so the LSO should move to 5.
|
||||
adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(5L)));
|
||||
|
||||
AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
consumeMessages(totalMessagesConsumed, 5, "group1", 1, 10, true, future);
|
||||
consumeMessages(totalMessagesConsumed, 5, groupId, 1, 10, true, future);
|
||||
// The records returned belong to offsets 5-9.
|
||||
assertEquals(5, totalMessagesConsumed.get());
|
||||
try {
|
||||
|
@ -1528,7 +1617,7 @@ public class ShareConsumerTest {
|
|||
|
||||
totalMessagesConsumed = new AtomicInteger(0);
|
||||
future = new CompletableFuture<>();
|
||||
consumeMessages(totalMessagesConsumed, 1, "group1", 1, 10, true, future);
|
||||
consumeMessages(totalMessagesConsumed, 1, groupId, 1, 10, true, future);
|
||||
// The record returned belong to offset 14.
|
||||
assertEquals(1, totalMessagesConsumed.get());
|
||||
try {
|
||||
|
@ -1542,14 +1631,135 @@ public class ShareConsumerTest {
|
|||
|
||||
totalMessagesConsumed = new AtomicInteger(0);
|
||||
future = new CompletableFuture<>();
|
||||
consumeMessages(totalMessagesConsumed, 0, "group1", 1, 5, true, future);
|
||||
consumeMessages(totalMessagesConsumed, 0, groupId, 1, 5, true, future);
|
||||
assertEquals(0, totalMessagesConsumed.get());
|
||||
try {
|
||||
assertEquals(0, future.get());
|
||||
} catch (Exception e) {
|
||||
fail("Exception occurred : " + e.getMessage());
|
||||
}
|
||||
adminClient.close();
|
||||
producer.close();
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = "{displayName}.persister={0}")
|
||||
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
|
||||
public void testShareAutoOffsetResetDefaultValue(String persister) {
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
|
||||
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
|
||||
// Producing a record.
|
||||
producer.send(record);
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
// No records should be consumed because share.auto.offset.reset has a default of "latest". Since the record
|
||||
// was produced before share partition was initialized (which happens after the first share fetch request
|
||||
// in the poll method), the start offset would be the latest offset, i.e. 1 (the next offset after the already
|
||||
// present 0th record)
|
||||
assertEquals(0, records.count());
|
||||
// Producing another record.
|
||||
producer.send(record);
|
||||
records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
// Now the next record should be consumed successfully
|
||||
assertEquals(1, records.count());
|
||||
shareConsumer.close();
|
||||
producer.close();
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = "{displayName}.persister={0}")
|
||||
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
|
||||
public void testShareAutoOffsetResetEarliest(String persister) {
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
// Changing the value of share.auto.offset.reset value to "earliest"
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
|
||||
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
|
||||
// Producing a record.
|
||||
producer.send(record);
|
||||
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
// Since the value for share.auto.offset.reset has been altered to "earliest", the consumer should consume
|
||||
// all messages present on the partition
|
||||
assertEquals(1, records.count());
|
||||
// Producing another record.
|
||||
producer.send(record);
|
||||
records = shareConsumer.poll(Duration.ofMillis(5000));
|
||||
// The next records should also be consumed successfully
|
||||
assertEquals(1, records.count());
|
||||
shareConsumer.close();
|
||||
producer.close();
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = "{displayName}.persister={0}")
|
||||
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
|
||||
public void testShareAutoOffsetResetEarliestAfterLsoMovement(String persister) throws Exception {
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumer.subscribe(Collections.singleton(tp.topic()));
|
||||
// Changing the value of share.auto.offset.reset value to "earliest"
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
|
||||
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
|
||||
// We write 10 records to the topic, so they would be written from offsets 0-9 on the topic.
|
||||
try {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
producer.send(record).get();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
fail("Failed to send records: " + e);
|
||||
}
|
||||
|
||||
// We delete records before offset 5, so the LSO should move to 5.
|
||||
adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(5L)));
|
||||
|
||||
AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
consumeMessages(totalMessagesConsumed, 5, "group1", 1, 10, true, future);
|
||||
// The records returned belong to offsets 5-9.
|
||||
assertEquals(5, totalMessagesConsumed.get());
|
||||
assertEquals(5, future.get());
|
||||
|
||||
shareConsumer.close();
|
||||
producer.close();
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = "{displayName}.persister={0}")
|
||||
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
|
||||
public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String persister) {
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumerEarliest = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
|
||||
shareConsumerEarliest.subscribe(Collections.singleton(tp.topic()));
|
||||
// Changing the value of share.auto.offset.reset value to "earliest" for group1
|
||||
alterShareAutoOffsetReset("group1", "earliest");
|
||||
|
||||
KafkaShareConsumer<byte[], byte[]> shareConsumerLatest = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2");
|
||||
shareConsumerLatest.subscribe(Collections.singleton(tp.topic()));
|
||||
// Changing the value of share.auto.offset.reset value to "latest" for group2
|
||||
alterShareAutoOffsetReset("group2", "latest");
|
||||
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
|
||||
KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
|
||||
// Producing a record.
|
||||
producer.send(record);
|
||||
ConsumerRecords<byte[], byte[]> records1 = shareConsumerEarliest.poll(Duration.ofMillis(5000));
|
||||
// Since the value for share.auto.offset.reset has been altered to "earliest", the consumer should consume
|
||||
// all messages present on the partition
|
||||
assertEquals(1, records1.count());
|
||||
|
||||
ConsumerRecords<byte[], byte[]> records2 = shareConsumerLatest.poll(Duration.ofMillis(5000));
|
||||
// Since the value for share.auto.offset.reset has been altered to "latest", the consumer should not consume
|
||||
// any message
|
||||
assertEquals(0, records2.count());
|
||||
|
||||
// Producing another record.
|
||||
producer.send(record);
|
||||
|
||||
records1 = shareConsumerEarliest.poll(Duration.ofMillis(5000));
|
||||
// The next record should also be consumed successfully by group1
|
||||
assertEquals(1, records1.count());
|
||||
|
||||
records2 = shareConsumerLatest.poll(Duration.ofMillis(5000));
|
||||
// The next record should also be consumed successfully by group2
|
||||
assertEquals(1, records2.count());
|
||||
|
||||
shareConsumerEarliest.close();
|
||||
shareConsumerLatest.close();
|
||||
producer.close();
|
||||
}
|
||||
|
||||
|
@ -1729,6 +1939,7 @@ public class ShareConsumerTest {
|
|||
try {
|
||||
producer.send(record).get(15000, TimeUnit.MILLISECONDS);
|
||||
shareConsumer.subscribe(subscription);
|
||||
alterShareAutoOffsetReset("warmupgroup1", "earliest");
|
||||
TestUtils.waitForCondition(
|
||||
() -> shareConsumer.poll(Duration.ofMillis(5000)).count() == 1, 30000, 200L, () -> "warmup record not received");
|
||||
} finally {
|
||||
|
@ -1736,4 +1947,19 @@ public class ShareConsumerTest {
|
|||
shareConsumer.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void alterShareAutoOffsetReset(String groupId, String newValue) {
|
||||
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
|
||||
Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new HashMap<>();
|
||||
alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry(
|
||||
GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, newValue), AlterConfigOp.OpType.SET)));
|
||||
AlterConfigsOptions alterOptions = new AlterConfigsOptions();
|
||||
try {
|
||||
adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
|
||||
.all()
|
||||
.get(60, TimeUnit.SECONDS);
|
||||
} catch (Exception e) {
|
||||
fail("Exception was thrown: ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -74,9 +74,9 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
|
|||
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
|
||||
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
|
||||
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAndEpoch, SecurityUtils, Utils}
|
||||
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG}
|
||||
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG}
|
||||
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
|
||||
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig}
|
||||
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig}
|
||||
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorConfigTest}
|
||||
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
|
||||
import org.apache.kafka.metadata.LeaderAndIsr
|
||||
|
@ -585,6 +585,7 @@ class KafkaApisTest extends Logging {
|
|||
cgConfigs.put(SHARE_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
|
||||
cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
|
||||
cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
|
||||
cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, GroupConfig.defaultShareAutoOffsetReset.toString)
|
||||
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
|
||||
|
||||
val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()
|
||||
|
|
|
@ -170,14 +170,17 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the first share fetch request to initialize the share partition
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
|
||||
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
|
||||
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
@ -235,16 +238,19 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicIdPartition2 = new TopicIdPartition(topicId, new TopicPartition(topic, 1))
|
||||
val topicIdPartition3 = new TopicIdPartition(topicId, new TopicPartition(topic, 2))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2, topicIdPartition3)
|
||||
|
||||
// Send the first share fetch request to initialize the share partitions
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic partitions created above
|
||||
produceData(topicIdPartition1, 10)
|
||||
produceData(topicIdPartition2, 10)
|
||||
produceData(topicIdPartition3, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2, topicIdPartition3)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
|
||||
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
|
||||
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
@ -325,12 +331,6 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val leader2 = partitionToLeaders(topicIdPartition2)
|
||||
val leader3 = partitionToLeaders(topicIdPartition3)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic partitions created above
|
||||
produceData(topicIdPartition1, 10)
|
||||
produceData(topicIdPartition2, 10)
|
||||
produceData(topicIdPartition3, 10)
|
||||
|
||||
val send1: Seq[TopicIdPartition] = Seq(topicIdPartition1)
|
||||
val send2: Seq[TopicIdPartition] = Seq(topicIdPartition2)
|
||||
val send3: Seq[TopicIdPartition] = Seq(topicIdPartition3)
|
||||
|
@ -338,14 +338,31 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
|
||||
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
|
||||
// Crete different share fetch requests for different partitions as they may have leaders on separate brokers
|
||||
val shareFetchRequest1 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap)
|
||||
val shareFetchRequest2 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap)
|
||||
val shareFetchRequest3 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap)
|
||||
// Send the first share fetch request to initialize the share partitions
|
||||
// Create different share fetch requests for different partitions as they may have leaders on separate brokers
|
||||
var shareFetchRequest1 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap)
|
||||
var shareFetchRequest2 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap)
|
||||
var shareFetchRequest3 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap)
|
||||
|
||||
val shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
|
||||
val shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
|
||||
val shareFetchResponse3 = connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3)
|
||||
var shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
|
||||
var shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
|
||||
var shareFetchResponse3 = connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic partitions created above
|
||||
produceData(topicIdPartition1, 10)
|
||||
produceData(topicIdPartition2, 10)
|
||||
produceData(topicIdPartition3, 10)
|
||||
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
// Create different share fetch requests for different partitions as they may have leaders on separate brokers
|
||||
shareFetchRequest1 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap)
|
||||
shareFetchRequest2 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap)
|
||||
shareFetchRequest3 = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap)
|
||||
|
||||
shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
|
||||
shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
|
||||
shareFetchResponse3 = connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3)
|
||||
|
||||
val shareFetchResponseData1 = shareFetchResponse1.data()
|
||||
assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode)
|
||||
|
@ -427,15 +444,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the first share fetch request to initialize share partitions
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
|
||||
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
|
||||
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
|
||||
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
@ -482,7 +502,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
// Producing 10 more records to the topic
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
// Sending a second share fetch request to check if acknowledgements were done successfully
|
||||
// Sending a third share fetch request to check if acknowledgements were done successfully
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
|
||||
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
|
||||
|
@ -540,15 +560,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the first share fetch request to initialize the share partition
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
|
||||
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch: Int = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
|
||||
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
|
||||
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
@ -571,7 +594,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
// Producing 10 more records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
// Send a Share Fetch request with piggybacked acknowledgements
|
||||
// Send the third Share Fetch request with piggybacked acknowledgements
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
|
||||
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch()
|
||||
|
@ -599,7 +622,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
// Producing 10 more records to the topic
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
// Sending a third share fetch request to confirm if acknowledgements were done successfully
|
||||
// Sending a fourth share fetch request to confirm if acknowledgements were done successfully
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
|
||||
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
|
||||
|
@ -657,15 +680,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the first share fetch request to initialize the share partiion
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
|
||||
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
|
||||
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
|
||||
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
@ -709,7 +735,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val acknowledgePartitionData = shareAcknowledgeResponseData.responses().get(0).partitions().get(0)
|
||||
compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData, acknowledgePartitionData)
|
||||
|
||||
// Sending a second share fetch request to check if acknowledgements were done successfully
|
||||
// Sending a third share fetch request to check if acknowledgements were done successfully
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
|
||||
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
|
||||
|
@ -767,15 +793,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the first share fetch request to initialize the share partition
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
|
||||
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
|
||||
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
|
||||
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
@ -798,7 +827,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
// Producing 10 more records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
// Send a Share Fetch request with piggybacked acknowledgements
|
||||
// Send a third Share Fetch request with piggybacked acknowledgements
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
|
||||
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch()
|
||||
|
@ -862,15 +891,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the first share fetch request to initialize the share partition
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
|
||||
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
|
||||
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
|
||||
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
@ -917,7 +949,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
// Producing 10 more records to the topic
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
// Sending a second share fetch request to check if acknowledgements were done successfully
|
||||
// Sending a third share fetch request to check if acknowledgements were done successfully
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
|
||||
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
|
||||
|
@ -975,15 +1007,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the first share fetch request to initialize the share partition
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
|
||||
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
|
||||
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
|
||||
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
@ -1006,7 +1041,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
// Producing 10 more records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
// Send a Share Fetch request with piggybacked acknowledgements
|
||||
// Send a third Share Fetch request with piggybacked acknowledgements
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
|
||||
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch()
|
||||
|
@ -1034,7 +1069,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
// Producing 10 more records to the topic
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
// Sending a third share fetch request to confirm if acknowledgements were done successfully
|
||||
// Sending a fourth share fetch request to confirm if acknowledgements were done successfully
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
|
||||
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
|
||||
|
@ -1094,15 +1129,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the first share fetch request to initialize the shar partition
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
|
||||
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
|
||||
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
|
||||
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
@ -1146,7 +1184,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
var acknowledgePartitionData = shareAcknowledgeResponseData.responses().get(0).partitions().get(0)
|
||||
compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData, acknowledgePartitionData)
|
||||
|
||||
// Sending a second share fetch request to check if acknowledgements were done successfully
|
||||
// Sending a third share fetch request to check if acknowledgements were done successfully
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
|
||||
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
|
||||
|
@ -1193,7 +1231,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
// Producing 10 new records to the topic
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
// Sending a third share fetch request to check if acknowledgements were done successfully
|
||||
// Sending a fourth share fetch request to check if acknowledgements were done successfully
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
|
||||
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
|
||||
|
@ -1251,6 +1289,11 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the first share fetch request to initialize the share partition
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 3 large messages to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
@ -1258,10 +1301,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
produceData(topicIdPartition, "large message 2", new String(new Array[Byte](MAX_PARTITION_BYTES/3)))
|
||||
produceData(topicIdPartition, "large message 3", new String(new Array[Byte](MAX_PARTITION_BYTES/3)))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
|
||||
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
|
||||
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
@ -1311,6 +1352,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
)
|
||||
def testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers(): Unit = {
|
||||
val groupId: String = "group"
|
||||
|
||||
val memberId = Uuid.randomUuid()
|
||||
val memberId1 = Uuid.randomUuid()
|
||||
val memberId2 = Uuid.randomUuid()
|
||||
val memberId3 = Uuid.randomUuid()
|
||||
|
@ -1323,12 +1366,15 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Sending a dummy share fetch request to initialize the share partition
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10000 records to the topic created above
|
||||
produceData(topicIdPartition, 10000)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Sending 3 share Fetch Requests with same groupId to the same topicPartition but with different memberIds,
|
||||
// mocking the behaviour of multiple share consumers from the same share group
|
||||
val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH)
|
||||
|
@ -1418,23 +1464,28 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Sending 3 dummy share Fetch Requests with to inititlaize the share partitions for each share group\
|
||||
sendFirstShareFetchRequest(memberId1, groupId1, send)
|
||||
sendFirstShareFetchRequest(memberId2, groupId2, send)
|
||||
sendFirstShareFetchRequest(memberId3, groupId3, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Sending 3 share Fetch Requests with same groupId to the same topicPartition but with different memberIds,
|
||||
// mocking the behaviour of multiple share consumers from the same share group
|
||||
val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH)
|
||||
// Sending 3 share Fetch Requests with different groupId and different memberIds to the same topicPartition,
|
||||
// mocking the behaviour of 3 different share groups
|
||||
val metadata1 = new ShareRequestMetadata(memberId1, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
|
||||
val acknowledgementsMap1: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
val shareFetchRequest1 = createShareFetchRequest(groupId1, metadata1, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap1)
|
||||
|
||||
val metadata2: ShareRequestMetadata = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH)
|
||||
val metadata2 = new ShareRequestMetadata(memberId2, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
|
||||
val acknowledgementsMap2: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
val shareFetchRequest2 = createShareFetchRequest(groupId2, metadata2, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap2)
|
||||
|
||||
val metadata3: ShareRequestMetadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH)
|
||||
val metadata3 = new ShareRequestMetadata(memberId3, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
|
||||
val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
val shareFetchRequest3 = createShareFetchRequest(groupId3, metadata3, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap3)
|
||||
|
||||
|
@ -1509,15 +1560,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the first share fetch request to initialize the share partition
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
|
||||
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
|
||||
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
|
||||
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
@ -1540,7 +1594,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
// Producing 10 more records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
// Send a Share Fetch request with piggybacked acknowledgements
|
||||
// Send a third Share Fetch request with piggybacked acknowledgements
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
|
||||
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch()
|
||||
|
@ -1616,15 +1670,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the first share fetch request to initialize the share partition
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
|
||||
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
|
||||
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
|
||||
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
@ -1647,7 +1704,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
// Producing 10 more records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
// Send a Share Fetch request with piggybacked acknowledgements
|
||||
// Send a third Share Fetch request with piggybacked acknowledgements
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
|
||||
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch()
|
||||
|
@ -1804,12 +1861,28 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareAcknowledgeResponseData.errorCode)
|
||||
}
|
||||
|
||||
@ClusterTest(
|
||||
serverProperties = Array(
|
||||
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
|
||||
new ClusterConfigProperty(key = "group.share.enable", value = "true"),
|
||||
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
|
||||
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
|
||||
@ClusterTests(
|
||||
Array(
|
||||
new ClusterTest(
|
||||
serverProperties = Array(
|
||||
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
|
||||
new ClusterConfigProperty(key = "group.share.enable", value = "true"),
|
||||
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
|
||||
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
|
||||
)
|
||||
),
|
||||
new ClusterTest(
|
||||
serverProperties = Array(
|
||||
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
|
||||
new ClusterConfigProperty(key = "group.share.enable", value = "true"),
|
||||
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
|
||||
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
|
||||
new ClusterConfigProperty(key = "group.share.persister.class.name", value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
|
||||
new ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"),
|
||||
new ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"),
|
||||
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
|
||||
)
|
||||
),
|
||||
)
|
||||
)
|
||||
def testShareFetchRequestInvalidShareSessionEpoch(): Unit = {
|
||||
|
@ -1824,14 +1897,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the first share fetch request to initialize the share partition
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
|
||||
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
|
||||
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
||||
|
@ -1850,8 +1927,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val partitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
|
||||
compareFetchResponsePartitions(expectedPartitionData, partitionData)
|
||||
|
||||
// Sending Share Fetch request with invalid share session epoch
|
||||
metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))
|
||||
// Sending a thord Share Fetch request with invalid share session epoch
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch))
|
||||
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
|
||||
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
||||
|
@ -1895,14 +1973,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the first share fetch request to initialize the share partition
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
|
||||
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
|
||||
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
||||
|
@ -1922,7 +2004,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
compareFetchResponsePartitions(expectedPartitionData, partitionData)
|
||||
|
||||
// Sending Share Acknowledge request with invalid share session epoch
|
||||
metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch))
|
||||
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]] =
|
||||
Map(topicIdPartition -> List(new ShareAcknowledgeRequestData.AcknowledgementBatch()
|
||||
.setFirstOffset(0)
|
||||
|
@ -1972,14 +2055,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the first share fetch request to initialize the share partition
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
|
||||
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
|
||||
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
||||
|
@ -1998,8 +2085,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val partitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
|
||||
compareFetchResponsePartitions(expectedPartitionData, partitionData)
|
||||
|
||||
// Sending a Share Fetch request with wrong member Id
|
||||
metadata = new ShareRequestMetadata(wrongMemberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
|
||||
// Sending a third Share Fetch request with wrong member Id
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
|
||||
metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
|
||||
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
|
||||
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
||||
|
@ -2044,14 +2132,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicId = topicIds.get(topic)
|
||||
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the first share fetch request to initialize the share partition
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic created above
|
||||
produceData(topicIdPartition, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
|
||||
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
|
||||
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
||||
|
@ -2071,7 +2163,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
compareFetchResponsePartitions(expectedPartitionData, partitionData)
|
||||
|
||||
// Sending a Share Acknowledge request with wrong member Id
|
||||
metadata = new ShareRequestMetadata(wrongMemberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
|
||||
metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
|
||||
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]] =
|
||||
Map(topicIdPartition -> List(new ShareAcknowledgeRequestData.AcknowledgementBatch()
|
||||
.setFirstOffset(0)
|
||||
|
@ -2122,15 +2215,19 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
val topicIdPartition1 = new TopicIdPartition(topicId, new TopicPartition(topic, partition1))
|
||||
val topicIdPartition2 = new TopicIdPartition(topicId, new TopicPartition(topic, partition2))
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2)
|
||||
|
||||
// Send the first share fetch request to initialize the share partition
|
||||
sendFirstShareFetchRequest(memberId, groupId, send)
|
||||
|
||||
initProducer()
|
||||
// Producing 10 records to the topic partitions created above
|
||||
produceData(topicIdPartition1, 10)
|
||||
produceData(topicIdPartition2, 10)
|
||||
|
||||
val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2)
|
||||
|
||||
// Send the share fetch request to fetch the records produced above
|
||||
var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
|
||||
// Send the second share fetch request to fetch the records produced above
|
||||
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
|
||||
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
|
||||
var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
|
||||
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
@ -2145,8 +2242,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
produceData(topicIdPartition1, 10)
|
||||
produceData(topicIdPartition2, 10)
|
||||
|
||||
// Send the share fetch request to with forget list populated with topicIdPartition2
|
||||
metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
|
||||
// Send another share fetch request with forget list populated with topicIdPartition2
|
||||
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
|
||||
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
|
||||
val forget: Seq[TopicIdPartition] = Seq(topicIdPartition1)
|
||||
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, Seq.empty, forget, acknowledgementsMap)
|
||||
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
|
@ -2167,6 +2265,12 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
|
|||
compareFetchResponsePartitions(expectedPartitionData, partitionData)
|
||||
}
|
||||
|
||||
private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String, topicIdPartitions: Seq[TopicIdPartition]): Unit = {
|
||||
val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
|
||||
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, topicIdPartitions, Seq.empty, Map.empty)
|
||||
connectAndReceive[ShareFetchResponse](shareFetchRequest)
|
||||
}
|
||||
|
||||
private def expectedAcquiredRecords(firstOffsets: util.List[Long], lastOffsets: util.List[Long], deliveryCounts: util.List[Int]): util.List[AcquiredRecords] = {
|
||||
val acquiredRecordsList: util.List[AcquiredRecords] = new util.ArrayList()
|
||||
for (i <- firstOffsets.indices) {
|
||||
|
|
|
@ -21,8 +21,10 @@ import org.apache.kafka.common.config.AbstractConfig;
|
|||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigDef.Type;
|
||||
import org.apache.kafka.common.errors.InvalidConfigurationException;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
|
||||
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
|
@ -31,6 +33,8 @@ import java.util.Set;
|
|||
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
|
||||
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
|
||||
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
|
||||
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
|
||||
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
|
||||
|
||||
/**
|
||||
* Group configuration related parameters and supporting methods like validation, etc. are
|
||||
|
@ -48,6 +52,10 @@ public final class GroupConfig extends AbstractConfig {
|
|||
|
||||
public static final String SHARE_RECORD_LOCK_DURATION_MS_CONFIG = "share.record.lock.duration.ms";
|
||||
|
||||
public static final String SHARE_AUTO_OFFSET_RESET_CONFIG = "share.auto.offset.reset";
|
||||
public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = ShareGroupAutoOffsetReset.LATEST.toString();
|
||||
public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to initialize the share-partition start offset.";
|
||||
|
||||
public final int consumerSessionTimeoutMs;
|
||||
|
||||
public final int consumerHeartbeatIntervalMs;
|
||||
|
@ -58,6 +66,8 @@ public final class GroupConfig extends AbstractConfig {
|
|||
|
||||
public final int shareRecordLockDurationMs;
|
||||
|
||||
public final String shareAutoOffsetReset;
|
||||
|
||||
private static final ConfigDef CONFIG = new ConfigDef()
|
||||
.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
|
||||
INT,
|
||||
|
@ -88,7 +98,13 @@ public final class GroupConfig extends AbstractConfig {
|
|||
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT,
|
||||
atLeast(1000),
|
||||
MEDIUM,
|
||||
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC);
|
||||
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
|
||||
.define(SHARE_AUTO_OFFSET_RESET_CONFIG,
|
||||
STRING,
|
||||
SHARE_AUTO_OFFSET_RESET_DEFAULT,
|
||||
in(Utils.enumOptions(ShareGroupAutoOffsetReset.class)),
|
||||
MEDIUM,
|
||||
SHARE_AUTO_OFFSET_RESET_DOC);
|
||||
|
||||
public GroupConfig(Map<?, ?> props) {
|
||||
super(CONFIG, props, false);
|
||||
|
@ -97,6 +113,7 @@ public final class GroupConfig extends AbstractConfig {
|
|||
this.shareSessionTimeoutMs = getInt(SHARE_SESSION_TIMEOUT_MS_CONFIG);
|
||||
this.shareHeartbeatIntervalMs = getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
this.shareRecordLockDurationMs = getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
|
||||
this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG);
|
||||
}
|
||||
|
||||
public static ConfigDef configDef() {
|
||||
|
@ -203,6 +220,13 @@ public final class GroupConfig extends AbstractConfig {
|
|||
return new GroupConfig(props);
|
||||
}
|
||||
|
||||
/**
|
||||
* The default share group auto offset reset strategy.
|
||||
*/
|
||||
public static ShareGroupAutoOffsetReset defaultShareAutoOffsetReset() {
|
||||
return ShareGroupAutoOffsetReset.valueOf(SHARE_AUTO_OFFSET_RESET_DEFAULT.toUpperCase(Locale.ROOT));
|
||||
}
|
||||
|
||||
/**
|
||||
* The consumer group session timeout in milliseconds.
|
||||
*/
|
||||
|
@ -237,4 +261,20 @@ public final class GroupConfig extends AbstractConfig {
|
|||
public int shareRecordLockDurationMs() {
|
||||
return shareRecordLockDurationMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* The share group auto offset reset strategy.
|
||||
*/
|
||||
public ShareGroupAutoOffsetReset shareAutoOffsetReset() {
|
||||
return ShareGroupAutoOffsetReset.valueOf(shareAutoOffsetReset.toUpperCase(Locale.ROOT));
|
||||
}
|
||||
|
||||
public enum ShareGroupAutoOffsetReset {
|
||||
LATEST, EARLIEST;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return super.toString().toLowerCase(Locale.ROOT);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.kafka.coordinator.group;
|
||||
|
||||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.apache.kafka.common.errors.InvalidConfigurationException;
|
||||
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
|
||||
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigTest;
|
||||
|
@ -27,6 +28,7 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
|
@ -57,8 +59,10 @@ public class GroupConfigTest {
|
|||
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
|
||||
} else if (GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG.equals(name)) {
|
||||
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
|
||||
} else if (GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG.equals(name)) {
|
||||
assertPropertyInvalid(name, "hello", "1.0");
|
||||
} else {
|
||||
assertPropertyInvalid(name, "not_a_number", "-1");
|
||||
assertPropertyInvalid(name, "not_a_number", "-0.1");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -71,6 +75,21 @@ public class GroupConfigTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidShareAutoOffsetResetValues() {
|
||||
|
||||
Properties props = createValidGroupConfig();
|
||||
|
||||
// Check for value "latest"
|
||||
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
|
||||
doTestValidProps(props);
|
||||
props = createValidGroupConfig();
|
||||
|
||||
// Check for value "earliest"
|
||||
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
doTestValidProps(props);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidProps() {
|
||||
|
||||
|
@ -78,56 +97,65 @@ public class GroupConfigTest {
|
|||
|
||||
// Check for invalid consumerSessionTimeoutMs, < MIN
|
||||
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "1");
|
||||
doTestInvalidProps(props);
|
||||
doTestInvalidProps(props, InvalidConfigurationException.class);
|
||||
props = createValidGroupConfig();
|
||||
|
||||
// Check for invalid consumerSessionTimeoutMs, > MAX
|
||||
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "70000");
|
||||
doTestInvalidProps(props);
|
||||
doTestInvalidProps(props, InvalidConfigurationException.class);
|
||||
props = createValidGroupConfig();
|
||||
|
||||
// Check for invalid consumerHeartbeatIntervalMs, < MIN
|
||||
props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "1");
|
||||
doTestInvalidProps(props);
|
||||
doTestInvalidProps(props, InvalidConfigurationException.class);
|
||||
props = createValidGroupConfig();
|
||||
|
||||
// Check for invalid consumerHeartbeatIntervalMs, > MAX
|
||||
props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "70000");
|
||||
doTestInvalidProps(props);
|
||||
doTestInvalidProps(props, InvalidConfigurationException.class);
|
||||
props = createValidGroupConfig();
|
||||
|
||||
// Check for invalid shareSessionTimeoutMs, < MIN
|
||||
props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "1");
|
||||
doTestInvalidProps(props);
|
||||
doTestInvalidProps(props, InvalidConfigurationException.class);
|
||||
props = createValidGroupConfig();
|
||||
|
||||
// Check for invalid shareSessionTimeoutMs, > MAX
|
||||
props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "70000");
|
||||
doTestInvalidProps(props);
|
||||
doTestInvalidProps(props, InvalidConfigurationException.class);
|
||||
props = createValidGroupConfig();
|
||||
|
||||
// Check for invalid shareHeartbeatIntervalMs, < MIN
|
||||
props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "1");
|
||||
doTestInvalidProps(props);
|
||||
doTestInvalidProps(props, InvalidConfigurationException.class);
|
||||
props = createValidGroupConfig();
|
||||
|
||||
// Check for invalid shareHeartbeatIntervalMs, > MAX
|
||||
props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "70000");
|
||||
doTestInvalidProps(props);
|
||||
doTestInvalidProps(props, InvalidConfigurationException.class);
|
||||
props = createValidGroupConfig();
|
||||
|
||||
// Check for invalid shareRecordLockDurationMs, < MIN
|
||||
props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "10000");
|
||||
doTestInvalidProps(props);
|
||||
doTestInvalidProps(props, InvalidConfigurationException.class);
|
||||
props = createValidGroupConfig();
|
||||
|
||||
// Check for invalid shareRecordLockDurationMs, > MAX
|
||||
props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "70000");
|
||||
doTestInvalidProps(props);
|
||||
doTestInvalidProps(props, InvalidConfigurationException.class);
|
||||
props = createValidGroupConfig();
|
||||
|
||||
// Check for invalid shareAutoOffsetReset
|
||||
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "hello");
|
||||
doTestInvalidProps(props, ConfigException.class);
|
||||
}
|
||||
|
||||
private void doTestInvalidProps(Properties props) {
|
||||
assertThrows(InvalidConfigurationException.class, () -> GroupConfig.validate(props, createGroupCoordinatorConfig(), createShareGroupConfig()));
|
||||
private void doTestInvalidProps(Properties props, Class<? extends Exception> exceptionClassName) {
|
||||
assertThrows(exceptionClassName, () -> GroupConfig.validate(props, createGroupCoordinatorConfig(), createShareGroupConfig()));
|
||||
}
|
||||
|
||||
private void doTestValidProps(Properties props) {
|
||||
assertDoesNotThrow(() -> GroupConfig.validate(props, createGroupCoordinatorConfig(), createShareGroupConfig()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -138,6 +166,7 @@ public class GroupConfigTest {
|
|||
defaultValue.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "10");
|
||||
defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
|
||||
defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "2000");
|
||||
defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
|
||||
|
||||
Properties props = new Properties();
|
||||
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20");
|
||||
|
@ -148,6 +177,7 @@ public class GroupConfigTest {
|
|||
assertEquals(10, config.getInt(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG));
|
||||
assertEquals(10, config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG));
|
||||
assertEquals(2000, config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG));
|
||||
assertEquals("latest", config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -165,6 +195,7 @@ public class GroupConfigTest {
|
|||
props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "45000");
|
||||
props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
|
||||
props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "30000");
|
||||
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
|
||||
return props;
|
||||
}
|
||||
|
||||
|
|
|
@ -401,7 +401,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
|
|||
return ReadShareGroupStateResponse.toResponseData(
|
||||
topicId,
|
||||
partition,
|
||||
PartitionFactory.DEFAULT_START_OFFSET,
|
||||
PartitionFactory.UNINITIALIZED_START_OFFSET,
|
||||
PartitionFactory.DEFAULT_STATE_EPOCH,
|
||||
Collections.emptyList()
|
||||
);
|
||||
|
|
|
@ -53,7 +53,7 @@ public class NoOpShareStatePersister implements Persister {
|
|||
for (TopicData<PartitionIdLeaderEpochData> topicData : reqData.topicsData()) {
|
||||
resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream().
|
||||
map(partitionIdData -> PartitionFactory.newPartitionAllData(
|
||||
partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.DEFAULT_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE, Collections.emptyList()))
|
||||
partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE, Collections.emptyList()))
|
||||
.collect(Collectors.toList())));
|
||||
}
|
||||
return CompletableFuture.completedFuture(new ReadShareGroupStateResult.Builder().setTopicsData(resultArgs).build());
|
||||
|
@ -93,7 +93,7 @@ public class NoOpShareStatePersister implements Persister {
|
|||
for (TopicData<PartitionIdLeaderEpochData> topicData : reqData.topicsData()) {
|
||||
resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream().
|
||||
map(partitionIdData -> PartitionFactory.newPartitionStateErrorData(
|
||||
partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.DEFAULT_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
|
||||
partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
|
||||
.collect(Collectors.toList())));
|
||||
}
|
||||
return CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResult.Builder().setTopicsData(resultArgs).build());
|
||||
|
|
|
@ -26,17 +26,17 @@ import java.util.List;
|
|||
*/
|
||||
public class PartitionFactory {
|
||||
public static final int DEFAULT_STATE_EPOCH = 0;
|
||||
public static final int DEFAULT_START_OFFSET = 0;
|
||||
public static final int UNINITIALIZED_START_OFFSET = -1;
|
||||
public static final short DEFAULT_ERROR_CODE = Errors.NONE.code();
|
||||
public static final int DEFAULT_LEADER_EPOCH = 0;
|
||||
public static final String DEFAULT_ERR_MESSAGE = Errors.NONE.message();
|
||||
|
||||
public static PartitionIdData newPartitionIdData(int partition) {
|
||||
return new PartitionData(partition, DEFAULT_STATE_EPOCH, DEFAULT_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
|
||||
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
|
||||
}
|
||||
|
||||
public static PartitionIdLeaderEpochData newPartitionIdLeaderEpochData(int partition, int leaderEpoch) {
|
||||
return new PartitionData(partition, DEFAULT_STATE_EPOCH, DEFAULT_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, null);
|
||||
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, null);
|
||||
}
|
||||
|
||||
public static PartitionStateData newPartitionStateData(int partition, int stateEpoch, long startOffset) {
|
||||
|
@ -44,7 +44,7 @@ public class PartitionFactory {
|
|||
}
|
||||
|
||||
public static PartitionErrorData newPartitionErrorData(int partition, short errorCode, String errorMessage) {
|
||||
return new PartitionData(partition, DEFAULT_STATE_EPOCH, DEFAULT_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
|
||||
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
|
||||
}
|
||||
|
||||
public static PartitionStateErrorData newPartitionStateErrorData(int partition, int stateEpoch, long startOffset, short errorCode, String errorMessage) {
|
||||
|
|
Loading…
Reference in New Issue