KAFKA-19485: Added check before sending acknowledgements on initial epoch. (#20135)
CI / build (push) Waiting to run Details

https://issues.apache.org/jira/browse/KAFKA-19485

**Bug :**
There is a bug in `ShareConsumeRequestManager` where we are adding
acknowledgements on initial `ShareSession` epoch even after checking for
it.
Added fix to only include acknowledgements in the request if we have to,

PR also adds the check at another point in the code where we could
potentially be sending such acknowledgements.  One of the cases could be
when metadata is refreshed with empty topic IDs after a broker restart.
This means leader information would not be available on the node.

- Consumer subscribed to a partition whose leader was node-0.
- Broker restart happens and node-0 is elected leader again. Broker
starts a new `ShareSession`.
- Background thread sends a fetch request with **non-zero** epoch.
- Broker responds with `SHARE_SESSION_NOT_FOUND`.
- Client updates session epoch to 0 once it receives this error.
- Client updates metadata but receives empty metadata response. (Leader
unavailable)
- Application thread processing the previous fetch, completes and sends
acks to piggyback on next fetch.
- Next fetch will send the piggyback acknowledgements on the fetch for
previously subscribed partitions resulting in error from broker
("`Acknowledge data present on initial epoch`"). (Currently we attempt
to send even if leader is unavailable).

**Fix** :  Add a check before sending out acknowledgments if we are on
an initial epoch.
Added unit test covering the above scenario.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Shivsundar R 2025-07-10 04:06:19 -04:00 committed by GitHub
parent ded7df9707
commit 56a3c6dde9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 78 additions and 10 deletions

View File

@ -175,22 +175,25 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
TopicIdPartition tip = new TopicIdPartition(topicId, partition);
Acknowledgements acknowledgementsToSend = null;
boolean canSendAcknowledgements = true;
Map<TopicIdPartition, Acknowledgements> nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(node.id());
if (nodeAcksFromFetchMap != null) {
acknowledgementsToSend = nodeAcksFromFetchMap.remove(tip);
if (acknowledgementsToSend != null) {
if (handler.isNewSession()) {
// Failing the acknowledgements as we cannot have piggybacked acknowledgements in the initial ShareFetchRequest.
acknowledgementsToSend.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception());
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, acknowledgementsToSend));
} else {
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acknowledgementsToSend);
// Check if the share session epoch is valid for sending acknowledgements.
if (!maybeAddAcknowledgements(handler, node, tip, acknowledgementsToSend)) {
canSendAcknowledgements = false;
}
}
}
handler.addPartitionToFetch(tip, acknowledgementsToSend);
if (canSendAcknowledgements) {
handler.addPartitionToFetch(tip, acknowledgementsToSend);
} else {
handler.addPartitionToFetch(tip, null);
}
topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
log.debug("Added fetch request for partition {} to node {}", tip, node.id());
@ -212,8 +215,10 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
if (nodeAcksFromFetchMap != null) {
nodeAcksFromFetchMap.forEach((tip, acks) -> {
if (!isLeaderKnownToHaveChanged(nodeId, tip)) {
metricsManager.recordAcknowledgementSent(acks.size());
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acks);
// Check if the share session epoch is valid for sending acknowledgements.
if (!maybeAddAcknowledgements(sessionHandler, node, tip, acks)) {
return;
}
sessionHandler.addPartitionToAcknowledgeOnly(tip, acks);
handlerMap.put(node, sessionHandler);
@ -256,6 +261,28 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
return new PollResult(requests);
}
/**
*
* @return True if we can add acknowledgements to the share session.
* If we cannot add acknowledgements, they are completed with {@link Errors#INVALID_SHARE_SESSION_EPOCH} exception.
*/
private boolean maybeAddAcknowledgements(ShareSessionHandler handler,
Node node,
TopicIdPartition tip,
Acknowledgements acknowledgements) {
if (handler.isNewSession()) {
// Failing the acknowledgements as we cannot have piggybacked acknowledgements in the initial ShareFetchRequest.
log.debug("Cannot send acknowledgements on initial epoch for ShareSession for partition {}", tip);
acknowledgements.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception());
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, acknowledgements));
return false;
} else {
metricsManager.recordAcknowledgementSent(acknowledgements.size());
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acknowledgements);
return true;
}
}
public void fetch(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap,
Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements) {
if (!fetchMoreRecords) {

View File

@ -1407,8 +1407,49 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult();
assertEquals(1, pollResult.unsentRequests.size());
ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder();
assertEquals(1, builder.data().topics().size());
// We should not add the acknowledgements as part of the request.
assertEquals(0, builder.data().topics().find(tip0.topicId()).partitions().find(0).acknowledgementBatches().size());
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
}
@Test
public void testPiggybackAcknowledgementsOnInitialShareSessionErrorSubscriptionChange() {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(singleton(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
fetchRecords();
// Simulate a broker restart, but no leader change, this resets share session epoch to 0.
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fetchResponseWithTopLevelError(tip0, Errors.SHARE_SESSION_NOT_FOUND));
networkClientDelegate.poll(time.timer(0));
// Simulate a metadata update with no topics in the response.
client.updateMetadata(
RequestTestUtils.metadataUpdateWithIds(1, Collections.emptyMap(),
tp -> validLeaderEpoch, null, false));
// The acknowledgements for the initial fetch from tip0 are processed now and sent to the background thread.
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
assertEquals(0, completedAcknowledgements.size());
// Next fetch would not include any acknowledgements.
NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult();
assertEquals(0, pollResult.unsentRequests.size());
// We should fail any waiting acknowledgements for tip-0 as it would have a share session epoch equal to 0.
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
}