KAFKA-16747: Implemented share sessions and contexts for share fetch requests (#16263)

About

KIP-932 introduces share sessions for share groups. This PR implements share sessions and contexts for incoming share fetch requests on the broker. The changes include:

Defined the CachedSharePartition class, instances of which are stored in share sessions.
Defined the ShareSessionKey and ShareSession classes.
Defined the ShareSessionCache class, which caches all share sessions and implements the eviction policy defined in KIP-932.

Defined the two types of contexts:
a. ShareSessionContext - for share session fetch requests.
b. FinalContext - for the final share fetch request (epoch = -1).

Defined the newContext function, which returns the created or updated context when the broker receives a share fetch request. A minimal sketch of how these pieces fit together is shown below.
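The sketch is illustrative only: the surrounding handler plumbing is hypothetical, while newContext, getErroneousAndValidTopicIdPartitions and updateAndGenerateResponseData are the APIs added in this PR (the context methods are package-private, so such code would live in kafka.server.share).

// Hypothetical broker-side flow; sharePartitionManager, groupId, shareFetchData,
// toForget and reqMetadata are assumed to be in scope.
ShareFetchContext context = sharePartitionManager.newContext(
        groupId, shareFetchData, toForget, reqMetadata);
// Split the request into partitions that can be fetched and partitions that
// immediately map to an error response (e.g. an unresolvable topic id).
ErroneousAndValidPartitionData partitioned = context.getErroneousAndValidTopicIdPartitions();
// ... fetch records for partitioned.validTopicIdPartitions() ...
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates = new LinkedHashMap<>();
// ... populate updates from the fetch results and partitioned.erroneous() ...
ShareFetchResponse response = context.updateAndGenerateResponseData(
        groupId, reqMetadata.memberId(), updates);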

Testing
The added code is covered by unit tests included in the PR.

Reviewers:  Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>, Apoorv Mittal <apoorvmittal10@gmail.com>
Abhinav Dixit 2024-06-14 16:55:27 +05:30 committed by GitHub
parent 46714dbaed
commit 8f6e0513df
13 changed files with 2335 additions and 14 deletions


@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.share;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchResponse;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Helper class to return the erroneous partitions and valid partition data
*/
public class ErroneousAndValidPartitionData {
private final List<Tuple2<TopicIdPartition, ShareFetchResponseData.PartitionData>> erroneous;
private final List<Tuple2<TopicIdPartition, ShareFetchRequest.SharePartitionData>> validTopicIdPartitions;
public ErroneousAndValidPartitionData(List<Tuple2<TopicIdPartition, ShareFetchResponseData.PartitionData>> erroneous,
List<Tuple2<TopicIdPartition, ShareFetchRequest.SharePartitionData>> validTopicIdPartitions) {
this.erroneous = erroneous;
this.validTopicIdPartitions = validTopicIdPartitions;
}
public ErroneousAndValidPartitionData(Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData) {
erroneous = new ArrayList<>();
validTopicIdPartitions = new ArrayList<>();
shareFetchData.forEach((topicIdPartition, sharePartitionData) -> {
if (topicIdPartition.topic() == null) {
erroneous.add(new Tuple2<>(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)));
} else {
validTopicIdPartitions.add(new Tuple2<>(topicIdPartition, sharePartitionData));
}
});
}
public ErroneousAndValidPartitionData() {
this.erroneous = new ArrayList<>();
this.validTopicIdPartitions = new ArrayList<>();
}
public List<Tuple2<TopicIdPartition, ShareFetchResponseData.PartitionData>> erroneous() {
return erroneous;
}
public List<Tuple2<TopicIdPartition, ShareFetchRequest.SharePartitionData>> validTopicIdPartitions() {
return validTopicIdPartitions;
}
}
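A quick usage sketch of the map-based constructor above (the topic ids and maxBytes values are made up for illustration): a topic-partition whose topic name could not be resolved lands in erroneous() with an UNKNOWN_TOPIC_ID partition response, while the rest land in validTopicIdPartitions().

Uuid fooId = Uuid.randomUuid();
Uuid unknownId = Uuid.randomUuid();
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData = new HashMap<>();
// Resolved topic name: ends up in validTopicIdPartitions().
shareFetchData.put(new TopicIdPartition(fooId, new TopicPartition("foo", 0)),
        new ShareFetchRequest.SharePartitionData(fooId, 100));
// Unresolved topic name (null): ends up in erroneous() as UNKNOWN_TOPIC_ID.
shareFetchData.put(new TopicIdPartition(unknownId, new TopicPartition(null, 0)),
        new ShareFetchRequest.SharePartitionData(unknownId, 100));
ErroneousAndValidPartitionData result = new ErroneousAndValidPartitionData(shareFetchData);
// result.validTopicIdPartitions() and result.erroneous() each contain one entry.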


@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.share;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.LinkedHashMap;
/**
* The share fetch context for a final share fetch request.
*/
public class FinalContext extends ShareFetchContext {
private static final Logger log = LoggerFactory.getLogger(FinalContext.class);
public FinalContext() {
}
@Override
boolean isTraceEnabled() {
return log.isTraceEnabled();
}
@Override
int responseSize(LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates, short version) {
return ShareFetchResponse.sizeOf(version, updates.entrySet().iterator());
}
@Override
ShareFetchResponse updateAndGenerateResponseData(String groupId, Uuid memberId,
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
log.debug("Final context returning {}", partitionsToLogString(updates.keySet()));
return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, 0,
updates.entrySet().iterator(), Collections.emptyList()));
}
@Override
ErroneousAndValidPartitionData getErroneousAndValidTopicIdPartitions() {
return new ErroneousAndValidPartitionData();
}
}
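A sketch of the expected behaviour, mirroring the unit tests later in this diff (groupId, memberId, updates and sharePartitionManager are assumed to be in scope): a request with epoch -1 yields a FinalContext, and updateAndGenerateResponseData simply echoes back whatever updates it is given, since the share session has already been torn down.

ShareFetchMetadata finalReq = new ShareFetchMetadata(memberId, ShareFetchMetadata.FINAL_EPOCH);
ShareFetchContext context = sharePartitionManager.newContext(
        groupId, Collections.emptyMap(), Collections.emptyList(), finalReq);
// context is a FinalContext; any cached share session for this member has been
// removed, and the response contains exactly the updates passed in.
ShareFetchResponse response = context.updateAndGenerateResponseData(groupId, memberId, updates);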


@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.share;
import kafka.server.FetchSession;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchResponse;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
/**
* The context for every share fetch request. The context is responsible for tracking the topic partitions present in
* the share fetch request and generating the response data.
*/
public abstract class ShareFetchContext {
/**
*
* @param partitions - The partitions requested in the fetch request.
* @return - A string representation of the partitions requested.
*/
String partitionsToLogString(Collection<TopicIdPartition> partitions) {
return FetchSession.partitionsToLogString(partitions, isTraceEnabled());
}
/**
* Return an empty throttled response due to quota violation.
* @param throttleTimeMs - The time to throttle the response.
* @return - An empty throttled response.
*/
ShareFetchResponse throttleResponse(int throttleTimeMs) {
return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
Collections.emptyIterator(), Collections.emptyList()));
}
/**
* @return - Whether trace logging is enabled.
*/
abstract boolean isTraceEnabled();
/**
* Get the response size to be used for quota computation. Since we return an empty response when throttling,
* the context must not be updated until we know that the request will not be throttled.
* @param updates - The updates to be sent in the response.
* @param version - The version of the share fetch request.
* @return - The size of the response.
*/
abstract int responseSize(LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates,
short version);
/**
* Updates the share fetch context with new partition information. Generates response data.
* The response data may require subsequent down-conversion.
* @param groupId - The group id.
* @param memberId - The member id.
* @param updates - The updates to be sent in the response.
* @return - The share fetch response.
*/
abstract ShareFetchResponse updateAndGenerateResponseData(String groupId, Uuid memberId, LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates);
/**
* @return - The erroneous and valid topic id partitions in the share fetch request.
*/
abstract ErroneousAndValidPartitionData getErroneousAndValidTopicIdPartitions();
}
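To make this contract concrete, here is a minimal sketch of how a request handler is expected to drive these methods; the quota check itself is outside this PR and purely hypothetical. Because responseSize has no side effects, the context is only mutated once it is known that the response will not be throttled.

// Hypothetical quota-aware send path; quotaExceeded and throttleTimeMs are illustrative.
int size = context.responseSize(updates, version); // no side effects on the context
if (quotaExceeded(size)) {
    // Empty response; the share session state is left untouched.
    return context.throttleResponse(throttleTimeMs);
}
// Safe to update the session now.
return context.updateAndGenerateResponseData(groupId, memberId, updates);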


@@ -16,18 +16,34 @@
*/
package kafka.server.share;
import kafka.server.FetchSession;
import kafka.server.ReplicaManager;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchMetadata;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ShareSession;
import org.apache.kafka.server.share.ShareSessionCache;
import org.apache.kafka.server.share.ShareSessionKey;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
import org.apache.kafka.storage.internals.log.FetchParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -193,6 +209,109 @@ public class SharePartitionManager implements AutoCloseable {
return future;
}
/**
* The newContext method is used to create a new share fetch context for every share fetch request.
* @param groupId The group id in the share fetch request.
* @param shareFetchData The topic-partitions and their corresponding maxBytes data in the share fetch request.
* @param toForget The topic-partitions to forget present in the share fetch request.
* @param reqMetadata The metadata in the share fetch request.
* @return The new share fetch context object
*/
public ShareFetchContext newContext(String groupId, Map<TopicIdPartition,
ShareFetchRequest.SharePartitionData> shareFetchData, List<TopicIdPartition> toForget, ShareFetchMetadata reqMetadata) {
ShareFetchContext context;
// Topic-partitions with maxBytes set to 0 should not be added to cachedPartitions.
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchDataWithMaxBytes = new HashMap<>();
shareFetchData.forEach((tp, sharePartitionData) -> {
if (sharePartitionData.maxBytes > 0) shareFetchDataWithMaxBytes.put(tp, sharePartitionData);
});
// If the request's epoch is FINAL_EPOCH or INITIAL_EPOCH, we should remove the existing session. For
// INITIAL_EPOCH, we also start a new session. Hence, both epochs are treated as special cases.
if (reqMetadata.isFull()) {
ShareSessionKey key = shareSessionKey(groupId, reqMetadata.memberId());
if (reqMetadata.epoch() == ShareFetchMetadata.FINAL_EPOCH) {
// If the epoch is FINAL_EPOCH, don't try to create a new session.
if (!shareFetchDataWithMaxBytes.isEmpty()) {
throw Errors.INVALID_REQUEST.exception();
}
context = new FinalContext();
synchronized (cache) {
if (cache.remove(key) != null) {
log.debug("Removed share session with key {}", key);
}
}
} else {
if (cache.remove(key) != null) {
log.debug("Removed share session with key {}", key);
}
ImplicitLinkedHashCollection<CachedSharePartition> cachedSharePartitions = new
ImplicitLinkedHashCollection<>(shareFetchDataWithMaxBytes.size());
shareFetchDataWithMaxBytes.forEach((topicIdPartition, reqData) ->
cachedSharePartitions.mustAdd(new CachedSharePartition(topicIdPartition, reqData, false)));
ShareSessionKey responseShareSessionKey = cache.maybeCreateSession(groupId, reqMetadata.memberId(),
time.milliseconds(), cachedSharePartitions);
if (responseShareSessionKey == null) {
log.error("Could not create a share session for group {} member {}", groupId, reqMetadata.memberId());
throw Errors.SHARE_SESSION_NOT_FOUND.exception();
}
context = new ShareSessionContext(reqMetadata, shareFetchDataWithMaxBytes);
log.debug("Created a new ShareSessionContext with key {} isSubsequent {} returning {}. A new share " +
"session will be started.", responseShareSessionKey, false,
partitionsToLogString(shareFetchDataWithMaxBytes.keySet()));
}
} else {
// We update the already existing share session.
synchronized (cache) {
ShareSessionKey key = shareSessionKey(groupId, reqMetadata.memberId());
ShareSession shareSession = cache.get(key);
if (shareSession == null) {
log.error("Share session error for {}: no such share session found", key);
throw Errors.SHARE_SESSION_NOT_FOUND.exception();
}
if (shareSession.epoch != reqMetadata.epoch()) {
log.debug("Share session error for {}: expected epoch {}, but got {} instead", key,
shareSession.epoch, reqMetadata.epoch());
throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
}
Map<ShareSession.ModifiedTopicIdPartitionType, List<TopicIdPartition>> modifiedTopicIdPartitions = shareSession.update(
shareFetchDataWithMaxBytes, toForget);
cache.touch(shareSession, time.milliseconds());
shareSession.epoch = ShareFetchMetadata.nextEpoch(shareSession.epoch);
log.debug("Created a new ShareSessionContext for session key {}, epoch {}: " +
"added {}, updated {}, removed {}", shareSession.key(), shareSession.epoch,
partitionsToLogString(modifiedTopicIdPartitions.get(
ShareSession.ModifiedTopicIdPartitionType.ADDED)),
partitionsToLogString(modifiedTopicIdPartitions.get(ShareSession.ModifiedTopicIdPartitionType.UPDATED)),
partitionsToLogString(modifiedTopicIdPartitions.get(ShareSession.ModifiedTopicIdPartitionType.REMOVED))
);
context = new ShareSessionContext(reqMetadata, shareSession);
}
}
return context;
}
private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) {
return new ShareSessionKey(groupId, memberId);
}
private static String partitionsToLogString(Collection<TopicIdPartition> partitions) {
return FetchSession.partitionsToLogString(partitions, log.isTraceEnabled());
}
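/**
 * Returns the topic-partitions currently cached in the share session for the given group and member.
 * @param groupId The group id in the share fetch request.
 * @param memberId The member id in the share fetch request.
 * @return The list of cached topic-partitions.
 * @throws ShareSessionNotFoundException If no share session exists for the given key.
 */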
public List<TopicIdPartition> cachedTopicIdPartitionsInShareSession(String groupId, Uuid memberId) {
ShareSessionKey key = shareSessionKey(groupId, memberId);
ShareSession shareSession = cache.get(key);
if (shareSession == null) {
throw new ShareSessionNotFoundException("Share session not found in cache");
}
List<TopicIdPartition> cachedTopicIdPartitions = new ArrayList<>();
shareSession.partitionMap().forEach(cachedSharePartition -> cachedTopicIdPartitions.add(
new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()
))));
return cachedTopicIdPartitions;
}
@Override
public void close() throws Exception {
// TODO: Provide Implementation
@@ -203,22 +322,32 @@ public class SharePartitionManager implements AutoCloseable {
* share group id, the topic id and the partition id. The key is used to store the SharePartition
* objects in the partition cache map.
*/
// Visible for testing
static class SharePartitionKey {
    private final String groupId;
    private final TopicIdPartition topicIdPartition;
    public SharePartitionKey(String groupId, TopicIdPartition topicIdPartition) {
        this.groupId = Objects.requireNonNull(groupId);
        this.topicIdPartition = Objects.requireNonNull(topicIdPartition);
    }
    @Override
    public int hashCode() {
        return Objects.hash(groupId, topicIdPartition);
    }
    @Override
    public boolean equals(final Object obj) {
        if (this == obj)
            return true;
        else if (obj == null || getClass() != obj.getClass())
            return false;
        else {
            SharePartitionKey that = (SharePartitionKey) obj;
            return groupId.equals(that.groupId) && Objects.equals(topicIdPartition, that.topicIdPartition);
        }
    }
}
/**


@@ -0,0 +1,247 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.share;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchMetadata;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchRequest.SharePartitionData;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ShareSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
/**
* The context for a share session fetch request.
*/
public class ShareSessionContext extends ShareFetchContext {
private static final Logger log = LoggerFactory.getLogger(ShareSessionContext.class);
private final ShareFetchMetadata reqMetadata;
private final boolean isSubsequent;
private Map<TopicIdPartition, SharePartitionData> shareFetchData;
private ShareSession session;
/**
* The share fetch context for the first request that starts a share session.
*
* @param reqMetadata The request metadata.
* @param shareFetchData The share partition data from the share fetch request.
*/
public ShareSessionContext(ShareFetchMetadata reqMetadata,
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData) {
this.reqMetadata = reqMetadata;
this.shareFetchData = shareFetchData;
this.isSubsequent = false;
}
/**
* The share fetch context for a subsequent request that utilizes an existing share session.
*
* @param reqMetadata The request metadata.
* @param session The subsequent fetch request session.
*/
public ShareSessionContext(ShareFetchMetadata reqMetadata, ShareSession session) {
this.reqMetadata = reqMetadata;
this.session = session;
this.isSubsequent = true;
}
// Visible for testing
public Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData() {
return shareFetchData;
}
// Visible for testing
public boolean isSubsequent() {
return isSubsequent;
}
// Visible for testing
public ShareSession session() {
return session;
}
@Override
boolean isTraceEnabled() {
return log.isTraceEnabled();
}
@Override
ShareFetchResponse throttleResponse(int throttleTimeMs) {
if (!isSubsequent) {
return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
Collections.emptyIterator(), Collections.emptyList()));
}
int expectedEpoch = ShareFetchMetadata.nextEpoch(reqMetadata.epoch());
int sessionEpoch;
synchronized (session) {
sessionEpoch = session.epoch;
}
if (sessionEpoch != expectedEpoch) {
log.debug("Subsequent share session {} expected epoch {}, but got {}. " +
"Possible duplicate request.", session.key(), expectedEpoch, sessionEpoch);
return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.INVALID_SHARE_SESSION_EPOCH,
throttleTimeMs, Collections.emptyIterator(), Collections.emptyList()));
}
return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
Collections.emptyIterator(), Collections.emptyList()));
}
/**
* Iterator that goes over the given partition map and selects partitions that need to be included in the response.
* If updateShareContextAndRemoveUnselected is set to true, the share context will be updated for the selected
* partitions, and unselected ones will be removed as they are encountered.
*/
private class PartitionIterator implements Iterator<Entry<TopicIdPartition, PartitionData>> {
private final Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> iterator;
private final boolean updateShareContextAndRemoveUnselected;
private Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData> nextElement;
public PartitionIterator(Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> iterator, boolean updateShareContextAndRemoveUnselected) {
this.iterator = iterator;
this.updateShareContextAndRemoveUnselected = updateShareContextAndRemoveUnselected;
}
@Override
public boolean hasNext() {
while ((nextElement == null) && iterator.hasNext()) {
Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData> element = iterator.next();
TopicIdPartition topicPart = element.getKey();
ShareFetchResponseData.PartitionData respData = element.getValue();
synchronized (session) {
CachedSharePartition cachedPart = session.partitionMap().find(new CachedSharePartition(topicPart));
boolean mustRespond = cachedPart.maybeUpdateResponseData(respData, updateShareContextAndRemoveUnselected);
if (mustRespond) {
nextElement = element;
if (updateShareContextAndRemoveUnselected && ShareFetchResponse.recordsSize(respData) > 0) {
// Session.partitionMap is of type ImplicitLinkedHashCollection<>, which tracks the order of insertion of elements.
// Since we are updating an element in this case, we need to perform a remove and then a mustAdd to maintain the correct order.
session.partitionMap().remove(cachedPart);
session.partitionMap().mustAdd(cachedPart);
}
} else {
if (updateShareContextAndRemoveUnselected) {
iterator.remove();
}
}
}
}
return nextElement != null;
}
@Override
public Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData> next() {
if (!hasNext()) throw new NoSuchElementException();
Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData> element = nextElement;
nextElement = null;
return element;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
@Override
int responseSize(LinkedHashMap<TopicIdPartition, PartitionData> updates, short version) {
if (!isSubsequent)
return ShareFetchResponse.sizeOf(version, updates.entrySet().iterator());
synchronized (session) {
int expectedEpoch = ShareFetchMetadata.nextEpoch(reqMetadata.epoch());
if (session.epoch != expectedEpoch) {
return ShareFetchResponse.sizeOf(version, Collections.emptyIterator());
}
// Pass the partition iterator which updates neither the share fetch context nor the partition map.
return ShareFetchResponse.sizeOf(version, new PartitionIterator(updates.entrySet().iterator(), false));
}
}
@Override
ShareFetchResponse updateAndGenerateResponseData(String groupId, Uuid memberId,
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
if (!isSubsequent) {
return new ShareFetchResponse(ShareFetchResponse.toMessage(
Errors.NONE, 0, updates.entrySet().iterator(), Collections.emptyList()));
} else {
int expectedEpoch = ShareFetchMetadata.nextEpoch(reqMetadata.epoch());
int sessionEpoch;
synchronized (session) {
sessionEpoch = session.epoch;
}
if (sessionEpoch != expectedEpoch) {
log.debug("Subsequent share session {} expected epoch {}, but got {}. Possible duplicate request.",
session.key(), expectedEpoch, sessionEpoch);
return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.INVALID_SHARE_SESSION_EPOCH,
0, Collections.emptyIterator(), Collections.emptyList()));
}
// Iterate over the update list using PartitionIterator. This will prune updates that don't need to be sent.
Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> partitionIterator = new PartitionIterator(
updates.entrySet().iterator(), true);
while (partitionIterator.hasNext()) {
partitionIterator.next();
}
log.debug("Subsequent share session context with session key {} returning {}", session.key(),
partitionsToLogString(updates.keySet()));
return new ShareFetchResponse(ShareFetchResponse.toMessage(
Errors.NONE, 0, updates.entrySet().iterator(), Collections.emptyList()));
}
}
@Override
ErroneousAndValidPartitionData getErroneousAndValidTopicIdPartitions() {
if (!isSubsequent) {
return new ErroneousAndValidPartitionData(shareFetchData);
}
List<Tuple2<TopicIdPartition, PartitionData>> erroneous = new ArrayList<>();
List<Tuple2<TopicIdPartition, ShareFetchRequest.SharePartitionData>> valid = new ArrayList<>();
// Take the session lock and iterate over all the cached partitions.
synchronized (session) {
session.partitionMap().forEach(cachedSharePartition -> {
TopicIdPartition topicIdPartition = new TopicIdPartition(cachedSharePartition.topicId(), new
TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
ShareFetchRequest.SharePartitionData reqData = cachedSharePartition.reqData();
if (topicIdPartition.topic() == null) {
erroneous.add(new Tuple2<>(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)));
} else {
valid.add(new Tuple2<>(topicIdPartition, reqData));
}
});
return new ErroneousAndValidPartitionData(erroneous, valid);
}
}
}
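The epoch checks in throttleResponse and updateAndGenerateResponseData above follow the share session lifecycle from KIP-932. A sketch of the progression as seen in the request metadata (memberId is assumed in scope; INITIAL_EPOCH, nextEpoch and FINAL_EPOCH are the constants and helper used throughout this PR):

// First request: starts the session; the broker stores it with epoch nextEpoch(INITIAL_EPOCH).
ShareFetchMetadata first = new ShareFetchMetadata(memberId, ShareFetchMetadata.INITIAL_EPOCH);
// Each subsequent request must carry the epoch the broker expects next;
// a stale or repeated epoch yields INVALID_SHARE_SESSION_EPOCH.
ShareFetchMetadata second = new ShareFetchMetadata(memberId,
        ShareFetchMetadata.nextEpoch(ShareFetchMetadata.INITIAL_EPOCH));
// Final request (epoch = -1): the session is removed and a FinalContext is returned.
ShareFetchMetadata last = new ShareFetchMetadata(memberId, ShareFetchMetadata.FINAL_EPOCH);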


@@ -0,0 +1,936 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.share;
import kafka.server.ReplicaManager;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidShareSessionEpochException;
import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.requests.ShareFetchMetadata;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.share.ShareSessionCache;
import org.apache.kafka.server.share.ShareSessionKey;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@Timeout(120)
public class SharePartitionManagerTest {
private static final int RECORD_LOCK_DURATION_MS = 30000;
private static final int MAX_DELIVERY_COUNT = 5;
private static final short MAX_IN_FLIGHT_MESSAGES = 200;
private static final List<TopicIdPartition> EMPTY_PART_LIST = Collections.unmodifiableList(new ArrayList<>());
@Test
public void testNewContextReturnsFinalContext() {
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().build();
ShareFetchMetadata newReqMetadata = new ShareFetchMetadata(Uuid.ZERO_UUID, -1);
ShareFetchContext shareFetchContext = sharePartitionManager.newContext("grp", Collections.emptyMap(), Collections.emptyList(), newReqMetadata);
assertEquals(FinalContext.class, shareFetchContext.getClass());
// If the final fetch request has topics to add, it should fail as an invalid request
Uuid topicId = Uuid.randomUuid();
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData = Collections.singletonMap(
new TopicIdPartition(topicId, new TopicPartition("foo", 0)),
new ShareFetchRequest.SharePartitionData(topicId, 4000));
assertThrows(InvalidRequestException.class,
() -> sharePartitionManager.newContext("grp", shareFetchData, Collections.emptyList(), new ShareFetchMetadata(Uuid.ZERO_UUID, -1)));
// shareFetchData is not empty, but the maxBytes of the topic partition is 0, which means it was added only for acknowledgements.
// New context should be created successfully
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData2 = Collections.singletonMap(new TopicIdPartition(topicId, new TopicPartition("foo", 0)),
new ShareFetchRequest.SharePartitionData(topicId, 0));
shareFetchContext = sharePartitionManager.newContext("grp", shareFetchData2, Collections.emptyList(), newReqMetadata);
assertEquals(FinalContext.class, shareFetchContext.getClass());
}
@Test
public void testNewContextReturnsFinalContextForEmptyTopicPartitionsAndFinalEpoch() {
Time time = new MockTime();
ShareSessionCache cache = new ShareSessionCache(10, 1000);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache).withTime(time).build();
String groupId = "grp";
// Verify that final epoch requests get a FinalContext
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST,
new ShareFetchMetadata(Uuid.randomUuid(), ShareFetchMetadata.FINAL_EPOCH));
assertEquals(FinalContext.class, context1.getClass());
}
@Test
public void testNewContext() {
Time time = new MockTime();
ShareSessionCache cache = new ShareSessionCache(10, 1000);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache).withTime(time).build();
Map<Uuid, String> topicNames = new HashMap<>();
Uuid tpId0 = Uuid.randomUuid();
Uuid tpId1 = Uuid.randomUuid();
topicNames.put(tpId0, "foo");
topicNames.put(tpId1, "bar");
TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
TopicIdPartition tp2 = new TopicIdPartition(tpId1, new TopicPartition("bar", 0));
TopicIdPartition tp3 = new TopicIdPartition(tpId1, new TopicPartition("bar", 1));
String groupId = "grp";
// Create a new share session with an initial share fetch request
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData2 = new LinkedHashMap<>();
reqData2.put(tp0, new ShareFetchRequest.SharePartitionData(tp0.topicId(), 100));
reqData2.put(tp1, new ShareFetchRequest.SharePartitionData(tp1.topicId(), 100));
ShareFetchMetadata reqMetadata2 = new ShareFetchMetadata(Uuid.randomUuid(), ShareFetchMetadata.INITIAL_EPOCH);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2);
assertEquals(ShareSessionContext.class, context2.getClass());
assertFalse(((ShareSessionContext) context2).isSubsequent());
((ShareSessionContext) context2).shareFetchData().forEach((topicIdPartition, sharePartitionData) -> {
assertTrue(reqData2.containsKey(topicIdPartition));
assertEquals(reqData2.get(topicIdPartition), sharePartitionData);
});
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<>();
respData2.put(tp0, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
respData2.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
assertEquals(Errors.NONE, resp2.error());
assertEquals(respData2, resp2.responseData(topicNames));
ShareSessionKey shareSessionKey2 = new ShareSessionKey(groupId,
reqMetadata2.memberId());
// Test trying to create a new session with an invalid epoch
assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareFetchMetadata(shareSessionKey2.memberId(), 5)));
// Test trying to create a new session with a non-existent session key
Uuid memberId4 = Uuid.randomUuid();
assertThrows(ShareSessionNotFoundException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareFetchMetadata(memberId4, 1)));
// Continue the first share session we created.
ShareFetchContext context5 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST,
new ShareFetchMetadata(shareSessionKey2.memberId(), 1));
assertEquals(ShareSessionContext.class, context5.getClass());
assertTrue(((ShareSessionContext) context5).isSubsequent());
ShareSessionContext shareSessionContext5 = (ShareSessionContext) context5;
synchronized (shareSessionContext5.session()) {
shareSessionContext5.session().partitionMap().forEach(cachedSharePartition -> {
TopicIdPartition topicIdPartition = new TopicIdPartition(cachedSharePartition.topicId(), new
TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
ShareFetchRequest.SharePartitionData data = cachedSharePartition.reqData();
assertTrue(reqData2.containsKey(topicIdPartition));
assertEquals(reqData2.get(topicIdPartition), data);
});
}
ShareFetchResponse resp5 = context5.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
assertEquals(Errors.NONE, resp5.error());
assertEquals(0, resp5.responseData(topicNames).size());
// Test setting an invalid share session epoch.
assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareFetchMetadata(shareSessionKey2.memberId(), 5)));
// Test generating a throttled response for a subsequent share session
ShareFetchContext context7 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST,
new ShareFetchMetadata(shareSessionKey2.memberId(), 2));
ShareFetchResponse resp7 = context7.throttleResponse(100);
assertEquals(Errors.NONE, resp7.error());
assertEquals(100, resp7.throttleTimeMs());
// Close the subsequent share session.
ShareFetchContext context8 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST,
new ShareFetchMetadata(reqMetadata2.memberId(), ShareFetchMetadata.FINAL_EPOCH));
assertEquals(FinalContext.class, context8.getClass());
assertEquals(0, cache.size());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData8 = new LinkedHashMap<>();
respData8.put(tp2, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
respData8.put(tp3, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
ShareFetchResponse resp8 = context8.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData8);
assertEquals(Errors.NONE, resp8.error());
}
@Test
public void testShareSessionExpiration() {
Time time = new MockTime();
ShareSessionCache cache = new ShareSessionCache(2, 1000);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache).withTime(time).build();
Map<Uuid, String> topicNames = new HashMap<>();
Uuid fooId = Uuid.randomUuid();
topicNames.put(fooId, "foo");
TopicIdPartition foo0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
TopicIdPartition foo1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1));
// Create a new share session, session 1
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> session1req = new LinkedHashMap<>();
session1req.put(foo0, new ShareFetchRequest.SharePartitionData(foo0.topicId(), 100));
session1req.put(foo1, new ShareFetchRequest.SharePartitionData(foo1.topicId(), 100));
String groupId = "grp";
ShareFetchMetadata reqMetadata1 = new ShareFetchMetadata(Uuid.randomUuid(), ShareFetchMetadata.INITIAL_EPOCH);
ShareFetchContext session1context = sharePartitionManager.newContext(groupId, session1req, EMPTY_PART_LIST, reqMetadata1);
assertEquals(session1context.getClass(), ShareSessionContext.class);
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<>();
respData1.put(foo0, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
respData1.put(foo1, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));
ShareFetchResponse session1resp = session1context.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData1);
assertEquals(Errors.NONE, session1resp.error());
assertEquals(2, session1resp.responseData(topicNames).size());
ShareSessionKey session1Key = new ShareSessionKey(groupId, reqMetadata1.memberId());
// Check that the share session was entered into the cache
assertNotNull(cache.get(session1Key));
time.sleep(500);
// Create a second new share session
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> session2req = new LinkedHashMap<>();
session2req.put(foo0, new ShareFetchRequest.SharePartitionData(foo0.topicId(), 100));
session2req.put(foo1, new ShareFetchRequest.SharePartitionData(foo1.topicId(), 100));
ShareFetchMetadata reqMetadata2 = new ShareFetchMetadata(Uuid.randomUuid(), ShareFetchMetadata.INITIAL_EPOCH);
ShareFetchContext session2context = sharePartitionManager.newContext(groupId, session2req, EMPTY_PART_LIST, reqMetadata2);
assertEquals(session2context.getClass(), ShareSessionContext.class);
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<>();
respData2.put(foo0, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
respData2.put(foo1, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));
ShareFetchResponse session2resp = session2context.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
assertEquals(Errors.NONE, session2resp.error());
assertEquals(2, session2resp.responseData(topicNames).size());
ShareSessionKey session2Key = new ShareSessionKey(groupId, reqMetadata2.memberId());
// both newly created entries are present in cache
assertNotNull(cache.get(session1Key));
assertNotNull(cache.get(session2Key));
time.sleep(500);
// Create a subsequent share fetch context for session 1
ShareFetchContext session1context2 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST,
new ShareFetchMetadata(reqMetadata1.memberId(), 1));
assertEquals(session1context2.getClass(), ShareSessionContext.class);
// total sleep time will now be large enough that share session 1 will be evicted if not correctly touched
time.sleep(501);
// create one final share session to test that the least recently used entry is evicted
// the second share session should be evicted because the first share session was incrementally fetched
// more recently than the second session was created
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> session3req = new LinkedHashMap<>();
session3req.put(foo0, new ShareFetchRequest.SharePartitionData(foo0.topicId(), 100));
session3req.put(foo1, new ShareFetchRequest.SharePartitionData(foo1.topicId(), 100));
ShareFetchMetadata reqMetadata3 = new ShareFetchMetadata(Uuid.randomUuid(), ShareFetchMetadata.INITIAL_EPOCH);
ShareFetchContext session3context = sharePartitionManager.newContext(groupId, session3req, EMPTY_PART_LIST, reqMetadata3);
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData3 = new LinkedHashMap<>();
respData3.put(foo0, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
respData3.put(foo1, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));
ShareFetchResponse session3resp = session3context.updateAndGenerateResponseData(groupId, reqMetadata3.memberId(), respData3);
assertEquals(Errors.NONE, session3resp.error());
assertEquals(2, session3resp.responseData(topicNames).size());
ShareSessionKey session3Key = new ShareSessionKey(groupId, reqMetadata3.memberId());
assertNotNull(cache.get(session1Key));
assertNull(cache.get(session2Key), "share session 2 should have been evicted by latest share session, " +
"as share session 1 was used more recently");
assertNotNull(cache.get(session3Key));
}
@Test
public void testSubsequentShareSession() {
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().build();
Map<Uuid, String> topicNames = new HashMap<>();
Uuid fooId = Uuid.randomUuid();
Uuid barId = Uuid.randomUuid();
topicNames.put(fooId, "foo");
topicNames.put(barId, "bar");
TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1));
TopicIdPartition tp2 = new TopicIdPartition(barId, new TopicPartition("bar", 0));
// Create a new share session with foo-0 and foo-1
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData1 = new LinkedHashMap<>();
reqData1.put(tp0, new ShareFetchRequest.SharePartitionData(tp0.topicId(), 100));
reqData1.put(tp1, new ShareFetchRequest.SharePartitionData(tp1.topicId(), 100));
String groupId = "grp";
ShareFetchMetadata reqMetadata1 = new ShareFetchMetadata(Uuid.randomUuid(), ShareFetchMetadata.INITIAL_EPOCH);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1);
assertEquals(ShareSessionContext.class, context1.getClass());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<>();
respData1.put(tp0, new ShareFetchResponseData.PartitionData().setPartitionIndex(tp0.partition()));
respData1.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(tp1.partition()));
ShareFetchResponse resp1 = context1.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData1);
assertEquals(Errors.NONE, resp1.error());
assertEquals(2, resp1.responseData(topicNames).size());
// Create a subsequent fetch request that removes foo-0 and adds bar-0
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData2 = Collections.singletonMap(
tp2, new ShareFetchRequest.SharePartitionData(tp2.topicId(), 100));
List<TopicIdPartition> removed2 = new ArrayList<>();
removed2.add(tp0);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, removed2,
new ShareFetchMetadata(reqMetadata1.memberId(), 1));
assertEquals(ShareSessionContext.class, context2.getClass());
Set<TopicIdPartition> expectedTopicIdPartitions2 = new HashSet<>();
expectedTopicIdPartitions2.add(tp1);
expectedTopicIdPartitions2.add(tp2);
Set<TopicIdPartition> actualTopicIdPartitions2 = new HashSet<>();
ShareSessionContext shareSessionContext = (ShareSessionContext) context2;
shareSessionContext.session().partitionMap().forEach(cachedSharePartition -> {
TopicIdPartition topicIdPartition = new TopicIdPartition(cachedSharePartition.topicId(), new
TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
actualTopicIdPartitions2.add(topicIdPartition);
});
assertEquals(expectedTopicIdPartitions2, actualTopicIdPartitions2);
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<>();
respData2.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(tp1.partition()));
respData2.put(tp2, new ShareFetchResponseData.PartitionData().setPartitionIndex(tp2.partition()));
ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData2);
assertEquals(Errors.NONE, resp2.error());
assertEquals(1, resp2.data().responses().size());
assertEquals(barId, resp2.data().responses().get(0).topicId());
assertEquals(1, resp2.data().responses().get(0).partitions().size());
assertEquals(0, resp2.data().responses().get(0).partitions().get(0).partitionIndex());
assertEquals(1, resp2.responseData(topicNames).size());
}
@Test
public void testZeroSizeShareSession() {
ShareSessionCache cache = new ShareSessionCache(10, 1000);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache).build();
Map<Uuid, String> topicNames = new HashMap<>();
Uuid fooId = Uuid.randomUuid();
topicNames.put(fooId, "foo");
TopicIdPartition foo0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
TopicIdPartition foo1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1));
// Create a new share session with foo-0 and foo-1
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData1 = new LinkedHashMap<>();
reqData1.put(foo0, new ShareFetchRequest.SharePartitionData(foo0.topicId(), 100));
reqData1.put(foo1, new ShareFetchRequest.SharePartitionData(foo1.topicId(), 100));
String groupId = "grp";
ShareFetchMetadata reqMetadata1 = new ShareFetchMetadata(Uuid.randomUuid(), ShareFetchMetadata.INITIAL_EPOCH);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1);
assertEquals(ShareSessionContext.class, context1.getClass());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<>();
respData1.put(foo0, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
respData1.put(foo1, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));
ShareFetchResponse resp1 = context1.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData1);
assertEquals(Errors.NONE, resp1.error());
assertEquals(2, resp1.responseData(topicNames).size());
// Create a subsequent share request that removes foo-0 and foo-1
// Verify that the previous share session was closed.
List<TopicIdPartition> removed2 = new ArrayList<>();
removed2.add(foo0);
removed2.add(foo1);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), removed2,
new ShareFetchMetadata(reqMetadata1.memberId(), 1));
assertEquals(ShareSessionContext.class, context2.getClass());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<>();
ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData2);
assertTrue(resp2.responseData(topicNames).isEmpty());
assertEquals(1, cache.size());
}
@Test
public void testToForgetPartitions() {
String groupId = "grp";
ShareSessionCache cache = new ShareSessionCache(10, 1000);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache).build();
Uuid fooId = Uuid.randomUuid();
Uuid barId = Uuid.randomUuid();
TopicIdPartition foo = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
TopicIdPartition bar = new TopicIdPartition(barId, new TopicPartition("bar", 0));
ShareFetchMetadata reqMetadata1 = new ShareFetchMetadata(Uuid.randomUuid(), ShareFetchMetadata.INITIAL_EPOCH);
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData1 = new LinkedHashMap<>();
reqData1.put(foo, new ShareFetchRequest.SharePartitionData(foo.topicId(), 100));
reqData1.put(bar, new ShareFetchRequest.SharePartitionData(bar.topicId(), 100));
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1);
assertEquals(ShareSessionContext.class, context1.getClass());
assertPartitionsPresent((ShareSessionContext) context1, Arrays.asList(foo, bar));
mockUpdateAndGenerateResponseData(context1, groupId, reqMetadata1.memberId());
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), Collections.singletonList(foo),
new ShareFetchMetadata(reqMetadata1.memberId(), 1));
// So foo is removed but not the others.
assertPartitionsPresent((ShareSessionContext) context2, Collections.singletonList(bar));
mockUpdateAndGenerateResponseData(context2, groupId, reqMetadata1.memberId());
ShareFetchContext context3 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), Collections.singletonList(bar),
new ShareFetchMetadata(reqMetadata1.memberId(), 2));
assertPartitionsPresent((ShareSessionContext) context3, Collections.emptyList());
}
// This test simulates a share session where the topic ID changes broker side (the one handling the request) in both the metadata cache and the log
// -- as though the topic is deleted and recreated.
@Test
public void testShareSessionUpdateTopicIdsBrokerSide() {
String groupId = "grp";
ShareSessionCache cache = new ShareSessionCache(10, 1000);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache).build();
Uuid fooId = Uuid.randomUuid();
Uuid barId = Uuid.randomUuid();
TopicIdPartition foo = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
TopicIdPartition bar = new TopicIdPartition(barId, new TopicPartition("bar", 1));
Map<Uuid, String> topicNames = new HashMap<>();
topicNames.put(fooId, "foo");
topicNames.put(barId, "bar");
// Create a new share session with foo-0 and bar-1
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData1 = new LinkedHashMap<>();
reqData1.put(foo, new ShareFetchRequest.SharePartitionData(foo.topicId(), 100));
reqData1.put(bar, new ShareFetchRequest.SharePartitionData(bar.topicId(), 100));
ShareFetchMetadata reqMetadata1 = new ShareFetchMetadata(Uuid.randomUuid(), ShareFetchMetadata.INITIAL_EPOCH);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1);
assertEquals(ShareSessionContext.class, context1.getClass());
assertFalse(((ShareSessionContext) context1).isSubsequent());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<>();
respData1.put(bar, new ShareFetchResponseData.PartitionData().setPartitionIndex(bar.partition()));
respData1.put(foo, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo.partition()).setErrorCode(
Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
ShareFetchResponse resp1 = context1.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData1);
assertEquals(Errors.NONE, resp1.error());
assertEquals(2, resp1.responseData(topicNames).size());
// Create a subsequent share fetch request as though no topics changed.
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST,
new ShareFetchMetadata(reqMetadata1.memberId(), 1));
assertEquals(ShareSessionContext.class, context2.getClass());
assertTrue(((ShareSessionContext) context2).isSubsequent());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<>();
// Likely if the topic ID is different in the broker, it will be different in the log. Simulate the log check finding an inconsistent ID.
respData2.put(foo, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo.partition()).setErrorCode(
Errors.INCONSISTENT_TOPIC_ID.code()));
ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData2);
assertEquals(Errors.NONE, resp2.error());
// We should have the inconsistent topic ID error on the partition
assertEquals(Errors.INCONSISTENT_TOPIC_ID.code(), resp2.responseData(topicNames).get(foo).errorCode());
}
@Test
public void testGetErroneousAndValidTopicIdPartitions() {
Time time = new MockTime();
ShareSessionCache cache = new ShareSessionCache(10, 1000);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache).withTime(time).build();
Uuid tpId0 = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
TopicIdPartition tpNull1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(null, 0));
TopicIdPartition tpNull2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(null, 1));
String groupId = "grp";
// Create a new share session with an initial share fetch request
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData2 = new LinkedHashMap<>();
reqData2.put(tp0, new ShareFetchRequest.SharePartitionData(tp0.topicId(), 100));
reqData2.put(tp1, new ShareFetchRequest.SharePartitionData(tp1.topicId(), 100));
reqData2.put(tpNull1, new ShareFetchRequest.SharePartitionData(tpNull1.topicId(), 100));
ShareFetchMetadata reqMetadata2 = new ShareFetchMetadata(Uuid.randomUuid(), ShareFetchMetadata.INITIAL_EPOCH);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2);
assertEquals(ShareSessionContext.class, context2.getClass());
assertFalse(((ShareSessionContext) context2).isSubsequent());
assertErroneousAndValidTopicIdPartitions(context2.getErroneousAndValidTopicIdPartitions(), Collections.singletonList(tpNull1), Arrays.asList(tp0, tp1));
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<>();
respData2.put(tp0, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
respData2.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
respData2.put(tpNull1, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
assertEquals(Errors.NONE, resp2.error());
ShareSessionKey shareSessionKey2 = new ShareSessionKey(groupId, reqMetadata2.memberId());
// Check for throttled response
ShareFetchResponse resp2Throttle = context2.throttleResponse(100);
assertEquals(Errors.NONE, resp2Throttle.error());
assertEquals(100, resp2Throttle.throttleTimeMs());
// Test trying to create a new session with an invalid epoch
assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareFetchMetadata(shareSessionKey2.memberId(), 5)));
// Test trying to create a new session with a non-existent session key
assertThrows(ShareSessionNotFoundException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareFetchMetadata(Uuid.randomUuid(), 1)));
// Continue the first share session we created.
ShareFetchContext context5 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST,
new ShareFetchMetadata(shareSessionKey2.memberId(), 1));
assertEquals(ShareSessionContext.class, context5.getClass());
assertTrue(((ShareSessionContext) context5).isSubsequent());
assertErroneousAndValidTopicIdPartitions(context5.getErroneousAndValidTopicIdPartitions(), Collections.singletonList(tpNull1), Arrays.asList(tp0, tp1));
ShareFetchResponse resp5 = context5.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
assertEquals(Errors.NONE, resp5.error());
// Test setting an invalid share session epoch.
assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareFetchMetadata(shareSessionKey2.memberId(), 5)));
// Test generating a throttled response for a subsequent share session
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData7 = Collections.singletonMap(
tpNull2, new ShareFetchRequest.SharePartitionData(tpNull2.topicId(), 100));
ShareFetchContext context7 = sharePartitionManager.newContext(groupId, reqData7, EMPTY_PART_LIST,
new ShareFetchMetadata(shareSessionKey2.memberId(), 2));
// Check for throttled response
ShareFetchResponse resp7 = context7.throttleResponse(100);
assertEquals(Errors.NONE, resp7.error());
assertEquals(100, resp7.throttleTimeMs());
assertErroneousAndValidTopicIdPartitions(context7.getErroneousAndValidTopicIdPartitions(), Arrays.asList(tpNull1, tpNull2), Arrays.asList(tp0, tp1));
// Close the subsequent share session.
ShareFetchContext context8 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST,
new ShareFetchMetadata(reqMetadata2.memberId(), ShareFetchMetadata.FINAL_EPOCH));
assertEquals(FinalContext.class, context8.getClass());
assertEquals(0, cache.size());
assertErroneousAndValidTopicIdPartitions(context8.getErroneousAndValidTopicIdPartitions(), Collections.emptyList(), Collections.emptyList());
// Check for throttled response
ShareFetchResponse resp8 = context8.throttleResponse(100);
assertEquals(Errors.NONE, resp8.error());
assertEquals(100, resp8.throttleTimeMs());
}
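// The test above pins down the session epoch rules: an INITIAL_EPOCH request
// creates the session, each follow-up must carry the next sequential epoch
// (a skipped epoch throws InvalidShareSessionEpochException), an unknown
// member id throws ShareSessionNotFoundException, and a FINAL_EPOCH request
// yields a FinalContext and removes the session from the cache.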
@Test
public void testShareFetchContextResponseSize() {
Time time = new MockTime();
ShareSessionCache cache = new ShareSessionCache(10, 1000);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache).withTime(time).build();
Map<Uuid, String> topicNames = new HashMap<>();
Uuid tpId0 = Uuid.randomUuid();
Uuid tpId1 = Uuid.randomUuid();
topicNames.put(tpId0, "foo");
topicNames.put(tpId1, "bar");
TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
TopicIdPartition tp2 = new TopicIdPartition(tpId1, new TopicPartition("bar", 0));
TopicIdPartition tp3 = new TopicIdPartition(tpId1, new TopicPartition("bar", 1));
String groupId = "grp";
// Create a new share session with an initial share fetch request
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData2 = new LinkedHashMap<>();
reqData2.put(tp0, new ShareFetchRequest.SharePartitionData(tp0.topicId(), 100));
reqData2.put(tp1, new ShareFetchRequest.SharePartitionData(tp1.topicId(), 100));
// Used to calculate the expected response size below
ObjectSerializationCache objectSerializationCache = new ObjectSerializationCache();
short version = ApiKeys.SHARE_FETCH.latestVersion();
ShareFetchMetadata reqMetadata2 = new ShareFetchMetadata(Uuid.randomUuid(), ShareFetchMetadata.INITIAL_EPOCH);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2);
assertEquals(ShareSessionContext.class, context2.getClass());
assertFalse(((ShareSessionContext) context2).isSubsequent());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<>();
respData2.put(tp0, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
respData2.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
int respSize2 = context2.responseSize(respData2, version);
ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
assertEquals(Errors.NONE, resp2.error());
assertEquals(respData2, resp2.responseData(topicNames));
// Add 4 to mirror the 4 bytes that the sizeOf() method in ShareFetchResponse adds on top of the serialized data size.
assertEquals(4 + resp2.data().size(objectSerializationCache, version), respSize2);
ShareSessionKey shareSessionKey2 = new ShareSessionKey(groupId,
reqMetadata2.memberId());
// Test trying to create a new session with an invalid epoch
assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareFetchMetadata(shareSessionKey2.memberId(), 5)));
// Test trying to create a new session with a non-existent session key
Uuid memberId4 = Uuid.randomUuid();
assertThrows(ShareSessionNotFoundException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareFetchMetadata(memberId4, 1)));
// Continue the first share session we created.
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData5 = Collections.singletonMap(
tp2, new ShareFetchRequest.SharePartitionData(tp2.topicId(), 100));
ShareFetchContext context5 = sharePartitionManager.newContext(groupId, reqData5, EMPTY_PART_LIST,
new ShareFetchMetadata(shareSessionKey2.memberId(), 1));
assertEquals(ShareSessionContext.class, context5.getClass());
assertTrue(((ShareSessionContext) context5).isSubsequent());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData5 = new LinkedHashMap<>();
respData5.put(tp2, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
int respSize5 = context5.responseSize(respData5, version);
ShareFetchResponse resp5 = context5.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData5);
assertEquals(Errors.NONE, resp5.error());
// Add 4 to mirror the 4 bytes that the sizeOf() method in ShareFetchResponse adds on top of the serialized data size.
assertEquals(4 + resp5.data().size(objectSerializationCache, version), respSize5);
// Test setting an invalid share session epoch.
assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
new ShareFetchMetadata(shareSessionKey2.memberId(), 5)));
// Test generating a throttled response for a subsequent share session
ShareFetchContext context7 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST,
new ShareFetchMetadata(shareSessionKey2.memberId(), 2));
int respSize7 = context7.responseSize(respData2, version);
ShareFetchResponse resp7 = context7.throttleResponse(100);
assertEquals(Errors.NONE, resp7.error());
assertEquals(100, resp7.throttleTimeMs());
// Add 4 to mirror the 4 bytes that the sizeOf() method in ShareFetchResponse adds on top of the serialized data size.
assertEquals(4 + new ShareFetchResponseData().size(objectSerializationCache, version), respSize7);
// Close the subsequent share session.
ShareFetchContext context8 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST,
new ShareFetchMetadata(reqMetadata2.memberId(), ShareFetchMetadata.FINAL_EPOCH));
assertEquals(FinalContext.class, context8.getClass());
assertEquals(0, cache.size());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData8 = new LinkedHashMap<>();
respData8.put(tp3, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
int respSize8 = context8.responseSize(respData8, version);
ShareFetchResponse resp8 = context8.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData8);
assertEquals(Errors.NONE, resp8.error());
// Add 4 to mirror the 4 bytes that the sizeOf() method in ShareFetchResponse adds on top of the serialized data size.
assertEquals(4 + resp8.data().size(objectSerializationCache, version), respSize8);
}
@Test
public void testCachedTopicPartitionsForInvalidShareSession() {
ShareSessionCache cache = new ShareSessionCache(10, 1000);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache).build();
assertThrows(ShareSessionNotFoundException.class, () -> sharePartitionManager.cachedTopicIdPartitionsInShareSession("grp", Uuid.randomUuid()));
}
@Test
public void testCachedTopicPartitionsForValidShareSessions() {
ShareSessionCache cache = new ShareSessionCache(10, 1000);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache).build();
Uuid tpId0 = Uuid.randomUuid();
Uuid tpId1 = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
TopicIdPartition tp2 = new TopicIdPartition(tpId1, new TopicPartition("bar", 0));
TopicIdPartition tp3 = new TopicIdPartition(tpId1, new TopicPartition("bar", 1));
String groupId = "grp";
Uuid memberId1 = Uuid.randomUuid();
Uuid memberId2 = Uuid.randomUuid();
// Create a new share session with an initial share fetch request.
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData1 = new LinkedHashMap<>();
reqData1.put(tp0, new ShareFetchRequest.SharePartitionData(tp0.topicId(), 100));
reqData1.put(tp1, new ShareFetchRequest.SharePartitionData(tp1.topicId(), 100));
ShareFetchMetadata reqMetadata1 = new ShareFetchMetadata(memberId1, ShareFetchMetadata.INITIAL_EPOCH);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1);
assertEquals(ShareSessionContext.class, context1.getClass());
assertFalse(((ShareSessionContext) context1).isSubsequent());
ShareSessionKey shareSessionKey1 = new ShareSessionKey(groupId,
reqMetadata1.memberId());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<>();
respData1.put(tp0, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
respData1.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
ShareFetchResponse resp1 = context1.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData1);
assertEquals(Errors.NONE, resp1.error());
assertEquals(new HashSet<>(Arrays.asList(tp0, tp1)),
new HashSet<>(sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId1)));
// Create a new share session with an initial share fetch request.
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData2 = Collections.singletonMap(
tp2, new ShareFetchRequest.SharePartitionData(tp2.topicId(), 100));
ShareFetchMetadata reqMetadata2 = new ShareFetchMetadata(memberId2, ShareFetchMetadata.INITIAL_EPOCH);
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2);
assertEquals(ShareSessionContext.class, context2.getClass());
assertFalse(((ShareSessionContext) context2).isSubsequent());
ShareSessionKey shareSessionKey2 = new ShareSessionKey(groupId,
reqMetadata2.memberId());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<>();
respData2.put(tp2, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
assertEquals(Errors.NONE, resp2.error());
assertEquals(Collections.singletonList(tp2), sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId2));
// Continue the first share session we created.
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData3 = Collections.singletonMap(
tp2, new ShareFetchRequest.SharePartitionData(tp2.topicId(), 100));
ShareFetchContext context3 = sharePartitionManager.newContext(groupId, reqData3, EMPTY_PART_LIST,
new ShareFetchMetadata(shareSessionKey1.memberId(), 1));
assertEquals(ShareSessionContext.class, context3.getClass());
assertTrue(((ShareSessionContext) context3).isSubsequent());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData3 = new LinkedHashMap<>();
respData3.put(tp2, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
ShareFetchResponse resp3 = context3.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData3);
assertEquals(Errors.NONE, resp3.error());
assertEquals(new HashSet<>(Arrays.asList(tp0, tp1, tp2)),
new HashSet<>(sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId1)));
// Continue the second session we created.
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData4 = Collections.singletonMap(
tp3, new ShareFetchRequest.SharePartitionData(tp3.topicId(), 100));
ShareFetchContext context4 = sharePartitionManager.newContext(groupId, reqData4, Collections.singletonList(tp2),
new ShareFetchMetadata(shareSessionKey2.memberId(), 1));
assertEquals(ShareSessionContext.class, context4.getClass());
assertTrue(((ShareSessionContext) context4).isSubsequent());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData4 = new LinkedHashMap<>();
respData4.put(tp3, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
ShareFetchResponse resp4 = context4.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData4);
assertEquals(Errors.NONE, resp4.error());
assertEquals(Collections.singletonList(tp3), sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId2));
// Close the first share session.
ShareFetchContext context5 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST,
new ShareFetchMetadata(reqMetadata1.memberId(), ShareFetchMetadata.FINAL_EPOCH));
assertEquals(FinalContext.class, context5.getClass());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData5 = new LinkedHashMap<>();
ShareFetchResponse resp5 = context5.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData5);
assertEquals(Errors.NONE, resp5.error());
assertThrows(ShareSessionNotFoundException.class, () -> sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId1));
// Continue the second share session.
ShareFetchContext context6 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), Collections.singletonList(tp3),
new ShareFetchMetadata(shareSessionKey2.memberId(), 2));
assertEquals(ShareSessionContext.class, context6.getClass());
assertTrue(((ShareSessionContext) context6).isSubsequent());
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData6 = new LinkedHashMap<>();
ShareFetchResponse resp6 = context6.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData6);
assertEquals(Errors.NONE, resp6.error());
assertEquals(Collections.emptyList(), sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId2));
}
@Test
public void testSharePartitionKey() {
SharePartitionManager.SharePartitionKey sharePartitionKey1 = new SharePartitionManager.SharePartitionKey("mock-group-1",
new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition("test", 0)));
SharePartitionManager.SharePartitionKey sharePartitionKey2 = new SharePartitionManager.SharePartitionKey("mock-group-2",
new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition("test", 0)));
SharePartitionManager.SharePartitionKey sharePartitionKey3 = new SharePartitionManager.SharePartitionKey("mock-group-1",
new TopicIdPartition(new Uuid(1L, 1L), new TopicPartition("test-1", 0)));
SharePartitionManager.SharePartitionKey sharePartitionKey4 = new SharePartitionManager.SharePartitionKey("mock-group-1",
new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition("test", 1)));
SharePartitionManager.SharePartitionKey sharePartitionKey5 = new SharePartitionManager.SharePartitionKey("mock-group-1",
new TopicIdPartition(new Uuid(0L, 0L), new TopicPartition("test-2", 0)));
SharePartitionManager.SharePartitionKey sharePartitionKey1Copy = new SharePartitionManager.SharePartitionKey("mock-group-1",
new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition("test", 0)));
assertEquals(sharePartitionKey1, sharePartitionKey1Copy);
assertNotEquals(sharePartitionKey1, sharePartitionKey2);
assertNotEquals(sharePartitionKey1, sharePartitionKey3);
assertNotEquals(sharePartitionKey1, sharePartitionKey4);
assertNotEquals(sharePartitionKey1, sharePartitionKey5);
assertNotEquals(sharePartitionKey1, null);
}
private ShareFetchResponseData.PartitionData noErrorShareFetchResponse() {
return new ShareFetchResponseData.PartitionData().setPartitionIndex(0);
}
private ShareFetchResponseData.PartitionData errorShareFetchResponse(short errorCode) {
return new ShareFetchResponseData.PartitionData().setPartitionIndex(0).setErrorCode(errorCode);
}
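// Builds response data for every partition tracked by the given context,
// marking null-topic partitions with UNKNOWN_TOPIC_ID, and runs the result
// through updateAndGenerateResponseData.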
private void mockUpdateAndGenerateResponseData(ShareFetchContext context, String groupId, Uuid memberId) {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> data = new LinkedHashMap<>();
if (context.getClass() == ShareSessionContext.class) {
ShareSessionContext shareSessionContext = (ShareSessionContext) context;
if (!shareSessionContext.isSubsequent()) {
shareSessionContext.shareFetchData().forEach((topicIdPartition, sharePartitionData) -> data.put(topicIdPartition,
topicIdPartition.topic() == null ? errorShareFetchResponse(Errors.UNKNOWN_TOPIC_ID.code()) :
noErrorShareFetchResponse()));
} else {
synchronized (shareSessionContext.session()) {
shareSessionContext.session().partitionMap().forEach(cachedSharePartition -> {
TopicIdPartition topicIdPartition = new TopicIdPartition(cachedSharePartition.topicId(), new
TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
data.put(topicIdPartition, topicIdPartition.topic() == null ? errorShareFetchResponse(Errors.UNKNOWN_TOPIC_ID.code()) :
noErrorShareFetchResponse());
});
}
}
}
context.updateAndGenerateResponseData(groupId, memberId, data);
}
private void assertPartitionsPresent(ShareSessionContext context, List<TopicIdPartition> partitions) {
Set<TopicIdPartition> partitionsInContext = new HashSet<>();
if (!context.isSubsequent()) {
context.shareFetchData().forEach((topicIdPartition, sharePartitionData) ->
partitionsInContext.add(topicIdPartition));
} else {
context.session().partitionMap().forEach(cachedSharePartition -> {
TopicIdPartition topicIdPartition = new TopicIdPartition(cachedSharePartition.topicId(), new
TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
partitionsInContext.add(topicIdPartition);
});
}
Set<TopicIdPartition> partitionsSet = new HashSet<>(partitions);
assertEquals(partitionsSet, partitionsInContext);
}
private void assertErroneousAndValidTopicIdPartitions(ErroneousAndValidPartitionData erroneousAndValidPartitionData,
List<TopicIdPartition> expectedErroneous, List<TopicIdPartition> expectedValid) {
Set<TopicIdPartition> expectedErroneousSet = new HashSet<>(expectedErroneous);
Set<TopicIdPartition> expectedValidSet = new HashSet<>(expectedValid);
Set<TopicIdPartition> actualErroneousPartitions = new HashSet<>();
Set<TopicIdPartition> actualValidPartitions = new HashSet<>();
erroneousAndValidPartitionData.erroneous().forEach(topicIdPartitionPartitionDataTuple2 ->
actualErroneousPartitions.add(topicIdPartitionPartitionDataTuple2._1));
erroneousAndValidPartitionData.validTopicIdPartitions().forEach(topicIdPartitionPartitionDataTuple2 ->
actualValidPartitions.add(topicIdPartitionPartitionDataTuple2._1));
assertEquals(expectedErroneousSet, actualErroneousPartitions);
assertEquals(expectedValidSet, actualValidPartitions);
}
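// Test-only builder that assembles a SharePartitionManager from mocked or
// default collaborators; each withX method swaps in a single dependency.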
private static class SharePartitionManagerBuilder {
private ReplicaManager replicaManager = mock(ReplicaManager.class);
private Time time = new MockTime();
private ShareSessionCache cache = new ShareSessionCache(10, 1000);
private Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) {
this.replicaManager = replicaManager;
return this;
}
private SharePartitionManagerBuilder withTime(Time time) {
this.time = time;
return this;
}
private SharePartitionManagerBuilder withCache(ShareSessionCache cache) {
this.cache = cache;
return this;
}
private SharePartitionManagerBuilder withPartitionCacheMap(Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap) {
this.partitionCacheMap = partitionCacheMap;
return this;
}
public static SharePartitionManagerBuilder builder() {
return new SharePartitionManagerBuilder();
}
public SharePartitionManager build() {
return new SharePartitionManager(replicaManager, time, cache, partitionCacheMap, RECORD_LOCK_DURATION_MS, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES);
}
}
}
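Taken together, these tests trace the share-session epoch lifecycle: an INITIAL_EPOCH request creates a session, every follow-up request presents the next epoch, and a FINAL_EPOCH request closes the session. A minimal client-side sketch of that progression, assuming the classes from this PR are on the classpath (the wrapper class name is invented for illustration):

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.ShareFetchMetadata;

public class ShareSessionEpochSketch {
    public static void main(String[] args) {
        Uuid memberId = Uuid.randomUuid();
        // INITIAL_EPOCH asks the broker to create a new share session.
        ShareFetchMetadata create = new ShareFetchMetadata(memberId, ShareFetchMetadata.INITIAL_EPOCH);
        // Each subsequent request advances the epoch with nextEpoch().
        int epoch = ShareFetchMetadata.nextEpoch(ShareFetchMetadata.INITIAL_EPOCH);
        ShareFetchMetadata subsequent = new ShareFetchMetadata(memberId, epoch);
        // FINAL_EPOCH tells the broker to close the session (FinalContext).
        ShareFetchMetadata close = new ShareFetchMetadata(memberId, ShareFetchMetadata.FINAL_EPOCH);
        System.out.println(create + " -> " + subsequent + " -> " + close);
    }
}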

@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import java.util.Objects;
import java.util.Optional;
/**
* A cached share partition. The broker maintains a set of these objects for each share session.
* <p>
* We store many of these objects, so it is important for them to be memory-efficient.
* That is why we store topic and partition separately rather than storing a TopicPartition
* object. The TP object takes up more memory because it is a separate JVM object, and
* because it stores the cached hash code in memory.
*/
public class CachedSharePartition implements ImplicitLinkedHashCollection.Element {
private final String topic;
private final Uuid topicId;
private final int partition;
private final Optional<Integer> leaderEpoch;
private int maxBytes;
private boolean requiresUpdateInResponse;
private int cachedNext = ImplicitLinkedHashCollection.INVALID_INDEX;
private int cachedPrev = ImplicitLinkedHashCollection.INVALID_INDEX;
private CachedSharePartition(String topic, Uuid topicId, int partition, int maxBytes, Optional<Integer> leaderEpoch,
boolean requiresUpdateInResponse) {
this.topic = topic;
this.topicId = topicId;
this.partition = partition;
this.maxBytes = maxBytes;
this.leaderEpoch = leaderEpoch;
this.requiresUpdateInResponse = requiresUpdateInResponse;
}
public CachedSharePartition(String topic, Uuid topicId, int partition, boolean requiresUpdateInResponse) {
this(topic, topicId, partition, -1, Optional.empty(), requiresUpdateInResponse);
}
public CachedSharePartition(TopicIdPartition topicIdPartition) {
this(topicIdPartition.topic(), topicIdPartition.topicId(), topicIdPartition.partition(), false);
}
public CachedSharePartition(TopicIdPartition topicIdPartition, ShareFetchRequest.SharePartitionData reqData,
boolean requiresUpdateInResponse) {
this(topicIdPartition.topic(), topicIdPartition.topicId(), topicIdPartition.partition(), reqData.maxBytes,
Optional.empty(), requiresUpdateInResponse);
}
public Uuid topicId() {
return topicId;
}
public String topic() {
return topic;
}
public int partition() {
return partition;
}
public ShareFetchRequest.SharePartitionData reqData() {
return new ShareFetchRequest.SharePartitionData(topicId, maxBytes);
}
public void updateRequestParams(ShareFetchRequest.SharePartitionData reqData) {
// Update our cached request parameters.
maxBytes = reqData.maxBytes;
}
/**
* Determine whether the specified cached partition should be included in the ShareFetchResponse we send back to
* the fetcher and update it if requested.
* This function should be called while holding the appropriate session lock.
*
* @param respData partition data
* @param updateResponseData if set to true, update this CachedSharePartition with new request and response data.
* @return True if this partition should be included in the response; false if it can be omitted.
*/
public boolean maybeUpdateResponseData(ShareFetchResponseData.PartitionData respData, boolean updateResponseData) {
boolean mustRespond = false;
// Check the response data
// Partitions with new data are always included in the response.
if (ShareFetchResponse.recordsSize(respData) > 0)
mustRespond = true;
if (requiresUpdateInResponse) {
mustRespond = true;
if (updateResponseData)
requiresUpdateInResponse = false;
}
if (respData.errorCode() != Errors.NONE.code()) {
// Partitions with errors are always included in the response.
* We also set the cached requiresUpdateInResponse to true.
// This ensures that when the error goes away, we re-send the partition.
if (updateResponseData)
requiresUpdateInResponse = true;
mustRespond = true;
}
return mustRespond;
}
@Override
public String toString() {
return "CachedSharePartition(topic=" + topic +
", topicId=" + topicId +
", partition=" + partition +
", maxBytes=" + maxBytes +
", leaderEpoch=" + leaderEpoch +
")";
}
@Override
public int hashCode() {
return Objects.hash(partition, topicId);
}
@Override
public boolean equals(final Object obj) {
if (this == obj)
return true;
else if (obj == null || getClass() != obj.getClass())
return false;
else {
CachedSharePartition that = (CachedSharePartition) obj;
return partition == that.partition && Objects.equals(topicId, that.topicId);
}
}
@Override
public int prev() {
return cachedPrev;
}
@Override
public void setPrev(int prev) {
cachedPrev = prev;
}
@Override
public int next() {
return cachedNext;
}
@Override
public void setNext(int next) {
cachedNext = next;
}
}
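Because equals and hashCode above use only (topicId, partition), a lookup element can be built without a topic name and still match the cached entry. A small sketch, assuming the PR's classes are available (class and topic names are invented):

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.server.share.CachedSharePartition;

public class CachedSharePartitionLookupSketch {
    public static void main(String[] args) {
        Uuid topicId = Uuid.randomUuid();
        ImplicitLinkedHashCollection<CachedSharePartition> partitionMap =
                new ImplicitLinkedHashCollection<>();
        partitionMap.mustAdd(new CachedSharePartition("payments", topicId, 0, false));
        // The topic name is null here, but topicId and partition still match.
        CachedSharePartition found =
                partitionMap.find(new CachedSharePartition(null, topicId, 0, false));
        System.out.println(found); // prints the cached "payments" entry
    }
}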

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share;
import java.util.Objects;
public class LastUsedKey implements Comparable<LastUsedKey> {
private final ShareSessionKey key;
private final long lastUsedMs;
public LastUsedKey(ShareSessionKey key, long lastUsedMs) {
this.key = key;
this.lastUsedMs = lastUsedMs;
}
public ShareSessionKey key() {
return key;
}
public long lastUsedMs() {
return lastUsedMs;
}
@Override
public int hashCode() {
return Objects.hash(key, lastUsedMs);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
LastUsedKey other = (LastUsedKey) obj;
return lastUsedMs == other.lastUsedMs && Objects.equals(key, other.key);
}
@Override
public int compareTo(LastUsedKey other) {
int res = Long.compare(lastUsedMs, other.lastUsedMs);
if (res != 0)
return res;
return Integer.compare(key.hashCode(), other.key.hashCode());
}
}
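compareTo orders keys primarily by lastUsedMs, with a hash-based tie-break so the ordering stays consistent for sessions touched at the same instant. That is what lets the cache keep its sessions in a TreeMap and surface the least recently used one first. A quick sketch, again assuming the PR's classes are on the classpath:

import java.util.TreeMap;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.share.LastUsedKey;
import org.apache.kafka.server.share.ShareSessionKey;

public class LastUsedKeyOrderingSketch {
    public static void main(String[] args) {
        TreeMap<LastUsedKey, String> byAge = new TreeMap<>();
        byAge.put(new LastUsedKey(new ShareSessionKey("grp", Uuid.randomUuid()), 200L), "newer");
        byAge.put(new LastUsedKey(new ShareSessionKey("grp", Uuid.randomUuid()), 100L), "older");
        // The stalest session surfaces first, as in ShareSessionCache.tryEvict.
        System.out.println(byAge.firstEntry().getValue()); // older
    }
}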

@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ShareSession {
// Helper enum to return the possible type of modified list of TopicIdPartitions in cache
public enum ModifiedTopicIdPartitionType {
ADDED,
UPDATED,
REMOVED
}
private final ShareSessionKey key;
private final ImplicitLinkedHashCollection<CachedSharePartition> partitionMap;
private final long creationMs;
private long lastUsedMs;
// visible for testing
public int epoch;
// This is used by the ShareSessionCache to store the last known size of this session.
// If this is -1, the Session is not in the cache.
private int cachedSize = -1;
/**
* The share session.
* Each share session is protected by its own lock, which must be taken before mutable
* fields are read or modified. This includes modification of the share session partition map.
*
* @param key The share session key to identify the share session uniquely.
* @param partitionMap The map of cached share partitions.
* @param creationMs The time in milliseconds when this share session was created.
* @param lastUsedMs The last used time in milliseconds. This should only be updated by
* ShareSessionCache#touch.
* @param epoch The share session sequence number.
*/
public ShareSession(ShareSessionKey key, ImplicitLinkedHashCollection<CachedSharePartition> partitionMap,
long creationMs, long lastUsedMs, int epoch) {
this.key = key;
this.partitionMap = partitionMap;
this.creationMs = creationMs;
this.lastUsedMs = lastUsedMs;
this.epoch = epoch;
}
public ShareSessionKey key() {
return key;
}
public synchronized int cachedSize() {
return cachedSize;
}
public synchronized void cachedSize(int size) {
cachedSize = size;
}
public synchronized long lastUsedMs() {
return lastUsedMs;
}
public synchronized void lastUsedMs(long ts) {
lastUsedMs = ts;
}
public synchronized ImplicitLinkedHashCollection<CachedSharePartition> partitionMap() {
return partitionMap;
}
// Visible for testing
public synchronized int epoch() {
return epoch;
}
public synchronized int size() {
return partitionMap.size();
}
public synchronized boolean isEmpty() {
return partitionMap.isEmpty();
}
public synchronized LastUsedKey lastUsedKey() {
return new LastUsedKey(key, lastUsedMs);
}
// Visible for testing
public synchronized long creationMs() {
return creationMs;
}
// Update the cached partition data based on the request.
public synchronized Map<ModifiedTopicIdPartitionType, List<TopicIdPartition>> update(Map<TopicIdPartition,
ShareFetchRequest.SharePartitionData> shareFetchData, List<TopicIdPartition> toForget) {
List<TopicIdPartition> added = new ArrayList<>();
List<TopicIdPartition> updated = new ArrayList<>();
List<TopicIdPartition> removed = new ArrayList<>();
shareFetchData.forEach((topicIdPartition, sharePartitionData) -> {
CachedSharePartition cachedSharePartitionKey = new CachedSharePartition(topicIdPartition, sharePartitionData, true);
CachedSharePartition cachedPart = partitionMap.find(cachedSharePartitionKey);
if (cachedPart == null) {
partitionMap.mustAdd(cachedSharePartitionKey);
added.add(topicIdPartition);
} else {
cachedPart.updateRequestParams(sharePartitionData);
updated.add(topicIdPartition);
}
});
toForget.forEach(topicIdPartition -> {
if (partitionMap.remove(new CachedSharePartition(topicIdPartition)))
removed.add(topicIdPartition);
});
Map<ModifiedTopicIdPartitionType, List<TopicIdPartition>> result = new HashMap<>();
result.put(ModifiedTopicIdPartitionType.ADDED, added);
result.put(ModifiedTopicIdPartitionType.UPDATED, updated);
result.put(ModifiedTopicIdPartitionType.REMOVED, removed);
return result;
}
@Override
public String toString() {
return "ShareSession(" +
"key=" + key +
", partitionMap=" + partitionMap +
", creationMs=" + creationMs +
", lastUsedMs=" + lastUsedMs +
", epoch=" + epoch +
", cachedSize=" + cachedSize +
")";
}
}
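update() is the heart of a subsequent fetch: incoming partitions are bucketed as ADDED (not yet cached), UPDATED (cached, request parameters refreshed), or REMOVED (listed in toForget). A brief sketch, assuming the PR's classes are on the classpath (class and topic names invented):

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.server.share.ShareSession;
import org.apache.kafka.server.share.ShareSessionKey;

public class ShareSessionUpdateSketch {
    public static void main(String[] args) {
        ShareSession session = new ShareSession(new ShareSessionKey("grp", Uuid.randomUuid()),
                new ImplicitLinkedHashCollection<>(), 0L, 0L, 1);
        TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        Map<ShareSession.ModifiedTopicIdPartitionType, List<TopicIdPartition>> result = session.update(
                Collections.singletonMap(tp, new ShareFetchRequest.SharePartitionData(tp.topicId(), 100)),
                Collections.emptyList());
        // First sight of tp: it lands in the ADDED bucket.
        System.out.println(result.get(ShareSession.ModifiedTopicIdPartitionType.ADDED));
    }
}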

@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.ShareFetchMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
/**
* Caches share sessions.
* <p>
* See tryEvict for an explanation of the cache eviction strategy.
* <p>
* The ShareSessionCache is thread-safe because all of its methods are synchronized.
* Note that individual share sessions have their own locks which are separate from the
* ShareSessionCache lock. In order to avoid deadlock, the ShareSessionCache lock
* must never be acquired while an individual ShareSession lock is already held.
*/
public class ShareSessionCache {
private final int maxEntries;
private final long evictionMs;
private long numPartitions = 0;
// A map of session key to ShareSession.
private Map<ShareSessionKey, ShareSession> sessions = new HashMap<>();
// Maps last used times to sessions.
private TreeMap<LastUsedKey, ShareSession> lastUsed = new TreeMap<>();
// Visible for testing
public synchronized TreeMap<LastUsedKey, ShareSession> lastUsed() {
return lastUsed;
}
public ShareSessionCache(int maxEntries, long evictionMs) {
this.maxEntries = maxEntries;
this.evictionMs = evictionMs;
}
/**
* Get a session by session key.
*
* @param key The share session key.
* @return The session, or null if no such session was found.
*/
public synchronized ShareSession get(ShareSessionKey key) {
return sessions.get(key);
}
/**
* Get the number of entries currently in the share session cache.
*/
public synchronized int size() {
return sessions.size();
}
public synchronized long totalPartitions() {
return numPartitions;
}
public synchronized ShareSession remove(ShareSessionKey key) {
ShareSession session = get(key);
if (session != null)
return remove(session);
return null;
}
/**
* Remove an entry from the session cache.
*
* @param session The session.
* @return The removed session, or null if there was no such session.
*/
public synchronized ShareSession remove(ShareSession session) {
synchronized (session) {
lastUsed.remove(session.lastUsedKey());
}
ShareSession removeResult = sessions.remove(session.key());
if (removeResult != null) {
numPartitions = numPartitions - session.cachedSize();
}
return removeResult;
}
/**
* Update a session's position in the lastUsed tree.
*
* @param session The session.
* @param now The current time in milliseconds.
*/
public synchronized void touch(ShareSession session, long now) {
synchronized (session) {
// Update the lastUsed map.
lastUsed.remove(session.lastUsedKey());
session.lastUsedMs(now);
lastUsed.put(session.lastUsedKey(), session);
int oldSize = session.cachedSize();
if (oldSize != -1) {
numPartitions = numPartitions - oldSize;
}
session.cachedSize(session.size());
numPartitions = numPartitions + session.cachedSize();
}
}
/**
* Try to evict an entry from the session cache.
* <p>
* A proposed new element A may evict an existing element B if B is considered
* "stale", that is, B has been inactive for longer than the configured evictionMs.
*
* @param now The current time in milliseconds.
* @return True if an entry was evicted; false otherwise.
*/
public synchronized boolean tryEvict(long now) {
// Try to evict an entry which is stale.
Map.Entry<LastUsedKey, ShareSession> lastUsedEntry = lastUsed.firstEntry();
if (lastUsedEntry == null) {
return false;
} else if (now - lastUsedEntry.getKey().lastUsedMs() > evictionMs) {
ShareSession session = lastUsedEntry.getValue();
remove(session);
return true;
}
return false;
}
/**
* Maybe create a new session and add it to the cache.
* @param groupId The group id in the share fetch request.
* @param memberId The member id in the share fetch request.
* @param now The current time in milliseconds.
* @param partitionMap The topic partitions to be added to the session.
* @return The session key if the session was created; null otherwise.
*/
public synchronized ShareSessionKey maybeCreateSession(String groupId, Uuid memberId, long now, ImplicitLinkedHashCollection<CachedSharePartition> partitionMap) {
if (sessions.size() < maxEntries || tryEvict(now)) {
ShareSession session = new ShareSession(new ShareSessionKey(groupId, memberId), partitionMap,
now, now, ShareFetchMetadata.nextEpoch(ShareFetchMetadata.INITIAL_EPOCH));
sessions.put(session.key(), session);
touch(session, now);
return session.key();
}
return null;
}
}
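tryEvict only removes a session that has been idle longer than evictionMs, so maybeCreateSession admits a new session exactly when the cache has room or holds a stale entry. A compact sketch of that behavior, with invented timings and the PR's classes assumed on the classpath:

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.server.share.ShareSessionCache;
import org.apache.kafka.server.share.ShareSessionKey;

public class ShareSessionCacheEvictionSketch {
    public static void main(String[] args) {
        ShareSessionCache cache = new ShareSessionCache(1, 100); // maxEntries=1, evictionMs=100
        ShareSessionKey first = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0,
                new ImplicitLinkedHashCollection<>());
        // t=50: cache is full and the first session is not yet stale -> rejected.
        System.out.println(cache.maybeCreateSession("grp", Uuid.randomUuid(), 50,
                new ImplicitLinkedHashCollection<>())); // null
        // t=150: the first session has been idle 150ms > evictionMs -> evicted.
        ShareSessionKey second = cache.maybeCreateSession("grp", Uuid.randomUuid(), 150,
                new ImplicitLinkedHashCollection<>());
        System.out.println(second != null && cache.get(first) == null); // true
    }
}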

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share;
import org.apache.kafka.common.Uuid;
import java.util.Objects;
public class ShareSessionKey {
private final String groupId;
private final Uuid memberId;
public ShareSessionKey(String groupId, Uuid memberId) {
this.groupId = Objects.requireNonNull(groupId);
this.memberId = Objects.requireNonNull(memberId);
}
public Uuid memberId() {
return memberId;
}
@Override
public int hashCode() {
return Objects.hash(groupId, memberId);
}
@Override
public boolean equals(final Object obj) {
if (this == obj)
return true;
else if (obj == null || getClass() != obj.getClass())
return false;
else {
ShareSessionKey that = (ShareSessionKey) obj;
return groupId.equals(that.groupId) && Objects.equals(memberId, that.memberId);
}
}
@Override
public String toString() {
return "ShareSessionKey(" +
" groupId=" + groupId +
", memberId=" + memberId +
")";
}
}
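Since equals and hashCode cover both fields, equal (groupId, memberId) pairs hit the same hash-map slot, which is how the cache keys sessions per group member. A tiny sketch (class name invented):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.share.ShareSessionKey;

public class ShareSessionKeySketch {
    public static void main(String[] args) {
        Uuid memberId = Uuid.randomUuid();
        Map<ShareSessionKey, String> sessions = new HashMap<>();
        sessions.put(new ShareSessionKey("grp", memberId), "session-1");
        // A freshly constructed but equal key retrieves the same entry.
        System.out.println(sessions.get(new ShareSessionKey("grp", memberId))); // session-1
    }
}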

@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share;
import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
public class CachedSharePartitionTest {
@Test
public void testCachedSharePartitionEqualsAndHashCode() {
Uuid topicId = Uuid.randomUuid();
String topicName = "topic";
int partition = 0;
CachedSharePartition cachedSharePartitionWithIdAndName = new
CachedSharePartition(topicName, topicId, partition, false);
CachedSharePartition cachedSharePartitionWithIdAndNoName = new
CachedSharePartition(null, topicId, partition, false);
CachedSharePartition cachedSharePartitionWithDifferentIdAndName = new
CachedSharePartition(topicName, Uuid.randomUuid(), partition, false);
CachedSharePartition cachedSharePartitionWithZeroIdAndName = new
CachedSharePartition(topicName, Uuid.ZERO_UUID, partition, false);
// CachedSharePartitions with valid topic IDs will compare topic ID and partition but not topic name.
assertEquals(cachedSharePartitionWithIdAndName, cachedSharePartitionWithIdAndNoName);
assertEquals(cachedSharePartitionWithIdAndName.hashCode(), cachedSharePartitionWithIdAndNoName.hashCode());
assertNotEquals(cachedSharePartitionWithIdAndName, cachedSharePartitionWithDifferentIdAndName);
assertNotEquals(cachedSharePartitionWithIdAndName.hashCode(), cachedSharePartitionWithDifferentIdAndName.hashCode());
assertNotEquals(cachedSharePartitionWithIdAndName, cachedSharePartitionWithZeroIdAndName);
assertNotEquals(cachedSharePartitionWithIdAndName.hashCode(), cachedSharePartitionWithZeroIdAndName.hashCode());
// CachedSharePartitions with null name and valid IDs will act just like ones with valid names
assertNotEquals(cachedSharePartitionWithIdAndNoName, cachedSharePartitionWithDifferentIdAndName);
assertNotEquals(cachedSharePartitionWithIdAndNoName.hashCode(), cachedSharePartitionWithDifferentIdAndName.hashCode());
assertNotEquals(cachedSharePartitionWithIdAndNoName, cachedSharePartitionWithZeroIdAndName);
assertNotEquals(cachedSharePartitionWithIdAndNoName.hashCode(), cachedSharePartitionWithZeroIdAndName.hashCode());
assertEquals(cachedSharePartitionWithZeroIdAndName.hashCode(),
new CachedSharePartition(topicName, Uuid.ZERO_UUID, partition, false).hashCode());
}
}

@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class ShareSessionCacheTest {
@Test
public void testShareSessionCache() {
ShareSessionCache cache = new ShareSessionCache(3, 100);
assertEquals(0, cache.size());
ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(10));
ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 10, mockedSharePartitionMap(20));
ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 20, mockedSharePartitionMap(30));
assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 30, mockedSharePartitionMap(40)));
assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 40, mockedSharePartitionMap(5)));
assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1, key2, key3)));
cache.touch(cache.get(key1), 200);
ShareSessionKey key4 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 210, mockedSharePartitionMap(11));
assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1, key3, key4)));
cache.touch(cache.get(key1), 400);
cache.touch(cache.get(key3), 390);
cache.touch(cache.get(key4), 400);
ShareSessionKey key5 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 410, mockedSharePartitionMap(50));
assertNull(key5);
}
@Test
public void testResizeCachedSessions() {
ShareSessionCache cache = new ShareSessionCache(2, 100);
assertEquals(0, cache.size());
assertEquals(0, cache.totalPartitions());
ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(2));
assertNotNull(key1);
assertShareCacheContains(cache, new ArrayList<>(Collections.singletonList(key1)));
ShareSession session1 = cache.get(key1);
assertEquals(2, session1.size());
assertEquals(2, cache.totalPartitions());
assertEquals(1, cache.size());
ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(4));
assertNotNull(key2);
assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1, key2)));
ShareSession session2 = cache.get(key2);
assertEquals(6, cache.totalPartitions());
assertEquals(2, cache.size());
cache.touch(session1, 200);
cache.touch(session2, 200);
ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 200, mockedSharePartitionMap(5));
assertNull(key3);
assertShareCacheContains(cache, new ArrayList<>(Arrays.asList(key1, key2)));
assertEquals(6, cache.totalPartitions());
assertEquals(2, cache.size());
cache.remove(key1);
assertShareCacheContains(cache, new ArrayList<>(Collections.singletonList(key2)));
assertEquals(1, cache.size());
assertEquals(4, cache.totalPartitions());
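// Removing an element from the partition map changes size() immediately,
// but cachedSize() and the cache's totalPartitions() only catch up on the
// next touch().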
Iterator<CachedSharePartition> iterator = session2.partitionMap().iterator();
iterator.next();
iterator.remove();
assertEquals(3, session2.size());
assertEquals(4, session2.cachedSize());
cache.touch(session2, session2.lastUsedMs());
assertEquals(3, cache.totalPartitions());
}
private ImplicitLinkedHashCollection<CachedSharePartition> mockedSharePartitionMap(int size) {
ImplicitLinkedHashCollection<CachedSharePartition> cacheMap = new
ImplicitLinkedHashCollection<>(size);
for (int i = 0; i < size; i++)
cacheMap.add(new CachedSharePartition("test", Uuid.randomUuid(), i, false));
return cacheMap;
}
private void assertShareCacheContains(ShareSessionCache cache,
ArrayList<ShareSessionKey> sessionKeys) {
int i = 0;
assertEquals(sessionKeys.size(), cache.size());
for (ShareSessionKey sessionKey : sessionKeys) {
assertFalse(cache.get(sessionKey).isEmpty(),
"Missing session " + ++i + " out of " + sessionKeys.size() + " ( " + sessionKey + " )");
}
}
}