KAFKA-19015: Remove share session from cache on share consumer connection drop (#19329)

Up till now, the share sessions in the broker were only attempted to
evict when the share session cache was full and a new session was trying
to get registered. With the changes in this PR, whenever a share
consumer gets disconnected from the broker, the corresponding share
session would be evicted from the cache.

Note - `connectAndReceiveWithoutClosingSocket` has been introduced in
`GroupCoordinatorBaseRequestTest`. This method creates a socket
connection, sends the request, receives a response but does not close
the connection. Instead, these sockets are stored in a ListBuffer
`openSockets`, which are closed in tearDown method after each test is
run. Also, all the `connectAndReceive` calls in
`ShareFetchAcknowledgeRequestTest` have been replaced by
`connectAndReceiveWithoutClosingSocket`, because these tests depends
upon the persistence of the share sessions on the broker once
registered. But, with the new code introduced, as soon as the socket
connection is closed, a connection drop is assumed by the broker,
leading to session eviction.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Chirag Wadhwa 2025-05-01 19:06:18 +05:30 committed by GitHub
parent e3c456ff0f
commit 800612e4a7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 467 additions and 152 deletions

View File

@ -36,6 +36,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
@ -2057,6 +2058,57 @@ public class ShareConsumerTest {
verifyShareGroupStateTopicRecordsProduced();
}
@ClusterTest(
brokers = 1,
serverProperties = {
@ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
@ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
@ClusterConfigProperty(key = "group.share.enable", value = "true"),
@ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
@ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"),
@ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1"),
@ClusterConfigProperty(key = "group.share.max.size", value = "3") // Setting max group size to 3
}
)
public void testShareGroupMaxSizeConfigExceeded() throws Exception {
// creating 3 consumers in the group1
ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer("group1");
ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer("group1");
ShareConsumer<byte[], byte[]> shareConsumer3 = createShareConsumer("group1");
shareConsumer1.subscribe(Set.of(tp.topic()));
shareConsumer2.subscribe(Set.of(tp.topic()));
shareConsumer3.subscribe(Set.of(tp.topic()));
shareConsumer1.poll(Duration.ofMillis(5000));
shareConsumer2.poll(Duration.ofMillis(5000));
shareConsumer3.poll(Duration.ofMillis(5000));
ShareConsumer<byte[], byte[]> shareConsumer4 = createShareConsumer("group1");
shareConsumer4.subscribe(Set.of(tp.topic()));
TestUtils.waitForCondition(() -> {
try {
shareConsumer4.poll(Duration.ofMillis(5000));
} catch (GroupMaxSizeReachedException e) {
return true;
} catch (Exception e) {
return false;
}
return false;
}, 30000, 200L, () -> "The 4th consumer was not kicked out of the group");
shareConsumer1.close();
shareConsumer2.close();
shareConsumer3.close();
shareConsumer4.close();
}
@ClusterTest
public void testReadCommittedIsolationLevel() {
alterShareAutoOffsetReset("group1", "earliest");

View File

@ -420,11 +420,18 @@ public class SharePartitionManager implements AutoCloseable {
* @param shareFetchData The topic-partitions in the share fetch request.
* @param toForget The topic-partitions to forget present in the share fetch request.
* @param reqMetadata The metadata in the share fetch request.
* @param isAcknowledgeDataPresent This tells whether the fetch request received includes piggybacked acknowledgements or not
* @param isAcknowledgeDataPresent This tells whether the fetch request received includes piggybacked acknowledgements or not.
* @param clientConnectionId The client connection id.
* @return The new share fetch context object
*/
public ShareFetchContext newContext(String groupId, List<TopicIdPartition> shareFetchData,
List<TopicIdPartition> toForget, ShareRequestMetadata reqMetadata, Boolean isAcknowledgeDataPresent) {
public ShareFetchContext newContext(
String groupId,
List<TopicIdPartition> shareFetchData,
List<TopicIdPartition> toForget,
ShareRequestMetadata reqMetadata,
Boolean isAcknowledgeDataPresent,
String clientConnectionId
) {
ShareFetchContext context;
// If the request's epoch is FINAL_EPOCH or INITIAL_EPOCH, we should remove the existing sessions. Also, start a
// new session in case it is INITIAL_EPOCH. Hence, we need to treat them as special cases.
@ -448,7 +455,8 @@ public class SharePartitionManager implements AutoCloseable {
ImplicitLinkedHashCollection<>(shareFetchData.size());
shareFetchData.forEach(topicIdPartition ->
cachedSharePartitions.mustAdd(new CachedSharePartition(topicIdPartition, false)));
ShareSessionKey responseShareSessionKey = cache.maybeCreateSession(groupId, reqMetadata.memberId(), cachedSharePartitions);
ShareSessionKey responseShareSessionKey = cache.maybeCreateSession(groupId, reqMetadata.memberId(),
cachedSharePartitions, clientConnectionId);
if (responseShareSessionKey == null) {
log.error("Could not create a share session for group {} member {}", groupId, reqMetadata.memberId());
throw Errors.SHARE_SESSION_NOT_FOUND.exception();

View File

@ -259,7 +259,13 @@ class BrokerServer(
Optional.of(clientMetricsManager)
)
val connectionDisconnectListeners = Seq(clientMetricsManager.connectionDisconnectListener())
val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache(config.shareGroupConfig.shareGroupMaxShareSessions())
val connectionDisconnectListeners = Seq(
clientMetricsManager.connectionDisconnectListener(),
shareFetchSessionCache.connectionDisconnectListener()
)
// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
@ -426,8 +432,6 @@ class BrokerServer(
))
val fetchManager = new FetchManager(Time.SYSTEM, new FetchSessionCache(fetchSessionCacheShards))
val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache(config.shareGroupConfig.shareGroupMaxShareSessions())
sharePartitionManager = new SharePartitionManager(
replicaManager,
time,

View File

@ -3041,7 +3041,7 @@ class KafkaApis(val requestChannel: RequestChannel,
try {
// Creating the shareFetchContext for Share Session Handling. if context creation fails, the request is failed directly here.
shareFetchContext = sharePartitionManager.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent)
shareFetchContext = sharePartitionManager.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent, request.context.connectionId)
} catch {
case e: Exception =>
requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e))

View File

@ -153,6 +153,7 @@ public class SharePartitionManagerTest {
FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty(), true);
private static final String TIMER_NAME_PREFIX = "share-partition-manager";
private static final String CONNECTION_ID = "id-1";
static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000;
@ -200,12 +201,12 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertFalse(((ShareSessionContext) context1).isSubsequent());
ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, ShareRequestMetadata.FINAL_EPOCH);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, List.of(), List.of(), reqMetadata2, true);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, List.of(), List.of(), reqMetadata2, true, CONNECTION_ID);
assertEquals(FinalContext.class, context2.getClass());
}
@ -217,7 +218,6 @@ public class SharePartitionManagerTest {
.build();
Uuid tpId0 = Uuid.randomUuid();
Uuid tpId1 = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
@ -228,16 +228,15 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertFalse(((ShareSessionContext) context1).isSubsequent());
ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, ShareRequestMetadata.FINAL_EPOCH);
// shareFetch is not empty, but the maxBytes of topic partition is 0, which means this is added only for acknowledgements.
// New context should be created successfully
List<TopicIdPartition> reqData3 = List.of(new TopicIdPartition(tpId1, new TopicPartition("foo", 0)));
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData3, List.of(), reqMetadata2, true);
// Sending a Request with FINAL_EPOCH. This should return a FinalContext.
List<TopicIdPartition> reqData2 = List.of(tp0, tp1);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, true, CONNECTION_ID);
assertEquals(FinalContext.class, context2.getClass());
}
@ -260,16 +259,16 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertFalse(((ShareSessionContext) context1).isSubsequent());
ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, ShareRequestMetadata.FINAL_EPOCH);
// shareFetch is not empty, and it contains tpId1, which should return FinalContext instance since it is FINAL_EPOCH
List<TopicIdPartition> reqData3 = List.of(new TopicIdPartition(tpId1, new TopicPartition("foo", 0)));
List<TopicIdPartition> reqData2 = List.of(new TopicIdPartition(tpId1, new TopicPartition("foo", 0)));
assertInstanceOf(FinalContext.class,
sharePartitionManager.newContext(groupId, reqData3, List.of(), reqMetadata2, true));
sharePartitionManager.newContext(groupId, reqData2, List.of(), reqMetadata2, true, CONNECTION_ID));
}
@Test
@ -295,7 +294,7 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData2 = List.of(tp0, tp1);
ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, false);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context2);
assertFalse(((ShareSessionContext) context2).isSubsequent());
@ -314,16 +313,16 @@ public class SharePartitionManagerTest {
// Test trying to create a new session with an invalid epoch
assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true));
new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true, "id-2"));
// Test trying to create a new session with a non-existent session key
Uuid memberId4 = Uuid.randomUuid();
assertThrows(ShareSessionNotFoundException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareRequestMetadata(memberId4, 1), true));
new ShareRequestMetadata(memberId4, 1), true, "id-3"));
// Continue the first share session we created.
ShareFetchContext context5 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST,
new ShareRequestMetadata(shareSessionKey2.memberId(), 1), true);
new ShareRequestMetadata(shareSessionKey2.memberId(), 1), true, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context5);
assertTrue(((ShareSessionContext) context5).isSubsequent());
@ -341,18 +340,18 @@ public class SharePartitionManagerTest {
// Test setting an invalid share session epoch.
assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true));
new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true, CONNECTION_ID));
// Test generating a throttled response for a subsequent share session
ShareFetchContext context7 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST,
new ShareRequestMetadata(shareSessionKey2.memberId(), 2), true);
new ShareRequestMetadata(shareSessionKey2.memberId(), 2), true, CONNECTION_ID);
ShareFetchResponse resp7 = context7.throttleResponse(100);
assertEquals(Errors.NONE, resp7.error());
assertEquals(100, resp7.throttleTimeMs());
// Get the final share session.
ShareFetchContext context8 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST,
new ShareRequestMetadata(reqMetadata2.memberId(), ShareRequestMetadata.FINAL_EPOCH), true);
new ShareRequestMetadata(reqMetadata2.memberId(), ShareRequestMetadata.FINAL_EPOCH), true, CONNECTION_ID);
assertEquals(FinalContext.class, context8.getClass());
assertEquals(1, cache.size());
@ -389,7 +388,7 @@ public class SharePartitionManagerTest {
String groupId = "grp";
ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<>();
@ -405,7 +404,7 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> removed2 = new ArrayList<>();
removed2.add(tp0);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, removed2,
new ShareRequestMetadata(reqMetadata1.memberId(), 1), true);
new ShareRequestMetadata(reqMetadata1.memberId(), 1), true, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context2);
Set<TopicIdPartition> expectedTopicIdPartitions2 = new HashSet<>();
@ -452,7 +451,7 @@ public class SharePartitionManagerTest {
String groupId = "grp";
ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<>();
@ -469,7 +468,7 @@ public class SharePartitionManagerTest {
removed2.add(foo0);
removed2.add(foo1);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, List.of(), removed2,
new ShareRequestMetadata(reqMetadata1.memberId(), 1), true);
new ShareRequestMetadata(reqMetadata1.memberId(), 1), true, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context2);
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<>();
@ -495,14 +494,14 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData1 = List.of(foo, bar);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertPartitionsPresent((ShareSessionContext) context1, List.of(foo, bar));
mockUpdateAndGenerateResponseData(context1, groupId, reqMetadata1.memberId());
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, List.of(), List.of(foo),
new ShareRequestMetadata(reqMetadata1.memberId(), 1), true);
new ShareRequestMetadata(reqMetadata1.memberId(), 1), true, CONNECTION_ID);
// So foo is removed but not the others.
assertPartitionsPresent((ShareSessionContext) context2, List.of(bar));
@ -510,7 +509,7 @@ public class SharePartitionManagerTest {
mockUpdateAndGenerateResponseData(context2, groupId, reqMetadata1.memberId());
ShareFetchContext context3 = sharePartitionManager.newContext(groupId, List.of(), List.of(bar),
new ShareRequestMetadata(reqMetadata1.memberId(), 2), true);
new ShareRequestMetadata(reqMetadata1.memberId(), 2), true, CONNECTION_ID);
assertPartitionsPresent((ShareSessionContext) context3, List.of());
}
@ -537,7 +536,7 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData1 = List.of(foo, bar);
ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertFalse(((ShareSessionContext) context1).isSubsequent());
@ -553,7 +552,7 @@ public class SharePartitionManagerTest {
// Create a subsequent share fetch request as though no topics changed.
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST,
new ShareRequestMetadata(reqMetadata1.memberId(), 1), true);
new ShareRequestMetadata(reqMetadata1.memberId(), 1), true, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context2);
assertTrue(((ShareSessionContext) context2).isSubsequent());
@ -587,7 +586,7 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData2 = List.of(tp0, tp1, tpNull1);
ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, false);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context2);
assertFalse(((ShareSessionContext) context2).isSubsequent());
assertErroneousAndValidTopicIdPartitions(context2.getErroneousAndValidTopicIdPartitions(), List.of(tpNull1), List.of(tp0, tp1));
@ -609,15 +608,15 @@ public class SharePartitionManagerTest {
// Test trying to create a new session with an invalid epoch
assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true));
new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true, CONNECTION_ID));
// Test trying to create a new session with a non-existent session key
assertThrows(ShareSessionNotFoundException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareRequestMetadata(Uuid.randomUuid(), 1), true));
new ShareRequestMetadata(Uuid.randomUuid(), 1), true, CONNECTION_ID));
// Continue the first share session we created.
ShareFetchContext context5 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST,
new ShareRequestMetadata(shareSessionKey2.memberId(), 1), true);
new ShareRequestMetadata(shareSessionKey2.memberId(), 1), true, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context5);
assertTrue(((ShareSessionContext) context5).isSubsequent());
@ -628,12 +627,12 @@ public class SharePartitionManagerTest {
// Test setting an invalid share session epoch.
assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true));
new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true, CONNECTION_ID));
// Test generating a throttled response for a subsequent share session
List<TopicIdPartition> reqData7 = List.of(tpNull2);
ShareFetchContext context7 = sharePartitionManager.newContext(groupId, reqData7, EMPTY_PART_LIST,
new ShareRequestMetadata(shareSessionKey2.memberId(), 2), true);
new ShareRequestMetadata(shareSessionKey2.memberId(), 2), true, CONNECTION_ID);
// Check for throttled response
ShareFetchResponse resp7 = context7.throttleResponse(100);
assertEquals(Errors.NONE, resp7.error());
@ -643,7 +642,7 @@ public class SharePartitionManagerTest {
// Get the final share session.
ShareFetchContext context8 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST,
new ShareRequestMetadata(reqMetadata2.memberId(), ShareRequestMetadata.FINAL_EPOCH), true);
new ShareRequestMetadata(reqMetadata2.memberId(), ShareRequestMetadata.FINAL_EPOCH), true, CONNECTION_ID);
assertEquals(FinalContext.class, context8.getClass());
assertEquals(1, cache.size());
@ -688,7 +687,7 @@ public class SharePartitionManagerTest {
short version = ApiKeys.SHARE_FETCH.latestVersion();
ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, false);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context2);
assertFalse(((ShareSessionContext) context2).isSubsequent());
@ -708,17 +707,17 @@ public class SharePartitionManagerTest {
// Test trying to create a new session with an invalid epoch
assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true));
new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true, CONNECTION_ID));
// Test trying to create a new session with a non-existent session key
Uuid memberId4 = Uuid.randomUuid();
assertThrows(ShareSessionNotFoundException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareRequestMetadata(memberId4, 1), true));
new ShareRequestMetadata(memberId4, 1), true, CONNECTION_ID));
// Continue the first share session we created.
List<TopicIdPartition> reqData5 = List.of(tp2);
ShareFetchContext context5 = sharePartitionManager.newContext(groupId, reqData5, EMPTY_PART_LIST,
new ShareRequestMetadata(shareSessionKey2.memberId(), 1), true);
new ShareRequestMetadata(shareSessionKey2.memberId(), 1), true, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context5);
assertTrue(((ShareSessionContext) context5).isSubsequent());
@ -733,11 +732,11 @@ public class SharePartitionManagerTest {
// Test setting an invalid share session epoch.
assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true));
new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true, CONNECTION_ID));
// Test generating a throttled response for a subsequent share session
ShareFetchContext context7 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST,
new ShareRequestMetadata(shareSessionKey2.memberId(), 2), true);
new ShareRequestMetadata(shareSessionKey2.memberId(), 2), true, CONNECTION_ID);
int respSize7 = context7.responseSize(respData2, version);
ShareFetchResponse resp7 = context7.throttleResponse(100);
@ -748,7 +747,7 @@ public class SharePartitionManagerTest {
// Get the final share session.
ShareFetchContext context8 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST,
new ShareRequestMetadata(reqMetadata2.memberId(), ShareRequestMetadata.FINAL_EPOCH), true);
new ShareRequestMetadata(reqMetadata2.memberId(), ShareRequestMetadata.FINAL_EPOCH), true, CONNECTION_ID);
assertEquals(FinalContext.class, context8.getClass());
assertEquals(1, cache.size());
@ -794,7 +793,7 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertFalse(((ShareSessionContext) context1).isSubsequent());
@ -815,7 +814,7 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> reqData2 = List.of(tp2);
ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, false);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context2);
assertFalse(((ShareSessionContext) context2).isSubsequent());
@ -833,7 +832,7 @@ public class SharePartitionManagerTest {
// Continue the first share session we created.
List<TopicIdPartition> reqData3 = List.of(tp2);
ShareFetchContext context3 = sharePartitionManager.newContext(groupId, reqData3, EMPTY_PART_LIST,
new ShareRequestMetadata(shareSessionKey1.memberId(), 1), true);
new ShareRequestMetadata(shareSessionKey1.memberId(), 1), true, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context3);
assertTrue(((ShareSessionContext) context3).isSubsequent());
@ -848,7 +847,7 @@ public class SharePartitionManagerTest {
// Continue the second session we created.
List<TopicIdPartition> reqData4 = List.of(tp3);
ShareFetchContext context4 = sharePartitionManager.newContext(groupId, reqData4, List.of(tp2),
new ShareRequestMetadata(shareSessionKey2.memberId(), 1), true);
new ShareRequestMetadata(shareSessionKey2.memberId(), 1), true, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context4);
assertTrue(((ShareSessionContext) context4).isSubsequent());
@ -861,7 +860,7 @@ public class SharePartitionManagerTest {
// Get the final share session.
ShareFetchContext context5 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST,
new ShareRequestMetadata(reqMetadata1.memberId(), ShareRequestMetadata.FINAL_EPOCH), true);
new ShareRequestMetadata(reqMetadata1.memberId(), ShareRequestMetadata.FINAL_EPOCH), true, CONNECTION_ID);
assertEquals(FinalContext.class, context5.getClass());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData5 = new LinkedHashMap<>();
@ -876,7 +875,7 @@ public class SharePartitionManagerTest {
// Continue the second share session .
ShareFetchContext context6 = sharePartitionManager.newContext(groupId, List.of(), List.of(tp3),
new ShareRequestMetadata(shareSessionKey2.memberId(), 2), true);
new ShareRequestMetadata(shareSessionKey2.memberId(), 2), true, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context6);
assertTrue(((ShareSessionContext) context6).isSubsequent());

View File

@ -33,9 +33,11 @@ import org.apache.kafka.common.utils.ProducerIdAndEpoch
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
import java.net.Socket
import java.util.{Comparator, Properties}
import java.util.stream.Collectors
import scala.collection.Seq
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
@ -46,6 +48,8 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
protected var producer: KafkaProducer[String, String] = _
protected var openSockets: ListBuffer[Socket] = ListBuffer[Socket]()
protected def createOffsetsTopic(): Unit = {
val admin = cluster.admin()
try {
@ -140,6 +144,14 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
}
protected def closeSockets(): Unit = {
while (openSockets.nonEmpty) {
val socket = openSockets.head
socket.close()
openSockets.remove(0)
}
}
protected def closeProducer(): Unit = {
if(producer != null)
producer.close()
@ -922,6 +934,23 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
)
}
protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse](
request: AbstractRequest,
destination: Int
)(implicit classTag: ClassTag[T]): T = {
val socket = IntegrationTestUtils.connect(brokerSocketServer(destination), cluster.clientListener())
openSockets += socket
IntegrationTestUtils.sendAndReceive[T](request, socket)
}
protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse](
request: AbstractRequest
)(implicit classTag: ClassTag[T]): T = {
val socket = IntegrationTestUtils.connect(cluster.anyBrokerSocketServer(), cluster.clientListener())
openSockets += socket
IntegrationTestUtils.sendAndReceive[T](request, socket)
}
private def brokerSocketServer(brokerId: Int): SocketServer = {
getBrokers.find { broker =>
broker.config.brokerId == brokerId

View File

@ -4188,7 +4188,7 @@ class KafkaApisTest extends Logging {
).asJava)
)
when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenReturn(
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, shareSessionEpoch), util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)))
)
@ -4257,10 +4257,11 @@ class KafkaApisTest extends Logging {
cachedSharePartitions.mustAdd(new CachedSharePartition(
new TopicIdPartition(topicId, partitionIndex, topicName), false))
when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenThrow(
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any())).thenThrow(
Errors.INVALID_REQUEST.exception()
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2)))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2
)))
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
@ -4351,7 +4352,7 @@ class KafkaApisTest extends Logging {
).asJava)
)
when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenReturn(
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, 0), util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)
))
@ -4436,7 +4437,7 @@ class KafkaApisTest extends Logging {
FutureUtils.failedFuture[util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
)
when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenReturn(
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, 0), util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)
))
@ -4501,9 +4502,10 @@ class KafkaApisTest extends Logging {
cachedSharePartitions.mustAdd(new CachedSharePartition(
new TopicIdPartition(topicId, partitionIndex, topicName), false))
when(sharePartitionManager.newContext(any(), any(), any(), any(), any()))
.thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2)))
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any()))
.thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
)
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
@ -4559,9 +4561,10 @@ class KafkaApisTest extends Logging {
cachedSharePartitions.mustAdd(new CachedSharePartition(
new TopicIdPartition(topicId, partitionIndex, topicName), false))
when(sharePartitionManager.newContext(any(), any(), any(), any(), any()))
.thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2)))
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any()))
.thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
)
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
@ -4615,7 +4618,7 @@ class KafkaApisTest extends Logging {
).asJava)
)
when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenReturn(
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, 0), util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)
))
@ -4679,7 +4682,7 @@ class KafkaApisTest extends Logging {
).asJava)
)
when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenReturn(
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, 0), util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)
))
@ -4765,7 +4768,7 @@ class KafkaApisTest extends Logging {
).asJava)
)
when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenReturn(
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, 0), util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)
))
@ -4900,14 +4903,14 @@ class KafkaApisTest extends Logging {
new TopicIdPartition(topicId, partitionIndex, topicName), false)
)
when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenReturn(
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, 0), util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)
))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 2),
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 3))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 2), new ShareSession(
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 3))
)
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
@ -5162,17 +5165,17 @@ class KafkaApisTest extends Logging {
new TopicIdPartition(topicId4, 0, topicName4), false
))
when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenReturn(
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, 0), util.List.of(
new TopicIdPartition(topicId1, new TopicPartition(topicName1, 0)),
new TopicIdPartition(topicId1, new TopicPartition(topicName1, 1)),
new TopicIdPartition(topicId2, new TopicPartition(topicName2, 0)),
new TopicIdPartition(topicId2, new TopicPartition(topicName2, 1))
))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions1, 2))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 2),
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions2, 3))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
new ShareSessionKey(groupId, memberId), cachedSharePartitions1, 2))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 2), new ShareSession(
new ShareSessionKey(groupId, memberId), cachedSharePartitions2, 3))
).thenReturn(new FinalContext())
when(sharePartitionManager.releaseSession(any(), any())).thenReturn(
@ -6127,12 +6130,13 @@ class KafkaApisTest extends Logging {
new TopicIdPartition(topicId, 0, topicName), false
))
when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenReturn(
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any())).thenReturn(
new ShareSessionContext(new ShareRequestMetadata(memberId, 0), util.List.of(
new TopicIdPartition(topicId, partitionIndex, topicName)
))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1),
new ShareSession(new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2)))
).thenReturn(new ShareSessionContext(new ShareRequestMetadata(memberId, 1), new ShareSession(
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
)
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
@ -6329,7 +6333,7 @@ class KafkaApisTest extends Logging {
).asJava)
)
when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenReturn(
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any())).thenReturn(
new FinalContext()
)

View File

@ -43,6 +43,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
@AfterEach
def tearDown(): Unit = {
closeProducer
closeSockets
}
@ClusterTest(
@ -59,7 +60,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
assertEquals(Errors.UNSUPPORTED_VERSION.code, shareFetchResponse.data.errorCode)
assertEquals(0, shareFetchResponse.data.acquisitionLockTimeoutMs)
@ -75,7 +76,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val metadata: ShareRequestMetadata = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH)
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, Map.empty)
val shareAcknowledgeResponse = connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
assertEquals(Errors.UNSUPPORTED_VERSION.code, shareAcknowledgeResponse.data.errorCode)
}
@ -123,9 +124,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Send the share fetch request to the non-replica and verify the error code
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest, nonReplicaId)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest, nonReplicaId)
assertEquals(30000, shareFetchResponse.data.acquisitionLockTimeoutMs)
val partitionData = shareFetchResponse.responseData(topicNames).get(topicIdPartition)
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, partitionData.errorCode)
assertEquals(leader, partitionData.currentLeader().leaderId())
@ -174,7 +174,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -245,7 +245,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// as the share partitions might not be initialized yet. So, we retry until we get the response.
var responses = Seq[ShareFetchResponseData.PartitionData]()
TestUtils.waitUntilTrue(() => {
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
@ -340,9 +340,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareFetchRequest2 = createShareFetchRequest(groupId, metadata, send2, Seq.empty, acknowledgementsMap)
var shareFetchRequest3 = createShareFetchRequest(groupId, metadata, send3, Seq.empty, acknowledgementsMap)
var shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
var shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
var shareFetchResponse3 = connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3)
var shareFetchResponse1 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1, destination = leader1)
var shareFetchResponse2 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2, destination = leader2)
var shareFetchResponse3 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3, destination = leader3)
initProducer()
// Producing 10 records to the topic partitions created above
@ -356,9 +356,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareFetchRequest2 = createShareFetchRequest(groupId, metadata, send2, Seq.empty, acknowledgementsMap)
shareFetchRequest3 = createShareFetchRequest(groupId, metadata, send3, Seq.empty, acknowledgementsMap)
shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
shareFetchResponse3 = connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3)
shareFetchResponse1 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1, destination = leader1)
shareFetchResponse2 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2, destination = leader2)
shareFetchResponse3 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3, destination = leader3)
val shareFetchResponseData1 = shareFetchResponse1.data()
assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode)
@ -451,7 +451,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -478,7 +478,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge)
val shareAcknowledgeResponse = connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@ -500,7 +500,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -566,7 +566,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -595,7 +595,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -620,7 +620,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -684,7 +684,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -711,7 +711,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge)
val shareAcknowledgeResponse = connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@ -730,7 +730,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -794,7 +794,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -839,7 +839,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
releaseAcknowledgementSent = true
}
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -908,7 +908,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -935,7 +935,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(3.toByte))).asJava) // Reject the records
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge)
val shareAcknowledgeResponse = connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@ -957,7 +957,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1021,7 +1021,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1050,7 +1050,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(3.toByte))).asJava) // Reject the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1075,7 +1075,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1141,7 +1141,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1168,7 +1168,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records
var shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge)
var shareAcknowledgeResponse = connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
var shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
var shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@ -1187,7 +1187,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1213,7 +1213,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records again
shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge)
shareAcknowledgeResponse = connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@ -1235,7 +1235,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1312,9 +1312,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest3 = createShareFetchRequest(groupId, metadata3, send, Seq.empty, acknowledgementsMap3, minBytes = 100, maxBytes = 1500)
val shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1)
val shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2)
val shareFetchResponse3 = connectAndReceive[ShareFetchResponse](shareFetchRequest3)
val shareFetchResponse1 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1)
val shareFetchResponse2 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2)
val shareFetchResponse3 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3)
val shareFetchResponseData1 = shareFetchResponse1.data()
@ -1407,9 +1407,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest3 = createShareFetchRequest(groupId3, metadata3, send, Seq.empty, acknowledgementsMap3)
val shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1)
val shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2)
val shareFetchResponse3 = connectAndReceive[ShareFetchResponse](shareFetchRequest3)
val shareFetchResponse1 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1)
val shareFetchResponse2 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2)
val shareFetchResponse3 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3)
val shareFetchResponseData1 = shareFetchResponse1.data()
@ -1487,7 +1487,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1516,7 +1516,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1542,7 +1542,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(19)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1595,7 +1595,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1624,7 +1624,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1651,7 +1651,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(19)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge)
val shareAcknowledgeResponse = connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@ -1711,7 +1711,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Acknowledgements in the Initial Fetch Request
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
// The response will have a top level error code because this is an Initial Fetch request with acknowledgement data present
@ -1759,7 +1759,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9)
setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava)
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMap)
val shareAcknowledgeResponse = connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareAcknowledgeResponseData.errorCode)
@ -1809,7 +1809,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1831,7 +1831,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch))
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareFetchResponseData.errorCode)
@ -1881,7 +1881,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1908,7 +1908,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava)
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMap)
val shareAcknowledgeResponse = connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareAcknowledgeResponseData.errorCode)
@ -1959,7 +1959,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1981,12 +1981,94 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareFetchResponseData.errorCode)
}
@ClusterTests(
Array(
new ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
new ClusterConfigProperty(key = "group.share.max.share.sessions", value="2"),
new ClusterConfigProperty(key = "group.share.max.size", value="2")
)
),
new ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
new ClusterConfigProperty(key = "group.share.persister.class.name", value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
new ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"),
new ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "group.share.max.share.sessions", value="2"),
new ClusterConfigProperty(key = "group.share.max.size", value="2")
)
),
)
)
def testShareSessionEvictedOnConnectionDrop(): Unit = {
val groupId: String = "group"
val memberId1 = Uuid.randomUuid()
val memberId2 = Uuid.randomUuid()
val memberId3 = Uuid.randomUuid()
val topic = "topic"
val partition = 0
createTopicAndReturnLeaders(topic, numPartitions = 3)
val topicIds = getTopicIds.asJava
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// member1 sends share fetch request to register it's share session. Note it does not close the socket connection after.
TestUtils.waitUntilTrue(() => {
val metadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
shareFetchResponseData.errorCode == Errors.NONE.code
}, "Share fetch request failed", 5000)
// member2 sends share fetch request to register it's share session. Note it does not close the socket connection after.
TestUtils.waitUntilTrue(() => {
val metadata = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
shareFetchResponseData.errorCode == Errors.NONE.code
}, "Share fetch request failed", 5000)
// member3 sends share fetch request to register it's share session. Since the maximum number of share sessions that could
// exist in the share session cache is 2 (group.share.max.share.sessions), the attempt to register a third
// share session with the ShareSessionCache would throw SHARE_SESSION_LIMIT_REACHED
TestUtils.waitUntilTrue(() => {
val metadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
shareFetchResponseData.errorCode == Errors.SHARE_SESSION_NOT_FOUND.code
}, "Share fetch request failed", 5000)
// Now we will close the socket connections for the above three members, mimicking a client disconnection
closeSockets()
// Since the socket connections were closed before, the corresponding share sessions were dropped from the ShareSessionCache
// on the broker. Now, since the cache is empty, new share sessions can be registered
TestUtils.waitUntilTrue(() => {
val metadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
shareFetchResponseData.errorCode == Errors.NONE.code
}, "Share fetch request failed", 5000)
}
@ClusterTests(
Array(
new ClusterTest(
@ -2032,7 +2114,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -2059,7 +2141,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava)
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMap)
val shareAcknowledgeResponse = connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareAcknowledgeResponseData.errorCode)
@ -2118,7 +2200,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// as the share partitions might not be initialized yet. So, we retry until we get the response.
var responses = Seq[ShareFetchResponseData.PartitionData]()
TestUtils.waitUntilTrue(() => {
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
@ -2144,7 +2226,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val forget: Seq[TopicIdPartition] = Seq(topicIdPartition1)
shareFetchRequest = createShareFetchRequest(groupId, metadata, Seq.empty, forget, acknowledgementsMap)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -2294,7 +2376,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
TestUtils.waitUntilTrue(() => {
val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, topicIdPartitions, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)

View File

@ -587,6 +587,80 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
}
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
new ClusterConfigProperty(key = "group.share.max.size", value = "2")
))
def testShareGroupMaxSizeConfigExceeded(): Unit = {
val groupId: String = "group"
val memberId1 = Uuid.randomUuid()
val memberId2 = Uuid.randomUuid()
val memberId3 = Uuid.randomUuid()
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group by the first member (memberId1).
var shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1.toString)
.setMemberEpoch(0)
.setSubscribedTopicNames(List("foo").asJava)
).build()
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var shareGroupHeartbeatResponse: ShareGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $shareGroupHeartbeatResponse.")
// Heartbeat request to join the group by the second member (memberId2).
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2.toString)
.setMemberEpoch(0)
.setSubscribedTopicNames(List("foo").asJava)
).build()
// Send the request until receiving a successful response
TestUtils.waitUntilTrue(() => {
shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $shareGroupHeartbeatResponse.")
// Heartbeat request to join the group by the third member (memberId3).
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId3.toString)
.setMemberEpoch(0)
.setSubscribedTopicNames(List("foo").asJava)
).build()
shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
// Since the group.share.max.size config is set to 2, a third member cannot join the same group.
assertEquals(shareGroupHeartbeatResponse.data.errorCode, Errors.GROUP_MAX_SIZE_REACHED.code)
} finally {
admin.close()
}
}
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.network.ConnectionDisconnectListener;
import org.apache.kafka.server.share.CachedSharePartition;
import com.yammer.metrics.core.Meter;
@ -53,10 +54,13 @@ public class ShareSessionCache {
private final int maxEntries;
private long numPartitions = 0;
private final ConnectionDisconnectListener connectionDisconnectListener;
// A map of session key to ShareSession.
private final Map<ShareSessionKey, ShareSession> sessions = new HashMap<>();
private final Map<String, ShareSessionKey> connectionIdToSessionMap;
@SuppressWarnings("this-escape")
public ShareSessionCache(int maxEntries) {
this.maxEntries = maxEntries;
@ -64,6 +68,8 @@ public class ShareSessionCache {
KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "ShareSessionCache");
metricsGroup.newGauge(SHARE_SESSIONS_COUNT, this::size);
metricsGroup.newGauge(SHARE_PARTITIONS_COUNT, this::totalPartitions);
this.connectionIdToSessionMap = new HashMap<>();
this.connectionDisconnectListener = new ClientConnectionDisconnectListener();
this.evictionsMeter = metricsGroup.newMeter(SHARE_SESSION_EVICTIONS_PER_SEC, "evictions", TimeUnit.SECONDS);
}
@ -123,21 +129,48 @@ public class ShareSessionCache {
* @param groupId - The group id in the share fetch request.
* @param memberId - The member id in the share fetch request.
* @param partitionMap - The topic partitions to be added to the session.
* @param clientConnectionId - The client connection id.
* @return - The session key if the session was created, or null if the session was not created.
*/
public synchronized ShareSessionKey maybeCreateSession(String groupId, Uuid memberId, ImplicitLinkedHashCollection<CachedSharePartition> partitionMap) {
public synchronized ShareSessionKey maybeCreateSession(
String groupId,
Uuid memberId,
ImplicitLinkedHashCollection<CachedSharePartition> partitionMap,
String clientConnectionId
) {
if (sessions.size() < maxEntries) {
ShareSession session = new ShareSession(new ShareSessionKey(groupId, memberId), partitionMap,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH));
sessions.put(session.key(), session);
updateNumPartitions(session);
connectionIdToSessionMap.put(clientConnectionId, session.key());
return session.key();
}
return null;
}
public ConnectionDisconnectListener connectionDisconnectListener() {
return connectionDisconnectListener;
}
// Visible for testing.
Meter evictionsMeter() {
return evictionsMeter;
}
private final class ClientConnectionDisconnectListener implements ConnectionDisconnectListener {
// When the client disconnects, the corresponding session should be removed from the cache.
@Override
public void onDisconnect(String connectionId) {
ShareSessionKey shareSessionKey = connectionIdToSessionMap.remove(connectionId);
if (shareSessionKey != null) {
// Remove the session from the cache.
ShareSession removedSession = remove(shareSessionKey);
if (removedSession != null) {
evictionsMeter.mark();
}
}
}
}
}

View File

@ -45,11 +45,11 @@ public class ShareSessionCacheTest {
public void testShareSessionCache() throws InterruptedException {
ShareSessionCache cache = new ShareSessionCache(3);
assertEquals(0, cache.size());
ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(10));
ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(20));
ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(30));
assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(40)));
assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(5)));
ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(10), "conn-1");
ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(20), "conn-2");
ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(30), "conn-3");
assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(40), "conn-4"));
assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(5), "conn-5"));
assertShareCacheContains(cache, List.of(key1, key2, key3));
assertMetricsValues(3, 60, 0, cache);
@ -60,7 +60,7 @@ public class ShareSessionCacheTest {
ShareSessionCache cache = new ShareSessionCache(2);
assertEquals(0, cache.size());
assertEquals(0, cache.totalPartitions());
ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(2));
ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(2), "conn-1");
assertNotNull(key1);
assertShareCacheContains(cache, List.of(key1));
ShareSession session1 = cache.get(key1);
@ -70,7 +70,7 @@ public class ShareSessionCacheTest {
assertMetricsValues(1, 2, 0, cache);
ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(4));
ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(4), "conn-2");
assertNotNull(key2);
assertShareCacheContains(cache, List.of(key1, key2));
ShareSession session2 = cache.get(key2);
@ -81,7 +81,7 @@ public class ShareSessionCacheTest {
assertMetricsValues(2, 6, 0, cache);
ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(5));
ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(5), "conn-3");
assertNull(key3);
assertShareCacheContains(cache, List.of(key1, key2));
assertEquals(6, cache.totalPartitions());
@ -109,6 +109,36 @@ public class ShareSessionCacheTest {
assertMetricsValues(1, 3, 0, cache);
}
@Test
public void testRemoveConnection() throws InterruptedException {
ShareSessionCache cache = new ShareSessionCache(3);
assertEquals(0, cache.size());
ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(1), "conn-1");
ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(2), "conn-2");
ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(3), "conn-3");
assertMetricsValues(3, 6, 0, cache);
// Since cache size is now equal to max entries allowed(3), no new session can be created.
assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(40), "conn-4"));
assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(5), "conn-5"));
assertShareCacheContains(cache, List.of(key1, key2, key3));
assertMetricsValues(3, 6, 0, cache);
// Simulating the disconnection of client with connection id conn-1
cache.connectionDisconnectListener().onDisconnect("conn-1");
assertShareCacheContains(cache, List.of(key2, key3));
assertMetricsValues(2, 5, 1, cache);
// Since one client got disconnected, we can add another one now
ShareSessionKey key4 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(4), "conn-6");
assertShareCacheContains(cache, List.of(key2, key3, key4));
assertMetricsValues(3, 9, 1, cache);
}
private ImplicitLinkedHashCollection<CachedSharePartition> mockedSharePartitionMap(int size) {
ImplicitLinkedHashCollection<CachedSharePartition> cacheMap = new
ImplicitLinkedHashCollection<>(size);