MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. (#19295)

Currently if we received just a control record in the
`ShareFetchResponse`, then the currentFetch in `ShareConsumerImpl` would
not be updated as the record is ignored. But in the process, we lose the
acknowledgment for this control record which is a GAP.
PR fixes this by adding an additional map for control record
acknowledgements in `ShareFetchEvent`.
This updates both the ShareConsumerImpl and ShareConsumeRequestManager
to accommodate the additional map.
Added a unit test in `ShareConsumerImplTest` and
`ShareConsumeRequestManagerTest` to verify the changes.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Shivsundar R 2025-04-01 09:15:03 -04:00 committed by GitHub
parent 0c97338959
commit e301508b53
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 165 additions and 241 deletions

View File

@ -256,13 +256,19 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
return new PollResult(requests);
}
public void fetch(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
public void fetch(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap,
Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements) {
if (!fetchMoreRecords) {
log.debug("Fetch more data");
fetchMoreRecords = true;
}
// The acknowledgements sent via ShareFetch are stored in this map.
// Process both acknowledgement maps and sends them in the next ShareFetch.
processAcknowledgementsMap(acknowledgementsMap);
processAcknowledgementsMap(controlRecordAcknowledgements);
}
private void processAcknowledgementsMap(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
acknowledgementsMap.forEach((tip, nodeAcks) -> {
int nodeId = nodeAcks.nodeId();
Map<TopicIdPartition, Acknowledgements> currentNodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(nodeId);
@ -1474,4 +1480,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
return super.toString().toLowerCase(Locale.ROOT);
}
}
Map<TopicIdPartition, Acknowledgements> getFetchAcknowledgementsToSend(Integer nodeId) {
return fetchAcknowledgementsToSend.get(nodeId);
}
}

View File

@ -649,8 +649,8 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
if (currentFetch.isEmpty()) {
final ShareFetch<K, V> fetch = fetchCollector.collect(fetchBuffer);
if (fetch.isEmpty()) {
// Fetch more records and send any waiting acknowledgements
applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap));
// Check for any acknowledgements which could have come from control records (GAP) and include them.
applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap, fetch.takeAcknowledgedRecords()));
// Notify the network thread to wake up and start the next round of fetching
applicationEventHandler.wakeupNetworkThread();

View File

@ -482,7 +482,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
* Process event that tells the share consume request manager to fetch more records.
*/
private void process(final ShareFetchEvent event) {
requestManagers.shareConsumeRequestManager.ifPresent(scrm -> scrm.fetch(event.acknowledgementsMap()));
requestManagers.shareConsumeRequestManager.ifPresent(scrm -> scrm.fetch(event.acknowledgementsMap(), event.controlRecordAcknowledgements()));
}
/**

View File

@ -25,15 +25,23 @@ public class ShareFetchEvent extends ApplicationEvent {
private final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap;
public ShareFetchEvent(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
private final Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements;
public ShareFetchEvent(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap,
Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements) {
super(Type.SHARE_FETCH);
this.acknowledgementsMap = acknowledgementsMap;
this.controlRecordAcknowledgements = controlRecordAcknowledgements;
}
public Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap() {
return acknowledgementsMap;
}
public Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements() {
return controlRecordAcknowledgements;
}
@Override
protected String toStringBase() {
return super.toStringBase() + ", acknowledgementsMap=" + acknowledgementsMap;

View File

@ -208,14 +208,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
assertTrue(partitionRecords.containsKey(tp0));
@ -229,15 +222,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records,
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE);
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
assertTrue(partitionRecords.containsKey(tp0));
@ -252,16 +237,9 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
// Enabling the config so that background event is sent when the acknowledgement response is received.
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(Collections.singleton(tp0));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records,
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE);
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
assertTrue(partitionRecords.containsKey(tp0));
@ -272,15 +250,9 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records,
ShareCompletedFetchTest.acquiredRecords(2L, 1), Errors.NONE, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, ShareCompletedFetchTest.acquiredRecords(2L, 1), Errors.NONE);
assertEquals(1.0,
metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue());
@ -291,16 +263,11 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(2L, AcknowledgeType.REJECT);
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)), Collections.emptyMap());
// Preparing a response with an acknowledgement error.
client.prepareResponse(fullFetchResponse(tip0, records,
Collections.emptyList(), Errors.NONE, Errors.INVALID_RECORD_STATE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, Collections.emptyList(), Errors.NONE, Errors.INVALID_RECORD_STATE);
assertEquals(2.0,
metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue());
assertEquals(1.0,
@ -318,14 +285,7 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@ -348,14 +308,7 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@ -378,14 +331,7 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
fetchRecords();
@ -437,20 +383,13 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
// Piggyback acknowledgements
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
// Remaining acknowledgements sent with close().
Acknowledgements acknowledgements2 = getAcknowledgements(2, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@ -478,14 +417,7 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@ -513,14 +445,7 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@ -623,15 +548,8 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@ -662,15 +580,8 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@ -713,15 +624,8 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT,
AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
@ -758,15 +662,8 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
@ -821,15 +718,8 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
@ -871,14 +761,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
@ -887,7 +770,7 @@ public class ShareConsumeRequestManagerTest {
fetchRecords();
// Piggyback acknowledgements
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@ -897,7 +780,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(3L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)));
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)), Collections.emptyMap());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
@ -917,13 +800,7 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(singleton(tp0));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@ -959,13 +836,7 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(singleton(tp0));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@ -1001,13 +872,7 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
assignFromSubscribed(singleton(tp0));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@ -1038,18 +903,12 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
assignFromSubscribed(singleton(tp0));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
// Send acknowledgements via ShareFetch
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
fetchRecords();
// Subscription changes.
subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
@ -1083,17 +942,12 @@ public class ShareConsumeRequestManagerTest {
assertEquals(nodeId0, tp0Leader);
assertEquals(nodeId1, tp1Leader);
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, emptyAcquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, emptyAcquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(0, AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
// Send acknowledgements via ShareFetch
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
fetchRecords();
// Subscription changes.
subscriptions.assignFromSubscribed(Collections.singletonList(tp1));
@ -1152,14 +1006,8 @@ public class ShareConsumeRequestManagerTest {
assertEquals(nodeId0, tp0Leader);
assertEquals(nodeId1, tp1Leader);
// Send the first ShareFetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
// Prepare an empty response
client.prepareResponse(fullFetchResponse(tip0, records, emptyAcquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
// Send the first ShareFetch with an empty response
sendFetchAndVerifyResponse(records, emptyAcquiredRecords, Errors.NONE);
fetchRecords();
@ -1240,14 +1088,8 @@ public class ShareConsumeRequestManagerTest {
Node nodeId1 = metadata.fetch().nodeById(1);
LinkedList<Node> nodeList = new LinkedList<>(Arrays.asList(nodeId0, nodeId1));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT,
AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
@ -1280,13 +1122,7 @@ public class ShareConsumeRequestManagerTest {
assignFromSubscribed(Collections.singleton(tp0));
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
@ -1464,12 +1300,7 @@ public class ShareConsumeRequestManagerTest {
RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName, 1),
tp -> validLeaderEpoch, topicIds, false));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
fetchRecords();
@ -1500,7 +1331,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements1 = getAcknowledgements(2,
AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements1)));
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements1)), Collections.emptyMap());
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@ -1529,13 +1360,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
assignFromSubscribed(singleton(tp0));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, emptyAcquiredRecords, Errors.NOT_LEADER_OR_FOLLOWER));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, emptyAcquiredRecords, Errors.NOT_LEADER_OR_FOLLOWER);
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
assertFalse(partitionRecords.containsKey(tp0));
@ -1550,14 +1375,9 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
@ -1888,7 +1708,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
assertEquals(startingClusterMetadata, metadata.fetch());
@ -1993,7 +1813,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
// The metadata snapshot will have been updated with the new leader information
assertNotEquals(startingClusterMetadata, metadata.fetch());
@ -2088,7 +1908,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
assertEquals(startingClusterMetadata, metadata.fetch());
@ -2227,7 +2047,6 @@ public class ShareConsumeRequestManagerTest {
assertEquals(1, completedAcknowledgements.get(1).size());
assertEquals(acknowledgementsTp1, completedAcknowledgements.get(1).get(tip1));
assertNull(completedAcknowledgements.get(1).get(tip1).getAcknowledgeException());
}
@Test
@ -2377,7 +2196,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgementsTp1 = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Map.of(tip1, new NodeAcknowledgements(1, acknowledgementsTp1)));
shareConsumeRequestManager.fetch(Map.of(tip1, new NodeAcknowledgements(1, acknowledgementsTp1)), Collections.emptyMap());
// Move the leadership of tp0 onto node 1
HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = new HashMap<>();
@ -2460,13 +2279,13 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
assertEquals(startingClusterMetadata, metadata.fetch());
acknowledgements = Acknowledgements.empty();
acknowledgements.add(1, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
assertEquals(2, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@ -2507,7 +2326,7 @@ public class ShareConsumeRequestManagerTest {
assertNotEquals(startingClusterMetadata, metadata.fetch());
shareConsumeRequestManager.fetch(Map.of(tip1, new NodeAcknowledgements(1, acknowledgements)));
shareConsumeRequestManager.fetch(Map.of(tip1, new NodeAcknowledgements(1, acknowledgements)), Collections.emptyMap());
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@ -2759,14 +2578,14 @@ public class ShareConsumeRequestManagerTest {
}
private int sendFetches() {
fetch(new HashMap<>());
fetch(new HashMap<>(), new HashMap<>());
NetworkClientDelegate.PollResult pollResult = poll(time.milliseconds());
networkClientDelegate.addAll(pollResult.unsentRequests);
return pollResult.unsentRequests.size();
}
private NetworkClientDelegate.PollResult sendFetchesReturnPollResult() {
fetch(new HashMap<>());
fetch(new HashMap<>(), new HashMap<>());
NetworkClientDelegate.PollResult pollResult = poll(time.milliseconds());
networkClientDelegate.addAll(pollResult.unsentRequests);
return pollResult;
@ -2905,4 +2724,47 @@ public class ShareConsumeRequestManagerTest {
}
}
}
@Test
void testFetchWithControlRecords() {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
Map<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsMap = new HashMap<>();
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
nodeAcknowledgementsMap.put(tip0, new NodeAcknowledgements(0, acknowledgements));
Map<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsControlRecordMap = new HashMap<>();
Acknowledgements controlAcknowledgements = Acknowledgements.empty();
controlAcknowledgements.addGap(2L);
nodeAcknowledgementsControlRecordMap.put(tip0, new NodeAcknowledgements(0, controlAcknowledgements));
shareConsumeRequestManager.fetch(nodeAcknowledgementsMap, nodeAcknowledgementsControlRecordMap);
Map<TopicIdPartition, Acknowledgements> fetchAcksToSend = shareConsumeRequestManager.getFetchAcknowledgementsToSend(0);
assertEquals(1, fetchAcksToSend.size());
assertEquals(AcknowledgeType.ACCEPT, fetchAcksToSend.get(tip0).get(1L));
assertEquals(2, fetchAcksToSend.get(tip0).size());
assertNull(fetchAcksToSend.get(tip0).get(3L));
}
private void sendFetchAndVerifyResponse(MemoryRecords records,
List<ShareFetchResponseData.AcquiredRecords> acquiredRecords,
Errors... error) {
// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
if (error.length > 1) {
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, error[0], error[1]));
} else {
client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, error[0]));
}
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
}
}

View File

@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import org.apache.kafka.common.KafkaException;
@ -53,6 +54,7 @@ import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -218,6 +220,48 @@ public class ShareConsumerImplTest {
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}
@Test
public void testControlRecordsOnEmptyFetch() {
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
consumer = newConsumer(subscriptions);
// Setup subscription
final String topicName = "foo";
final List<String> subscriptionTopic = Collections.singletonList(topicName);
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, subscriptionTopic);
consumer.subscribe(subscriptionTopic);
// Create a fetch with only GAP (no records)
final TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), 0, topicName);
final ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0, tip);
// Add GAP without adding any records
batch.addGap(1);
final ShareFetch<String, String> fetchWithOnlyGap = ShareFetch.empty();
fetchWithOnlyGap.add(tip, batch);
doReturn(fetchWithOnlyGap).when(fetchCollector).collect(any(ShareFetchBuffer.class));
consumer.poll(Duration.ZERO);
// Verify that next ShareFetchEvent was sent with the acknowledgement GAP for offset 1
verify(applicationEventHandler).add(argThat(event -> {
if (!(event instanceof ShareFetchEvent)) {
return false;
}
ShareFetchEvent fetchEvent = (ShareFetchEvent) event;
// Regular acknowledgements map should be empty
if (!fetchEvent.acknowledgementsMap().isEmpty()) {
return false;
}
// Control record acknowledgements map should contain the GAP for offset 1
Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcks = fetchEvent.controlRecordAcknowledgements();
return controlRecordAcks.containsKey(tip) &&
controlRecordAcks.get(tip).acknowledgements().get(1L) == null; // Null indicates GAP
}));
}
@Test
public void testWakeupAfterEmptyFetch() {
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);