MINOR: clean KafkaConsumer tests (#19669)
CI / build (push) Waiting to run Details

- Moving off deprecated methods
- Fixing argument order for assertEquals(...)
- Few other minor cleanups

Reviewers: PoAn Yang <payang@apache.org>, Lianet Magrans
 <lmagrans@confluent.io>, Ken Huang <s7133700@gmail.com>
This commit is contained in:
Matthias J. Sax 2025-06-05 06:09:21 -07:00 committed by GitHub
parent 3edb406f98
commit a662bc5634
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 319 additions and 298 deletions

View File

@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.Metadata.LeaderAndEpoch;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@ -180,7 +181,7 @@ public class AsyncKafkaConsumerTest {
backgroundEventQueue.clear();
if (consumer != null) {
try {
consumer.close(Duration.ZERO);
consumer.close(CloseOptions.timeout(Duration.ZERO));
} catch (Exception e) {
// best effort to clean up after each test, but may throw (ex. if callbacks where
// throwing errors)
@ -318,7 +319,7 @@ public class AsyncKafkaConsumerTest {
// Clean-up. Close the consumer here as we know it will cause a TimeoutException to be thrown.
// If we get an error *other* than the TimeoutException, we'll fail the test.
try {
Exception e = assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
Exception e = assertThrows(KafkaException.class, () -> consumer.close(CloseOptions.timeout(Duration.ZERO)));
assertInstanceOf(TimeoutException.class, e.getCause());
} finally {
consumer = null;
@ -337,7 +338,7 @@ public class AsyncKafkaConsumerTest {
assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
forceCommitCallbackInvocation();
assertEquals(callback.invoked, 1);
assertEquals(1, callback.invoked);
assertNull(callback.exception);
}
@ -710,7 +711,7 @@ public class AsyncKafkaConsumerTest {
"group-id",
"client-id",
false));
consumer.close(Duration.ofMillis(timeoutMs));
consumer.close(CloseOptions.timeout(Duration.ofMillis(timeoutMs)));
verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
}
@ -735,7 +736,7 @@ public class AsyncKafkaConsumerTest {
false));
consumer.setGroupAssignmentSnapshot(partitions);
Throwable t = assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
Throwable t = assertThrows(KafkaException.class, () -> consumer.close(CloseOptions.timeout(Duration.ZERO)));
assertNotNull(t.getCause());
assertEquals(rootError, t.getCause());
@ -761,7 +762,7 @@ public class AsyncKafkaConsumerTest {
Duration timeout = Duration.ofMillis(timeoutMs);
try {
assertThrows(InterruptException.class, () -> consumer.close(timeout));
assertThrows(InterruptException.class, () -> consumer.close(CloseOptions.timeout(timeout)));
} finally {
Thread.interrupted();
}
@ -1080,7 +1081,7 @@ public class AsyncKafkaConsumerTest {
return null;
}).when(applicationEventHandler).add(any());
completeUnsubscribeApplicationEventSuccessfully();
consumer.close(Duration.ZERO);
consumer.close(CloseOptions.timeout(Duration.ZERO));
// A commit was triggered and not completed exceptionally by the wakeup
assertNotNull(capturedEvent.get());
@ -1103,7 +1104,7 @@ public class AsyncKafkaConsumerTest {
markOffsetsReadyForCommitEvent();
consumer.commitAsync();
Exception e = assertThrows(KafkaException.class, () -> consumer.close(Duration.ofMillis(10)));
Exception e = assertThrows(KafkaException.class, () -> consumer.close(CloseOptions.timeout(Duration.ofMillis(10))));
assertInstanceOf(TimeoutException.class, e.getCause());
}
@ -1124,7 +1125,7 @@ public class AsyncKafkaConsumerTest {
consumer.commitAsync(cb);
completeUnsubscribeApplicationEventSuccessfully();
assertDoesNotThrow(() -> consumer.close(Duration.ofMillis(10)));
assertDoesNotThrow(() -> consumer.close(CloseOptions.timeout(Duration.ofMillis(10))));
assertEquals(1, cb.invoked);
}
@ -1140,7 +1141,7 @@ public class AsyncKafkaConsumerTest {
completeCommitSyncApplicationEventSuccessfully();
completeUnsubscribeApplicationEventSuccessfully();
consumer.close(Duration.ZERO);
consumer.close(CloseOptions.timeout(Duration.ZERO));
assertEquals(1, MockConsumerInterceptor.ON_COMMIT_COUNT.get());
assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());