From 3a1465e14c19560cbfaf6829209eb42cd5a45f70 Mon Sep 17 00:00:00 2001
From: PoAn Yang
Date: Fri, 27 Sep 2024 00:20:55 +0800
Subject: [PATCH] KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) (#16982)

Reviewers: Lianet Magrans, TaiJuWu, Kirk True, TengYao Chi
---
 .../clients/consumer/KafkaConsumerTest.java | 86 +++++++++++++------
 1 file changed, 58 insertions(+), 28 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 77c0264849a..8d3e955e799 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1068,12 +1068,20 @@ public class KafkaConsumerTest {
         assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).committed(Collections.singleton(tp0)));
     }
 
-    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
-    // The bug will be investigated and fixed so this test can use both group protocols.
     @ParameterizedTest
-    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
-    public void testFetchStableOffsetThrowInPoll(GroupProtocol groupProtocol) {
-        assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).poll(Duration.ZERO));
+    @EnumSource(GroupProtocol.class)
+    public void testFetchStableOffsetThrowInPoll(GroupProtocol groupProtocol) throws InterruptedException {
+        setupThrowableConsumer(groupProtocol);
+        TestUtils.waitForCondition(() -> {
+            try {
+                // For CONSUMER protocol, the offset request is sent in the background thread,
+                // so we need to wait until the offset request is sent.
+                consumer.poll(Duration.ZERO);
+                return false;
+            } catch (UnsupportedVersionException e) {
+                return true;
+            }
+        }, "Failed to throw UnsupportedVersionException in poll");
     }
 
     @ParameterizedTest
@@ -2466,12 +2474,10 @@ public class KafkaConsumerTest {
         assertThrows(IllegalStateException.class, consumer::groupMetadata);
     }
 
-    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
-    // The bug will be investigated and fixed so this test can use both group protocols.
     @ParameterizedTest
-    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+    @EnumSource(GroupProtocol.class)
     @SuppressWarnings("unchecked")
-    public void testCurrentLag(GroupProtocol groupProtocol) {
+    public void testCurrentLag(GroupProtocol groupProtocol) throws InterruptedException {
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
 
@@ -2486,33 +2492,49 @@ public class KafkaConsumerTest {
 
         // poll once to update with the current metadata
         consumer.poll(Duration.ofMillis(0));
+        TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
+            "No metadata requests sent");
         client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0)));
 
         // no error for no current position
         assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
-        assertEquals(0, client.inFlightRequestCount());
-
+        if (groupProtocol == GroupProtocol.CLASSIC) {
+            // Classic consumer does not send the LIST_OFFSETS right away (requires an explicit poll),
+            // different from the new async consumer, that will send the LIST_OFFSETS request in the background thread
+            // on the next background thread poll.
+            assertEquals(0, client.inFlightRequestCount());
+        }
 
         // poll once again, which should send the list-offset request
         consumer.seek(tp0, 50L);
         consumer.poll(Duration.ofMillis(0));
         // requests: list-offset, fetch
-        assertEquals(2, client.inFlightRequestCount());
+        TestUtils.waitForCondition(() -> {
+            boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS);
+            boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
+            return hasListOffsetRequest && hasFetchRequest;
+        }, "No list-offset & fetch request sent");
 
         // no error for no end offset (so unknown lag)
         assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
 
         // poll once again, which should return the list-offset response
         // and hence next call would return correct lag result
-        client.respond(listOffsetsResponse(singletonMap(tp0, 90L)));
+        ClientRequest listOffsetRequest = findRequest(client, ApiKeys.LIST_OFFSETS);
+        client.respondToRequest(listOffsetRequest, listOffsetsResponse(singletonMap(tp0, 90L)));
         consumer.poll(Duration.ofMillis(0));
 
-        assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
+        // For AsyncKafkaConsumer, subscription state is updated in background, so the result will eventually be updated.
+        TestUtils.waitForCondition(() -> {
+            OptionalLong result = consumer.currentLag(tp0);
+            return result.isPresent() && result.getAsLong() == 40L;
+        }, "Subscription state is not updated");
 
         // requests: fetch
-        assertEquals(1, client.inFlightRequestCount());
+        TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FETCH), "No fetch request sent");
 
         // one successful fetch should update the log end offset and the position
+        ClientRequest fetchRequest = findRequest(client, ApiKeys.FETCH);
         final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
-        client.respond(fetchResponse(singletonMap(tp0, fetchInfo)));
+        client.respondToRequest(fetchRequest, fetchResponse(singletonMap(tp0, fetchInfo)));
 
         final ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
         assertEquals(5, records.count());
@@ -2522,32 +2544,40 @@ public class KafkaConsumerTest {
         assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
     }
 
-    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
-    // The bug will be investigated and fixed so this test can use both group protocols.
     @ParameterizedTest
-    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
-    public void testListOffsetShouldUpdateSubscriptions(GroupProtocol groupProtocol) {
+    @EnumSource(GroupProtocol.class)
+    public void testListOffsetShouldUpdateSubscriptions(GroupProtocol groupProtocol) throws InterruptedException {
        final ConsumerMetadata metadata = createMetadata(subscription);
        final MockClient client = new MockClient(time, metadata);
 
        initMetadata(client, singletonMap(topic, 1));
-       consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
-
+       consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false,
+           null, groupInstanceId, false);
        consumer.assign(singleton(tp0));
-
-       // poll once to update with the current metadata
-       consumer.poll(Duration.ofMillis(0));
-       client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0)));
-
        consumer.seek(tp0, 50L);
-       client.prepareResponse(listOffsetsResponse(singletonMap(tp0, 90L)));
+       // For AsyncKafkaConsumer, FetchRequestManager sends FetchRequest in background thread.
+       // Wait for the first fetch request to avoid ListOffsetResponse mismatch.
+       TestUtils.waitForCondition(() -> groupProtocol == GroupProtocol.CLASSIC || requestGenerated(client, ApiKeys.FETCH),
+           "No fetch request sent");
+
+       client.prepareResponse(request -> request instanceof ListOffsetsRequest, listOffsetsResponse(singletonMap(tp0, 90L)));
 
        assertEquals(singletonMap(tp0, 90L), consumer.endOffsets(Collections.singleton(tp0)));
        // correct lag result should be returned as well
        assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
     }
 
+    private ClientRequest findRequest(MockClient client, ApiKeys apiKey) {
+        Optional<ClientRequest> request = client.requests().stream().filter(r -> r.requestBuilder().apiKey().equals(apiKey)).findFirst();
+        assertTrue(request.isPresent(), "No " + apiKey + " request was submitted to the client");
+        return request.get();
+    }
+
+    private boolean requestGenerated(MockClient client, ApiKeys apiKey) {
+        return client.requests().stream().anyMatch(request -> request.requestBuilder().apiKey().equals(apiKey));
+    }
+
     private KafkaConsumer<String, String> consumerWithPendingAuthenticationError(GroupProtocol groupProtocol, final Time time) {
         ConsumerMetadata metadata = createMetadata(subscription);
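The pattern the enabled tests rely on is to poll in a retry loop until the consumer's background thread has actually sent the offsets request and surfaced its error, instead of asserting on a single poll(0). A minimal self-contained sketch of that pattern, assuming Kafka's org.apache.kafka.test.TestUtils is on the test classpath; the class and method names below are illustrative, not part of the patch:

    import java.time.Duration;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.errors.UnsupportedVersionException;
    import org.apache.kafka.test.TestUtils;

    class PollUntilErrorSketch {
        // Polls repeatedly until the background thread has sent the offsets request
        // and the resulting UnsupportedVersionException is rethrown from poll().
        // waitForCondition retries the lambda until it returns true or times out.
        static void awaitUnsupportedVersion(Consumer<?, ?> consumer) throws InterruptedException {
            TestUtils.waitForCondition(() -> {
                try {
                    consumer.poll(Duration.ZERO); // may return before the async request completes
                    return false;                 // no error yet; retry
                } catch (UnsupportedVersionException e) {
                    return true;                  // background request failed as expected
                }
            }, "Expected UnsupportedVersionException was not thrown by poll()");
        }
    }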
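The diff also replaces assertions on MockClient.inFlightRequestCount() with lookups of the recorded requests by API key, so a test can answer a specific request even when it was generated from the background thread. The same helpers, extracted as a standalone sketch under the assumption that the Kafka clients test artifact (MockClient, ClientRequest, ApiKeys) is on the classpath; the class name is illustrative:

    import java.util.Optional;

    import org.apache.kafka.clients.ClientRequest;
    import org.apache.kafka.clients.MockClient;
    import org.apache.kafka.common.protocol.ApiKeys;

    final class MockClientRequests {
        // True once MockClient has recorded a request for the given API key,
        // whether it was sent from the test thread or the consumer's background thread.
        static boolean requestGenerated(MockClient client, ApiKeys apiKey) {
            return client.requests().stream()
                .anyMatch(r -> r.requestBuilder().apiKey() == apiKey);
        }

        // The first recorded request for the given API key, if any; a test can then
        // answer exactly that request with MockClient.respondToRequest(request, response).
        static Optional<ClientRequest> findRequest(MockClient client, ApiKeys apiKey) {
            return client.requests().stream()
                .filter(r -> r.requestBuilder().apiKey() == apiKey)
                .findFirst();
        }
    }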