mirror of https://github.com/apache/kafka.git
KAFKA-14905: Reduce flakiness in MM2 ForwardingAdmin test due to admin timeouts (#13575)
Reduce flakiness of `MirrorConnectorsWithCustomForwardingAdminIntegrationTest` Reviewers: Josep Prat <jlprat@apache.org>
This commit is contained in:
parent
ecdef88f74
commit
7061475445
|
@ -37,9 +37,6 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/** Customised ForwardingAdmin for testing only.
|
||||
* The class create/alter topics, partitions and ACLs in Kafka then store metadata in {@link FakeLocalMetadataStore}.
|
||||
|
@ -47,7 +44,6 @@ import java.util.concurrent.TimeoutException;
|
|||
|
||||
public class FakeForwardingAdminWithLocalMetadata extends ForwardingAdmin {
|
||||
private static final Logger log = LoggerFactory.getLogger(FakeForwardingAdminWithLocalMetadata.class);
|
||||
private final long timeout = 1000L;
|
||||
|
||||
public FakeForwardingAdminWithLocalMetadata(Map<String, Object> configs) {
|
||||
super(configs);
|
||||
|
@ -56,35 +52,29 @@ public class FakeForwardingAdminWithLocalMetadata extends ForwardingAdmin {
|
|||
@Override
|
||||
public CreateTopicsResult createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options) {
|
||||
CreateTopicsResult createTopicsResult = super.createTopics(newTopics, options);
|
||||
newTopics.forEach(newTopic -> {
|
||||
try {
|
||||
log.info("Add topic '{}' to cluster and metadata store", newTopic);
|
||||
// Wait for topic to be created before edit the fake local store
|
||||
createTopicsResult.values().get(newTopic.name()).get(timeout, TimeUnit.MILLISECONDS);
|
||||
newTopics.forEach(newTopic -> createTopicsResult.values().get(newTopic.name()).whenComplete((ignored, error) -> {
|
||||
if (error == null) {
|
||||
FakeLocalMetadataStore.addTopicToLocalMetadataStore(newTopic);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
if (e.getCause() instanceof TopicExistsException) {
|
||||
log.warn("Topic '{}' already exists. Update the local metadata store if absent", newTopic.name());
|
||||
FakeLocalMetadataStore.addTopicToLocalMetadataStore(newTopic);
|
||||
} else
|
||||
log.error(e.getMessage());
|
||||
} else if (error.getCause() instanceof TopicExistsException) {
|
||||
log.warn("Topic '{}' already exists. Update the local metadata store if absent", newTopic.name());
|
||||
FakeLocalMetadataStore.addTopicToLocalMetadataStore(newTopic);
|
||||
} else {
|
||||
log.error("Unable to intercept admin client operation", error);
|
||||
}
|
||||
});
|
||||
}));
|
||||
return createTopicsResult;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions, CreatePartitionsOptions options) {
|
||||
CreatePartitionsResult createPartitionsResult = super.createPartitions(newPartitions, options);
|
||||
newPartitions.forEach((topic, newPartition) -> {
|
||||
try {
|
||||
// Wait for topic partition to be created before edit the fake local store
|
||||
createPartitionsResult.values().get(topic).get(timeout, TimeUnit.MILLISECONDS);
|
||||
newPartitions.forEach((topic, newPartition) -> createPartitionsResult.values().get(topic).whenComplete((ignored, error) -> {
|
||||
if (error == null) {
|
||||
FakeLocalMetadataStore.updatePartitionCount(topic, newPartition.totalCount());
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
log.error(e.getMessage());
|
||||
} else {
|
||||
log.error("Unable to intercept admin client operation", error);
|
||||
}
|
||||
});
|
||||
}));
|
||||
return createPartitionsResult;
|
||||
}
|
||||
|
||||
|
@ -92,17 +82,15 @@ public class FakeForwardingAdminWithLocalMetadata extends ForwardingAdmin {
|
|||
@Override
|
||||
public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options) {
|
||||
AlterConfigsResult alterConfigsResult = super.alterConfigs(configs, options);
|
||||
configs.forEach((configResource, newConfigs) -> {
|
||||
try {
|
||||
configs.forEach((configResource, newConfigs) -> alterConfigsResult.values().get(configResource).whenComplete((ignored, error) -> {
|
||||
if (error == null) {
|
||||
if (configResource.type() == ConfigResource.Type.TOPIC) {
|
||||
// Wait for config to be altered before edit the fake local store
|
||||
alterConfigsResult.values().get(configResource).get(timeout, TimeUnit.MILLISECONDS);
|
||||
FakeLocalMetadataStore.updateTopicConfig(configResource.name(), newConfigs);
|
||||
}
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
log.error(e.getMessage());
|
||||
} else {
|
||||
log.error("Unable to intercept admin client operation", error);
|
||||
}
|
||||
});
|
||||
}));
|
||||
return alterConfigsResult;
|
||||
}
|
||||
|
||||
|
@ -110,15 +98,13 @@ public class FakeForwardingAdminWithLocalMetadata extends ForwardingAdmin {
|
|||
@Override
|
||||
public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
|
||||
CreateAclsResult aclsResult = super.createAcls(acls, options);
|
||||
try {
|
||||
// Wait for acls to be created before edit the fake local store
|
||||
aclsResult.all().get(timeout, TimeUnit.MILLISECONDS);
|
||||
acls.forEach(aclBinding -> {
|
||||
aclsResult.values().forEach((aclBinding, future) -> future.whenComplete((ignored, error) -> {
|
||||
if (error == null) {
|
||||
FakeLocalMetadataStore.addACLs(aclBinding.entry().principal(), aclBinding);
|
||||
});
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
log.error(e.getMessage());
|
||||
}
|
||||
} else {
|
||||
log.error("Unable to intercept admin client operation", error);
|
||||
}
|
||||
}));
|
||||
return aclsResult;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue