Refactoring and clean up

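Extract the duplicated poll-wait test helpers (pollForRecords, waitForConsumerPoll, waitForConsumerPollException) from KafkaConsumerTest and AsyncKafkaConsumerTest into a shared ConsumerPollTestUtils class, and simplify the wait conditions and cleanup in the SASL and GSSAPI authentication tests.
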
This commit is contained in:
Kirk True 2025-09-29 12:58:07 -07:00
parent f45b70e688
commit 8235ed2256
5 changed files with 126 additions and 101 deletions

View File: ConsumerPollTestUtils.java (new file)

@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.test.TestUtils;

import java.time.Duration;
import java.util.function.Function;
import java.util.function.Supplier;

public class ConsumerPollTestUtils {
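    /**
     * Poll the given consumer in one-second increments until it returns a non-empty
     * batch of records, throwing a TimeoutException if none arrives within 15 seconds.
     */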
@SuppressWarnings("unchecked")
public static <T> ConsumerRecords<T, T> waitForRecords(Consumer<?, ?> consumer, Time time) {
Timer timer = time.timer(15000);
while (timer.notExpired()) {
ConsumerRecords<T, T> records = (ConsumerRecords<T, T>) consumer.poll(Duration.ofMillis(1000));
if (!records.isEmpty())
return records;
}
throw new org.apache.kafka.common.errors.TimeoutException("no records to return");
}
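
    /**
     * Repeatedly call Consumer.poll(Duration.ZERO) until the given condition holds,
     * rethrowing any InterruptedException as an unchecked InterruptException.
     */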
public static void waitForCondition(Consumer<?, ?> consumer,
Supplier<Boolean> testCondition,
String conditionDetails) {
try {
TestUtils.waitForCondition(
() -> {
consumer.poll(Duration.ZERO);
return testCondition.get();
},
conditionDetails
);
} catch (InterruptedException e) {
throw new InterruptException(e);
}
}
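
    /**
     * Repeatedly call Consumer.poll(Duration.ZERO) until it throws a KafkaException
     * that satisfies the given predicate.
     */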
public static void waitForException(Consumer<?, ?> consumer,
Function<KafkaException, Boolean> testCondition,
String conditionDetails) {
try {
TestUtils.waitForCondition(
() -> {
try {
consumer.poll(Duration.ZERO);
return false;
} catch (KafkaException e) {
return testCondition.apply(e);
}
},
conditionDetails
);
} catch (InterruptedException e) {
throw new InterruptException(e);
}
}
}
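
The test files below migrate their hand-rolled poll loops to these three helpers. A minimal usage sketch, assembled from the call sites later in this diff (consumer, time, client, and expectedException are whatever the surrounding test already has in scope):

// Block until poll() returns data, then assert on the batch.
ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer, time);

// Block until poll() has driven a particular request to the broker.
ConsumerPollTestUtils.waitForCondition(
    consumer,
    () -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
    "No metadata requests sent"
);

// Block until poll() surfaces a matching KafkaException.
ConsumerPollTestUtils.waitForException(
    consumer,
    e -> e.getMessage().equals(expectedException.getMessage()),
    "Consumer.poll() did not fail with expected exception " + expectedException + " within timeout"
);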

View File: KafkaConsumerTest.java

@@ -102,7 +102,6 @@ import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.MockConsumerInterceptor;
import org.apache.kafka.test.MockDeserializer;
@@ -147,7 +146,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -952,7 +950,7 @@ public class KafkaConsumerTest {
client.prepareResponse(listOffsetsResponse(Map.of(tp0, 50L)));
client.prepareResponse(fetchResponse(tp0, 50L, 5));
ConsumerRecords<String, String> records = pollForRecords();
ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer, time);
assertEquals(5, records.count());
assertEquals(55L, consumer.position(tp0));
assertEquals(1, records.nextOffsets().size());
@@ -1046,7 +1044,7 @@ public class KafkaConsumerTest {
}, fetchResponse(tp0, 50L, 5));
ConsumerRecords<String, String> records = pollForRecords();
ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer, time);
assertEquals(5, records.count());
assertEquals(Set.of(tp0), records.partitions());
assertEquals(1, records.nextOffsets().size());
@@ -1765,7 +1763,7 @@ public class KafkaConsumerTest {
client.prepareResponse(listOffsetsResponse(Map.of(tp0, 10L)));
client.prepareResponse(fetchResponse(tp0, 10L, 1));
ConsumerRecords<String, String> records = pollForRecords();
ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer, time);
assertEquals(1, records.count());
assertEquals(11L, consumer.position(tp0));
@@ -1824,7 +1822,7 @@ public class KafkaConsumerTest {
client.prepareResponse(listOffsetsResponse(Map.of(tp0, 10L)));
client.prepareResponse(fetchResponse(tp0, 10L, 1));
ConsumerRecords<String, String> records = pollForRecords();
ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer, time);
assertEquals(1, records.count());
assertEquals(11L, consumer.position(tp0));
assertEquals(1, records.nextOffsets().size());
@@ -2119,7 +2117,7 @@ public class KafkaConsumerTest {
time.sleep(heartbeatIntervalMs);
Thread.sleep(heartbeatIntervalMs);
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
final ConsumerRecords<String, String> records = pollForRecords();
final ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer, time);
assertFalse(records.isEmpty());
assertFalse(records.nextOffsets().isEmpty());
}
@@ -2667,8 +2665,11 @@ public class KafkaConsumerTest {
consumer.assign(Set.of(tp0));
// poll once to update with the current metadata
waitForConsumerPoll(() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
"No metadata requests sent");
ConsumerPollTestUtils.waitForCondition(
consumer,
() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
"No metadata requests sent"
);
client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0)));
// no error for no current position
@@ -2682,11 +2683,15 @@ public class KafkaConsumerTest {
// poll once again, which should send the list-offset request
consumer.seek(tp0, 50L);
// requests: list-offset, fetch
waitForConsumerPoll(() -> {
boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS);
boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
return hasListOffsetRequest && hasFetchRequest;
}, "No list-offset & fetch request sent");
ConsumerPollTestUtils.waitForCondition(
consumer,
() -> {
boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS);
boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
return hasListOffsetRequest && hasFetchRequest;
},
"No list-offset & fetch request sent"
);
// no error for no end offset (so unknown lag)
assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
@@ -2710,7 +2715,7 @@ public class KafkaConsumerTest {
final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
client.respondToRequest(fetchRequest, fetchResponse(Map.of(tp0, fetchInfo)));
final ConsumerRecords<String, String> records = pollForRecords();
final ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer, time);
assertEquals(5, records.count());
assertEquals(55L, consumer.position(tp0));
assertEquals(1, records.nextOffsets().size());
@@ -3800,34 +3805,6 @@ public void testPollIdleRatio(GroupProtocol groupProtocol) {
return new MetricName(NAME, "plugins", DESCRIPTION, expectedTags);
}
@SuppressWarnings("unchecked")
private ConsumerRecords<String, String> pollForRecords() {
Timer timer = time.timer(15000);
while (timer.notExpired()) {
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1000));
if (!records.isEmpty())
return records;
}
throw new org.apache.kafka.common.errors.TimeoutException("no records to return");
}
private void waitForConsumerPoll(Supplier<Boolean> testCondition, String conditionDetails) {
try {
TestUtils.waitForCondition(
() -> {
consumer.poll(Duration.ZERO);
return testCondition.get();
},
conditionDetails
);
} catch (InterruptedException e) {
throw new InterruptException(e);
}
}
private static final String NAME = "name";
private static final String DESCRIPTION = "description";
private static final LinkedHashMap<String, String> TAGS = new LinkedHashMap<>();

View File: AsyncKafkaConsumerTest.java

@@ -22,6 +22,7 @@ import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPollTestUtils;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -126,9 +127,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -509,7 +508,8 @@ public class AsyncKafkaConsumerTest {
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(Collections.singletonList(topicName), listener);
markReconcileAndAutoCommitCompleteForPollEvent();
waitForConsumerPoll(
ConsumerPollTestUtils.waitForCondition(
consumer,
callbackExecuted::get,
"Consumer.poll() did not execute callback within timeout"
);
@@ -679,7 +679,8 @@ public class AsyncKafkaConsumerTest {
consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
markReconcileAndAutoCommitCompleteForPollEvent();
waitForConsumerPoll(
ConsumerPollTestUtils.waitForCondition(
consumer,
() -> callback.invoked == 1 && callback.exception == null,
"Consumer.poll() did not execute the callback once (without error) in allottec timeout"
);
@@ -1487,13 +1488,11 @@ public class AsyncKafkaConsumerTest {
// If any error is happening inside the rebalance callbacks, we expect the first exception to be thrown from poll.
if (expectedExceptionOpt.isPresent()) {
Exception expectedException = expectedExceptionOpt.get();
waitForConsumerPollException(
e -> {
return Objects.equals(e.getClass(), expectedException.getClass()) &&
Objects.equals(e.getMessage(), expectedException.getMessage()) &&
Objects.equals(e.getCause(), expectedException.getCause());
},
ConsumerPollTestUtils.waitForException(
consumer,
e -> Objects.equals(e.getClass(), expectedException.getClass()) &&
Objects.equals(e.getMessage(), expectedException.getMessage()) &&
Objects.equals(e.getCause(), expectedException.getCause()),
"Consumer.poll() did not throw the expected exception " + expectedException
);
} else {
@@ -1560,7 +1559,8 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singletonList(new TopicPartition("topic", 0)));
markReconcileAndAutoCommitCompleteForPollEvent();
waitForConsumerPollException(
ConsumerPollTestUtils.waitForException(
consumer,
e -> e.getMessage().equals(expectedException.getMessage()),
"Consumer.poll() did not fail with expected exception " + expectedException + " within timeout"
);
@@ -1580,7 +1580,8 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singletonList(new TopicPartition("topic", 0)));
markReconcileAndAutoCommitCompleteForPollEvent();
waitForConsumerPollException(
ConsumerPollTestUtils.waitForException(
consumer,
e -> e.getMessage().equals(expectedException1.getMessage()),
"Consumer.poll() did not fail with expected exception " + expectedException1 + " within timeout"
);
@@ -1853,7 +1854,8 @@ public class AsyncKafkaConsumerTest {
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
markReconcileAndAutoCommitCompleteForPollEvent();
waitForConsumerPoll(
ConsumerPollTestUtils.waitForCondition(
consumer,
() -> backgroundEventReaper.size() == 0,
"Consumer.poll() did not reap background events within timeout"
);
@@ -2306,36 +2308,4 @@ public class AsyncKafkaConsumerTest {
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(CompositePollEvent.class));
}
private void waitForConsumerPoll(Supplier<Boolean> testCondition, String conditionDetails) {
try {
TestUtils.waitForCondition(
() -> {
consumer.poll(Duration.ZERO);
return testCondition.get();
},
conditionDetails
);
} catch (InterruptedException e) {
throw new InterruptException(e);
}
}
private void waitForConsumerPollException(Function<KafkaException, Boolean> testCondition, String conditionDetails) {
try {
TestUtils.waitForCondition(
() -> {
try {
consumer.poll(Duration.ZERO);
return false;
} catch (KafkaException e) {
return testCondition.apply(e);
}
},
conditionDetails
);
} catch (InterruptedException e) {
throw new InterruptException(e);
}
}
}

View File: SaslClientsWithInvalidCredentialsTest.scala

@@ -145,29 +145,29 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
}
private def verifyConsumerWithAuthenticationFailure(consumer: Consumer[Array[Byte], Array[Byte]]): Unit = {
val startMs = System.currentTimeMillis
TestUtils.waitUntilTrue(() => {
try {
consumer.poll(Duration.ofMillis(1000))
false
} catch {
case _: SaslAuthenticationException => true
case _: Exception => true
}
}, s"Consumer.poll() did not throw a ${classOf[SaslAuthenticationException]} exception within the timeout")
}, s"Consumer.poll() did not throw an exception within the timeout")
val elapsedMs = System.currentTimeMillis - startMs
assertTrue(elapsedMs <= 5000, s"Poll took too long, elapsed=$elapsedMs")
verifyAuthenticationException(consumer.partitionsFor(topic))
createClientCredential()
val producer = createProducer()
verifyWithRetry(sendOneRecord(producer))()
val expectedNumber = 1
TestUtils.waitUntilTrue(() => {
try {
consumer.poll(Duration.ofMillis(1000)).count == expectedNumber
consumer.poll(Duration.ofMillis(1000)).count == 1
} catch {
case _: SaslAuthenticationException => false
}
}, s"Consumer.poll() did not read the expected number of records ($expectedNumber) within the timeout")
}, s"Consumer.poll() did not read the expected number of records within the timeout")
}
@Test

View File: GssapiAuthenticationTest.scala

@@ -26,7 +26,6 @@ import javax.security.auth.login.LoginContext
import kafka.api.{IntegrationTestHarness, SaslSetup}
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.CloseOptions
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
@@ -186,7 +185,6 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
consumer.assign(java.util.List.of(tp))
val startMs = System.currentTimeMillis()
TestUtils.waitUntilTrue(() => {
try {
consumer.poll(Duration.ofMillis(50))
@@ -194,11 +192,10 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
} catch {
case _: SaslAuthenticationException => true
}
}, "Client not ready or disconnected within timeout")
}, "Consumer.poll() did not trigger a SaslAuthenticationException within timeout")
val endMs = System.currentTimeMillis()
require(endMs - startMs < failedAuthenticationDelayMs, "Failed authentication must not be delayed on the client")
consumer.close(CloseOptions.timeout(Duration.ZERO))
consumer.close()
}
/**