mirror of https://github.com/apache/kafka.git
KAFKA-15913: Migrate async consumer tests to mocks (#14930)
Use mocks to test AsyncKafkaConsumer:

- Eliminate the use of ConsumerTestBuilder.
- Mock all resources that were previously reached by leaking the background thread through Mockito spies.
- Use the default constructor of AsyncKafkaConsumer wherever possible and inject mocks via factories.
- Mock timeouts directly by throwing timeout exceptions instead of waiting for futures to time out.

The autocommit mocking code was not ported: it mostly tested the integration of the foreground and background threads (or kept the spies working, which broke during autocommit on close), and it is currently being reimplemented anyway.

The new test runs 10x faster.

Reviewers: Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
parent f385ef468b
commit 0cb7d747fb
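The pattern in a nutshell, sketched below: the new constructor parameters are small factory interfaces, so a test can hand the consumer fully mocked collaborators up front instead of spying on objects leaked from the background thread. The constructor shape follows the diff below, but the snippet itself is illustrative, not the committed test code, and `config` stands for a ConsumerConfig built by the test.

    import java.util.concurrent.LinkedBlockingQueue;

    import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.utils.MockTime;

    import static org.mockito.Mockito.mock;

    // Mocks replace the spies that previously had to be wired into the
    // running background thread.
    ApplicationEventHandler applicationEventHandler = mock(ApplicationEventHandler.class);
    ConsumerMetadata metadata = mock(ConsumerMetadata.class);
    @SuppressWarnings("unchecked")
    FetchCollector<String, String> fetchCollector = mock(FetchCollector.class);

    // Each lambda implements one of the new factory interfaces, so the
    // consumer receives the mock exactly where production code would
    // build the real object.
    AsyncKafkaConsumer<String, String> consumer = new AsyncKafkaConsumer<>(
            config,                 // a ConsumerConfig built by the test
            new StringDeserializer(),
            new StringDeserializer(),
            new MockTime(),
            (logContext, time, queue, processor, network, managers) -> applicationEventHandler,
            (logContext, meta, subs, fetchCfg, deser, metricsMgr, time) -> fetchCollector,
            (cfg, subs, logContext, listeners) -> metadata,
            new LinkedBlockingQueue<>());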
@@ -94,7 +94,6 @@ import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
@@ -287,12 +286,26 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     AsyncKafkaConsumer(final ConsumerConfig config,
                        final Deserializer<K> keyDeserializer,
                        final Deserializer<V> valueDeserializer) {
-        this(config, keyDeserializer, valueDeserializer, new LinkedBlockingQueue<>());
+        this(
+            config,
+            keyDeserializer,
+            valueDeserializer,
+            Time.SYSTEM,
+            ApplicationEventHandler::new,
+            FetchCollector::new,
+            ConsumerMetadata::new,
+            new LinkedBlockingQueue<>()
+        );
     }
 
+    // Visible for testing
     AsyncKafkaConsumer(final ConsumerConfig config,
                        final Deserializer<K> keyDeserializer,
                        final Deserializer<V> valueDeserializer,
+                       final Time time,
+                       final ApplicationEventHandlerFactory applicationEventHandlerFactory,
+                       final FetchCollectorFactory<K, V> fetchCollectorFactory,
+                       final ConsumerMetadataFactory metadataFactory,
                        final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue) {
         try {
             GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
@@ -305,7 +318,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
             log.debug("Initializing the Kafka consumer");
             this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
-            this.time = Time.SYSTEM;
+            this.time = time;
             List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
             this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
             this.clientTelemetryReporter.ifPresent(reporters::add);
@@ -319,7 +332,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(metrics.reporters(),
                     interceptorList,
                     Arrays.asList(deserializers.keyDeserializer, deserializers.valueDeserializer));
-            this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
+            this.metadata = metadataFactory.build(config, subscriptions, logContext, clusterResourceListeners);
             final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
             metadata.bootstrap(addresses);
 
@@ -360,7 +373,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                     metadata,
                     applicationEventQueue,
                     requestManagersSupplier);
-            this.applicationEventHandler = new ApplicationEventHandler(logContext,
+            this.applicationEventHandler = applicationEventHandlerFactory.build(
+                    logContext,
                     time,
                     applicationEventQueue,
                     applicationEventProcessorSupplier,
@@ -391,7 +405,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             this.groupMetadata = initializeGroupMetadata(config, groupRebalanceConfig);
 
             // The FetchCollector is only used on the application thread.
-            this.fetchCollector = new FetchCollector<>(logContext,
+            this.fetchCollector = fetchCollectorFactory.build(logContext,
                     metadata,
                     subscriptions,
                     fetchConfig,
@@ -420,49 +434,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     }
 
-    // Visible for testing
-    AsyncKafkaConsumer(LogContext logContext,
-                       String clientId,
-                       Deserializers<K, V> deserializers,
-                       FetchBuffer fetchBuffer,
-                       FetchCollector<K, V> fetchCollector,
-                       ConsumerInterceptors<K, V> interceptors,
-                       Time time,
-                       ApplicationEventHandler applicationEventHandler,
-                       BlockingQueue<BackgroundEvent> backgroundEventQueue,
-                       ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
-                       Metrics metrics,
-                       SubscriptionState subscriptions,
-                       ConsumerMetadata metadata,
-                       long retryBackoffMs,
-                       int defaultApiTimeoutMs,
-                       List<ConsumerPartitionAssignor> assignors,
-                       String groupId) {
-        this.log = logContext.logger(getClass());
-        this.subscriptions = subscriptions;
-        this.clientId = clientId;
-        this.fetchBuffer = fetchBuffer;
-        this.fetchCollector = fetchCollector;
-        this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
-        this.interceptors = Objects.requireNonNull(interceptors);
-        this.time = time;
-        this.backgroundEventProcessor = new BackgroundEventProcessor(
-            logContext,
-            backgroundEventQueue,
-            applicationEventHandler,
-            rebalanceListenerInvoker
-        );
-        this.metrics = metrics;
-        this.groupMetadata = initializeGroupMetadata(groupId, Optional.empty());
-        this.metadata = metadata;
-        this.retryBackoffMs = retryBackoffMs;
-        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
-        this.deserializers = deserializers;
-        this.applicationEventHandler = applicationEventHandler;
-        this.assignors = assignors;
-        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
-        this.clientTelemetryReporter = Optional.empty();
-    }
-
     AsyncKafkaConsumer(LogContext logContext,
                        Time time,
                        ConsumerConfig config,
@@ -563,6 +534,47 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         );
     }
 
+    // auxiliary interface for testing
+    interface ApplicationEventHandlerFactory {
+
+        ApplicationEventHandler build(
+            final LogContext logContext,
+            final Time time,
+            final BlockingQueue<ApplicationEvent> applicationEventQueue,
+            final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
+            final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
+            final Supplier<RequestManagers> requestManagersSupplier
+        );
+
+    }
+
+    // auxiliary interface for testing
+    interface FetchCollectorFactory<K, V> {
+
+        FetchCollector<K, V> build(
+            final LogContext logContext,
+            final ConsumerMetadata metadata,
+            final SubscriptionState subscriptions,
+            final FetchConfig fetchConfig,
+            final Deserializers<K, V> deserializers,
+            final FetchMetricsManager metricsManager,
+            final Time time
+        );
+
+    }
+
+    // auxiliary interface for testing
+    interface ConsumerMetadataFactory {
+
+        ConsumerMetadata build(
+            final ConsumerConfig config,
+            final SubscriptionState subscriptions,
+            final LogContext logContext,
+            final ClusterResourceListeners clusterResourceListeners
+        );
+
+    }
+
     private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final ConsumerConfig config,
                                                                     final GroupRebalanceConfig groupRebalanceConfig) {
         final Optional<ConsumerGroupMetadata> groupMetadata = initializeGroupMetadata(
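Since each factory is a single-method interface, the production path costs only a constructor reference while tests substitute a lambda. A minimal sketch under that assumption (the mock wiring is illustrative):

    // Production wiring (as in the three-argument constructor above):
    // constructor references satisfy the factory interfaces directly.
    ApplicationEventHandlerFactory realHandler = ApplicationEventHandler::new;
    ConsumerMetadataFactory realMetadata = ConsumerMetadata::new;

    // Test wiring: ignore the build arguments and return a Mockito mock.
    ConsumerMetadataFactory mockedMetadata =
            (config, subscriptions, logContext, listeners) -> mock(ConsumerMetadata.class);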
@@ -1756,8 +1768,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         }
     }
 
-    // Visible for testing
-    void maybeInvokeCommitCallbacks() {
+    private void maybeInvokeCommitCallbacks() {
         if (callbacks() > 0) {
             invoker.executeCallbacks();
         }
@@ -1768,6 +1779,11 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         return invoker.callbackQueue.size();
     }
 
+    // Visible for testing
+    SubscriptionState subscriptions() {
+        return subscriptions;
+    }
+
     /**
      * Utility class that helps the application thread to invoke user registered {@link OffsetCommitCallback}. This is
      * achieved by having the background thread register a {@link OffsetCommitCallbackTask} to the invoker upon the
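With the background thread mocked out, tests need a direct view of subscription state, which the new package-private accessor provides. A hypothetical assertion from a test in the same package:

    // Hypothetical test snippet: verify that subscribe() updated the
    // consumer's SubscriptionState without driving the background thread.
    consumer.subscribe(Collections.singletonList("topic"));
    assertEquals(Collections.singleton("topic"), consumer.subscriptions().subscription());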
File diff suppressed because it is too large
@@ -21,9 +21,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;

@@ -38,10 +36,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 
 import java.io.Closeable;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
@@ -101,10 +96,6 @@ public class ConsumerTestBuilder implements Closeable {
     final MockClient client;
     final Optional<GroupInformation> groupInfo;
 
     public ConsumerTestBuilder() {
         this(Optional.empty());
     }
 
     public ConsumerTestBuilder(Optional<GroupInformation> groupInfo) {
         this(groupInfo, true, true);
     }
@@ -315,78 +306,6 @@ public class ConsumerTestBuilder implements Closeable {
         }
     }
 
-    public static class ApplicationEventHandlerTestBuilder extends ConsumerTestBuilder {
-
-        public final ApplicationEventHandler applicationEventHandler;
-
-        public ApplicationEventHandlerTestBuilder(Optional<GroupInformation> groupInfo, boolean enableAutoCommit, boolean enableAutoTick) {
-            super(groupInfo, enableAutoCommit, enableAutoTick);
-            this.applicationEventHandler = spy(new ApplicationEventHandler(
-                logContext,
-                time,
-                applicationEventQueue,
-                () -> applicationEventProcessor,
-                () -> networkClientDelegate,
-                () -> requestManagers));
-        }
-
-        @Override
-        public void close() {
-            closeQuietly(applicationEventHandler, ApplicationEventHandler.class.getSimpleName());
-        }
-    }
-
-    public static class AsyncKafkaConsumerTestBuilder extends ApplicationEventHandlerTestBuilder {
-
-        final AsyncKafkaConsumer<String, String> consumer;
-
-        final FetchCollector<String, String> fetchCollector;
-
-        public AsyncKafkaConsumerTestBuilder(Optional<GroupInformation> groupInfo, boolean enableAutoCommit, boolean enableAutoTick) {
-            super(groupInfo, enableAutoCommit, enableAutoTick);
-            String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
-            List<ConsumerPartitionAssignor> assignors = ConsumerPartitionAssignor.getAssignorInstances(
-                config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
-                config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
-            );
-            Deserializers<String, String> deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer());
-            this.fetchCollector = spy(new FetchCollector<>(logContext,
-                metadata,
-                subscriptions,
-                fetchConfig,
-                deserializers,
-                metricsManager,
-                time));
-            this.consumer = spy(new AsyncKafkaConsumer<>(
-                logContext,
-                clientId,
-                deserializers,
-                new FetchBuffer(logContext),
-                fetchCollector,
-                new ConsumerInterceptors<>(Collections.emptyList()),
-                time,
-                applicationEventHandler,
-                backgroundEventQueue,
-                rebalanceListenerInvoker,
-                metrics,
-                subscriptions,
-                metadata,
-                retryBackoffMs,
-                60000,
-                assignors,
-                groupInfo.map(groupInformation -> groupInformation.groupState.groupId).orElse(null)));
-        }
-
-        @Override
-        public void close() {
-            consumer.close();
-        }
-
-        public void close(final Duration timeout) {
-            consumer.close(timeout);
-        }
-    }
-
     public static class GroupInformation {
 
         final GroupState groupState;
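On the timeout point from the commit message: instead of letting a blocking call run down a real timer, the mocked event handler throws a TimeoutException immediately, which is a large part of why the new test runs 10x faster. A sketch of the technique; the stubbed method and the committed() call are illustrative, not quotes from the new test:

    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.Mockito.doThrow;
    import static org.junit.jupiter.api.Assertions.assertThrows;

    import org.apache.kafka.common.errors.TimeoutException;

    // Fail fast from the mocked handler rather than waiting for a real
    // future to expire; the application thread sees the same
    // TimeoutException it would get after a genuine timeout.
    doThrow(new TimeoutException("timed out waiting for the event"))
            .when(applicationEventHandler).addAndGet(any(), any());

    // partitions: a Set<TopicPartition> defined by the test.
    assertThrows(TimeoutException.class,
            () -> consumer.committed(partitions, Duration.ofMillis(100)));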