KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) (#16982)

Reviewers: Lianet Magrans <lmagrans@confluent.io>, TaiJuWu <tjwu1217@gmail.com>, Kirk True <ktrue@confluent.io>, TengYao Chi <kitingiao@gmail.com>
This commit is contained in:
PoAn Yang 2024-09-27 00:20:55 +08:00 committed by GitHub
parent cd4d6ce9d5
commit 3a1465e14c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 58 additions and 28 deletions

View File

@ -1068,12 +1068,20 @@ public class KafkaConsumerTest {
assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).committed(Collections.singleton(tp0))); assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).committed(Collections.singleton(tp0)));
} }
// TODO: this test triggers a bug with the CONSUMER group protocol implementation.
// The bug will be investigated and fixed so this test can use both group protocols.
@ParameterizedTest @ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC") @EnumSource(GroupProtocol.class)
public void testFetchStableOffsetThrowInPoll(GroupProtocol groupProtocol) { public void testFetchStableOffsetThrowInPoll(GroupProtocol groupProtocol) throws InterruptedException {
assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).poll(Duration.ZERO)); setupThrowableConsumer(groupProtocol);
TestUtils.waitForCondition(() -> {
try {
// For CONSUMER protocol, the offset request is sent in the background thread,
// so we need to wait until the offset request is sent.
consumer.poll(Duration.ZERO);
return false;
} catch (UnsupportedVersionException e) {
return true;
}
}, "Failed to throw UnsupportedVersionException in poll");
} }
@ParameterizedTest @ParameterizedTest
@ -2466,12 +2474,10 @@ public class KafkaConsumerTest {
assertThrows(IllegalStateException.class, consumer::groupMetadata); assertThrows(IllegalStateException.class, consumer::groupMetadata);
} }
// TODO: this test triggers a bug with the CONSUMER group protocol implementation.
// The bug will be investigated and fixed so this test can use both group protocols.
@ParameterizedTest @ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC") @EnumSource(GroupProtocol.class)
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testCurrentLag(GroupProtocol groupProtocol) { public void testCurrentLag(GroupProtocol groupProtocol) throws InterruptedException {
final ConsumerMetadata metadata = createMetadata(subscription); final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata); final MockClient client = new MockClient(time, metadata);
@ -2486,33 +2492,49 @@ public class KafkaConsumerTest {
// poll once to update with the current metadata // poll once to update with the current metadata
consumer.poll(Duration.ofMillis(0)); consumer.poll(Duration.ofMillis(0));
TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
"No metadata requests sent");
client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0))); client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0)));
// no error for no current position // no error for no current position
assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
if (groupProtocol == GroupProtocol.CLASSIC) {
// Classic consumer does not send the LIST_OFFSETS right away (requires an explicit poll),
// different from the new async consumer, that will send the LIST_OFFSETS request in the background thread
// on the next background thread poll.
assertEquals(0, client.inFlightRequestCount()); assertEquals(0, client.inFlightRequestCount());
}
// poll once again, which should send the list-offset request // poll once again, which should send the list-offset request
consumer.seek(tp0, 50L); consumer.seek(tp0, 50L);
consumer.poll(Duration.ofMillis(0)); consumer.poll(Duration.ofMillis(0));
// requests: list-offset, fetch // requests: list-offset, fetch
assertEquals(2, client.inFlightRequestCount()); TestUtils.waitForCondition(() -> {
boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS);
boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
return hasListOffsetRequest && hasFetchRequest;
}, "No list-offset & fetch request sent");
// no error for no end offset (so unknown lag) // no error for no end offset (so unknown lag)
assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
// poll once again, which should return the list-offset response // poll once again, which should return the list-offset response
// and hence next call would return correct lag result // and hence next call would return correct lag result
client.respond(listOffsetsResponse(singletonMap(tp0, 90L))); ClientRequest listOffsetRequest = findRequest(client, ApiKeys.LIST_OFFSETS);
client.respondToRequest(listOffsetRequest, listOffsetsResponse(singletonMap(tp0, 90L)));
consumer.poll(Duration.ofMillis(0)); consumer.poll(Duration.ofMillis(0));
assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0)); // For AsyncKafkaConsumer, subscription state is updated in background, so the result will eventually be updated.
TestUtils.waitForCondition(() -> {
OptionalLong result = consumer.currentLag(tp0);
return result.isPresent() && result.getAsLong() == 40L;
}, "Subscription state is not updated");
// requests: fetch // requests: fetch
assertEquals(1, client.inFlightRequestCount()); TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FETCH), "No fetch request sent");
// one successful fetch should update the log end offset and the position // one successful fetch should update the log end offset and the position
ClientRequest fetchRequest = findRequest(client, ApiKeys.FETCH);
final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5); final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
client.respond(fetchResponse(singletonMap(tp0, fetchInfo))); client.respondToRequest(fetchRequest, fetchResponse(singletonMap(tp0, fetchInfo)));
final ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1)); final ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
assertEquals(5, records.count()); assertEquals(5, records.count());
@ -2522,32 +2544,40 @@ public class KafkaConsumerTest {
assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0)); assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
} }
// TODO: this test triggers a bug with the CONSUMER group protocol implementation.
// The bug will be investigated and fixed so this test can use both group protocols.
@ParameterizedTest @ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC") @EnumSource(GroupProtocol.class)
public void testListOffsetShouldUpdateSubscriptions(GroupProtocol groupProtocol) { public void testListOffsetShouldUpdateSubscriptions(GroupProtocol groupProtocol) throws InterruptedException {
final ConsumerMetadata metadata = createMetadata(subscription); final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata); final MockClient client = new MockClient(time, metadata);
initMetadata(client, singletonMap(topic, 1)); initMetadata(client, singletonMap(topic, 1));
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId); consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false,
null, groupInstanceId, false);
consumer.assign(singleton(tp0)); consumer.assign(singleton(tp0));
// poll once to update with the current metadata
consumer.poll(Duration.ofMillis(0));
client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0)));
consumer.seek(tp0, 50L); consumer.seek(tp0, 50L);
client.prepareResponse(listOffsetsResponse(singletonMap(tp0, 90L)));
// For AsyncKafkaConsumer, FetchRequestManager sends FetchRequest in background thread.
// Wait for the first fetch request to avoid ListOffsetResponse mismatch.
TestUtils.waitForCondition(() -> groupProtocol == GroupProtocol.CLASSIC || requestGenerated(client, ApiKeys.FETCH),
"No fetch request sent");
client.prepareResponse(request -> request instanceof ListOffsetsRequest, listOffsetsResponse(singletonMap(tp0, 90L)));
assertEquals(singletonMap(tp0, 90L), consumer.endOffsets(Collections.singleton(tp0))); assertEquals(singletonMap(tp0, 90L), consumer.endOffsets(Collections.singleton(tp0)));
// correct lag result should be returned as well // correct lag result should be returned as well
assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0)); assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
} }
private ClientRequest findRequest(MockClient client, ApiKeys apiKey) {
Optional<ClientRequest> request = client.requests().stream().filter(r -> r.requestBuilder().apiKey().equals(apiKey)).findFirst();
assertTrue(request.isPresent(), "No " + apiKey + " request was submitted to the client");
return request.get();
}
private boolean requestGenerated(MockClient client, ApiKeys apiKey) {
return client.requests().stream().anyMatch(request -> request.requestBuilder().apiKey().equals(apiKey));
}
private KafkaConsumer<String, String> consumerWithPendingAuthenticationError(GroupProtocol groupProtocol, private KafkaConsumer<String, String> consumerWithPendingAuthenticationError(GroupProtocol groupProtocol,
final Time time) { final Time time) {
ConsumerMetadata metadata = createMetadata(subscription); ConsumerMetadata metadata = createMetadata(subscription);