mirror of https://github.com/apache/kafka.git
- Moving off deprecated methods - Fixing argument order for assertEquals(...) - Few other minor cleanups Reviewers: PoAn Yang <payang@apache.org>, Lianet Magrans <lmagrans@confluent.io>, Ken Huang <s7133700@gmail.com>
This commit is contained in:
parent
3edb406f98
commit
a662bc5634
File diff suppressed because it is too large
Load Diff
|
@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
|
|||
import org.apache.kafka.clients.Metadata.LeaderAndEpoch;
|
||||
import org.apache.kafka.clients.MockClient;
|
||||
import org.apache.kafka.clients.NodeApiVersions;
|
||||
import org.apache.kafka.clients.consumer.CloseOptions;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
|
||||
|
@ -180,7 +181,7 @@ public class AsyncKafkaConsumerTest {
|
|||
backgroundEventQueue.clear();
|
||||
if (consumer != null) {
|
||||
try {
|
||||
consumer.close(Duration.ZERO);
|
||||
consumer.close(CloseOptions.timeout(Duration.ZERO));
|
||||
} catch (Exception e) {
|
||||
// best effort to clean up after each test, but may throw (ex. if callbacks where
|
||||
// throwing errors)
|
||||
|
@ -318,7 +319,7 @@ public class AsyncKafkaConsumerTest {
|
|||
// Clean-up. Close the consumer here as we know it will cause a TimeoutException to be thrown.
|
||||
// If we get an error *other* than the TimeoutException, we'll fail the test.
|
||||
try {
|
||||
Exception e = assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
|
||||
Exception e = assertThrows(KafkaException.class, () -> consumer.close(CloseOptions.timeout(Duration.ZERO)));
|
||||
assertInstanceOf(TimeoutException.class, e.getCause());
|
||||
} finally {
|
||||
consumer = null;
|
||||
|
@ -337,7 +338,7 @@ public class AsyncKafkaConsumerTest {
|
|||
assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
|
||||
forceCommitCallbackInvocation();
|
||||
|
||||
assertEquals(callback.invoked, 1);
|
||||
assertEquals(1, callback.invoked);
|
||||
assertNull(callback.exception);
|
||||
}
|
||||
|
||||
|
@ -710,7 +711,7 @@ public class AsyncKafkaConsumerTest {
|
|||
"group-id",
|
||||
"client-id",
|
||||
false));
|
||||
consumer.close(Duration.ofMillis(timeoutMs));
|
||||
consumer.close(CloseOptions.timeout(Duration.ofMillis(timeoutMs)));
|
||||
verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
|
||||
}
|
||||
|
||||
|
@ -735,7 +736,7 @@ public class AsyncKafkaConsumerTest {
|
|||
false));
|
||||
consumer.setGroupAssignmentSnapshot(partitions);
|
||||
|
||||
Throwable t = assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
|
||||
Throwable t = assertThrows(KafkaException.class, () -> consumer.close(CloseOptions.timeout(Duration.ZERO)));
|
||||
assertNotNull(t.getCause());
|
||||
assertEquals(rootError, t.getCause());
|
||||
|
||||
|
@ -761,7 +762,7 @@ public class AsyncKafkaConsumerTest {
|
|||
Duration timeout = Duration.ofMillis(timeoutMs);
|
||||
|
||||
try {
|
||||
assertThrows(InterruptException.class, () -> consumer.close(timeout));
|
||||
assertThrows(InterruptException.class, () -> consumer.close(CloseOptions.timeout(timeout)));
|
||||
} finally {
|
||||
Thread.interrupted();
|
||||
}
|
||||
|
@ -1080,7 +1081,7 @@ public class AsyncKafkaConsumerTest {
|
|||
return null;
|
||||
}).when(applicationEventHandler).add(any());
|
||||
completeUnsubscribeApplicationEventSuccessfully();
|
||||
consumer.close(Duration.ZERO);
|
||||
consumer.close(CloseOptions.timeout(Duration.ZERO));
|
||||
|
||||
// A commit was triggered and not completed exceptionally by the wakeup
|
||||
assertNotNull(capturedEvent.get());
|
||||
|
@ -1103,7 +1104,7 @@ public class AsyncKafkaConsumerTest {
|
|||
markOffsetsReadyForCommitEvent();
|
||||
consumer.commitAsync();
|
||||
|
||||
Exception e = assertThrows(KafkaException.class, () -> consumer.close(Duration.ofMillis(10)));
|
||||
Exception e = assertThrows(KafkaException.class, () -> consumer.close(CloseOptions.timeout(Duration.ofMillis(10))));
|
||||
assertInstanceOf(TimeoutException.class, e.getCause());
|
||||
}
|
||||
|
||||
|
@ -1124,7 +1125,7 @@ public class AsyncKafkaConsumerTest {
|
|||
consumer.commitAsync(cb);
|
||||
|
||||
completeUnsubscribeApplicationEventSuccessfully();
|
||||
assertDoesNotThrow(() -> consumer.close(Duration.ofMillis(10)));
|
||||
assertDoesNotThrow(() -> consumer.close(CloseOptions.timeout(Duration.ofMillis(10))));
|
||||
assertEquals(1, cb.invoked);
|
||||
}
|
||||
|
||||
|
@ -1140,7 +1141,7 @@ public class AsyncKafkaConsumerTest {
|
|||
completeCommitSyncApplicationEventSuccessfully();
|
||||
completeUnsubscribeApplicationEventSuccessfully();
|
||||
|
||||
consumer.close(Duration.ZERO);
|
||||
consumer.close(CloseOptions.timeout(Duration.ZERO));
|
||||
|
||||
assertEquals(1, MockConsumerInterceptor.ON_COMMIT_COUNT.get());
|
||||
assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());
|
||||
|
|
Loading…
Reference in New Issue