mirror of https://github.com/apache/kafka.git
- Moving off deprecated methods - Fixing argument order for assertEquals(...) - Few other minor cleanups Reviewers: PoAn Yang <payang@apache.org>, Lianet Magrans <lmagrans@confluent.io>, Ken Huang <s7133700@gmail.com>
This commit is contained in:
parent
3edb406f98
commit
a662bc5634
File diff suppressed because it is too large
Load Diff
|
@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
|
||||||
import org.apache.kafka.clients.Metadata.LeaderAndEpoch;
|
import org.apache.kafka.clients.Metadata.LeaderAndEpoch;
|
||||||
import org.apache.kafka.clients.MockClient;
|
import org.apache.kafka.clients.MockClient;
|
||||||
import org.apache.kafka.clients.NodeApiVersions;
|
import org.apache.kafka.clients.NodeApiVersions;
|
||||||
|
import org.apache.kafka.clients.consumer.CloseOptions;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
|
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
|
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
|
||||||
|
@ -180,7 +181,7 @@ public class AsyncKafkaConsumerTest {
|
||||||
backgroundEventQueue.clear();
|
backgroundEventQueue.clear();
|
||||||
if (consumer != null) {
|
if (consumer != null) {
|
||||||
try {
|
try {
|
||||||
consumer.close(Duration.ZERO);
|
consumer.close(CloseOptions.timeout(Duration.ZERO));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// best effort to clean up after each test, but may throw (ex. if callbacks where
|
// best effort to clean up after each test, but may throw (ex. if callbacks where
|
||||||
// throwing errors)
|
// throwing errors)
|
||||||
|
@ -318,7 +319,7 @@ public class AsyncKafkaConsumerTest {
|
||||||
// Clean-up. Close the consumer here as we know it will cause a TimeoutException to be thrown.
|
// Clean-up. Close the consumer here as we know it will cause a TimeoutException to be thrown.
|
||||||
// If we get an error *other* than the TimeoutException, we'll fail the test.
|
// If we get an error *other* than the TimeoutException, we'll fail the test.
|
||||||
try {
|
try {
|
||||||
Exception e = assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
|
Exception e = assertThrows(KafkaException.class, () -> consumer.close(CloseOptions.timeout(Duration.ZERO)));
|
||||||
assertInstanceOf(TimeoutException.class, e.getCause());
|
assertInstanceOf(TimeoutException.class, e.getCause());
|
||||||
} finally {
|
} finally {
|
||||||
consumer = null;
|
consumer = null;
|
||||||
|
@ -337,7 +338,7 @@ public class AsyncKafkaConsumerTest {
|
||||||
assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
|
assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
|
||||||
forceCommitCallbackInvocation();
|
forceCommitCallbackInvocation();
|
||||||
|
|
||||||
assertEquals(callback.invoked, 1);
|
assertEquals(1, callback.invoked);
|
||||||
assertNull(callback.exception);
|
assertNull(callback.exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -710,7 +711,7 @@ public class AsyncKafkaConsumerTest {
|
||||||
"group-id",
|
"group-id",
|
||||||
"client-id",
|
"client-id",
|
||||||
false));
|
false));
|
||||||
consumer.close(Duration.ofMillis(timeoutMs));
|
consumer.close(CloseOptions.timeout(Duration.ofMillis(timeoutMs)));
|
||||||
verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
|
verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -735,7 +736,7 @@ public class AsyncKafkaConsumerTest {
|
||||||
false));
|
false));
|
||||||
consumer.setGroupAssignmentSnapshot(partitions);
|
consumer.setGroupAssignmentSnapshot(partitions);
|
||||||
|
|
||||||
Throwable t = assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
|
Throwable t = assertThrows(KafkaException.class, () -> consumer.close(CloseOptions.timeout(Duration.ZERO)));
|
||||||
assertNotNull(t.getCause());
|
assertNotNull(t.getCause());
|
||||||
assertEquals(rootError, t.getCause());
|
assertEquals(rootError, t.getCause());
|
||||||
|
|
||||||
|
@ -761,7 +762,7 @@ public class AsyncKafkaConsumerTest {
|
||||||
Duration timeout = Duration.ofMillis(timeoutMs);
|
Duration timeout = Duration.ofMillis(timeoutMs);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
assertThrows(InterruptException.class, () -> consumer.close(timeout));
|
assertThrows(InterruptException.class, () -> consumer.close(CloseOptions.timeout(timeout)));
|
||||||
} finally {
|
} finally {
|
||||||
Thread.interrupted();
|
Thread.interrupted();
|
||||||
}
|
}
|
||||||
|
@ -1080,7 +1081,7 @@ public class AsyncKafkaConsumerTest {
|
||||||
return null;
|
return null;
|
||||||
}).when(applicationEventHandler).add(any());
|
}).when(applicationEventHandler).add(any());
|
||||||
completeUnsubscribeApplicationEventSuccessfully();
|
completeUnsubscribeApplicationEventSuccessfully();
|
||||||
consumer.close(Duration.ZERO);
|
consumer.close(CloseOptions.timeout(Duration.ZERO));
|
||||||
|
|
||||||
// A commit was triggered and not completed exceptionally by the wakeup
|
// A commit was triggered and not completed exceptionally by the wakeup
|
||||||
assertNotNull(capturedEvent.get());
|
assertNotNull(capturedEvent.get());
|
||||||
|
@ -1103,7 +1104,7 @@ public class AsyncKafkaConsumerTest {
|
||||||
markOffsetsReadyForCommitEvent();
|
markOffsetsReadyForCommitEvent();
|
||||||
consumer.commitAsync();
|
consumer.commitAsync();
|
||||||
|
|
||||||
Exception e = assertThrows(KafkaException.class, () -> consumer.close(Duration.ofMillis(10)));
|
Exception e = assertThrows(KafkaException.class, () -> consumer.close(CloseOptions.timeout(Duration.ofMillis(10))));
|
||||||
assertInstanceOf(TimeoutException.class, e.getCause());
|
assertInstanceOf(TimeoutException.class, e.getCause());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1124,7 +1125,7 @@ public class AsyncKafkaConsumerTest {
|
||||||
consumer.commitAsync(cb);
|
consumer.commitAsync(cb);
|
||||||
|
|
||||||
completeUnsubscribeApplicationEventSuccessfully();
|
completeUnsubscribeApplicationEventSuccessfully();
|
||||||
assertDoesNotThrow(() -> consumer.close(Duration.ofMillis(10)));
|
assertDoesNotThrow(() -> consumer.close(CloseOptions.timeout(Duration.ofMillis(10))));
|
||||||
assertEquals(1, cb.invoked);
|
assertEquals(1, cb.invoked);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1140,7 +1141,7 @@ public class AsyncKafkaConsumerTest {
|
||||||
completeCommitSyncApplicationEventSuccessfully();
|
completeCommitSyncApplicationEventSuccessfully();
|
||||||
completeUnsubscribeApplicationEventSuccessfully();
|
completeUnsubscribeApplicationEventSuccessfully();
|
||||||
|
|
||||||
consumer.close(Duration.ZERO);
|
consumer.close(CloseOptions.timeout(Duration.ZERO));
|
||||||
|
|
||||||
assertEquals(1, MockConsumerInterceptor.ON_COMMIT_COUNT.get());
|
assertEquals(1, MockConsumerInterceptor.ON_COMMIT_COUNT.get());
|
||||||
assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());
|
assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());
|
||||||
|
|
Loading…
Reference in New Issue