KAFKA-15517: Improve MirrorMaker logging in case of authorization errors (#15558)

Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
Dmitry Werner 2024-04-16 15:09:14 +05:00 committed by GitHub
parent f7eb96230d
commit a1ca788c99
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 236 additions and 103 deletions

View File

@ -43,6 +43,7 @@ import java.util.stream.IntStream;
import static org.apache.kafka.connect.mirror.Checkpoint.CONSUMER_GROUP_ID_KEY; import static org.apache.kafka.connect.mirror.Checkpoint.CONSUMER_GROUP_ID_KEY;
import static org.apache.kafka.connect.mirror.MirrorUtils.TOPIC_KEY; import static org.apache.kafka.connect.mirror.MirrorUtils.TOPIC_KEY;
import static org.apache.kafka.connect.mirror.MirrorUtils.adminCall;
/** Replicate consumer group state between clusters. Emits checkpoint records. /** Replicate consumer group state between clusters. Emits checkpoint records.
* *
@ -216,7 +217,10 @@ public class MirrorCheckpointConnector extends SourceConnector {
Collection<ConsumerGroupListing> listConsumerGroups() Collection<ConsumerGroupListing> listConsumerGroups()
throws InterruptedException, ExecutionException { throws InterruptedException, ExecutionException {
return sourceAdminClient.listConsumerGroups().valid().get(); return adminCall(
() -> sourceAdminClient.listConsumerGroups().valid().get(),
() -> "list consumer groups on " + config.sourceClusterAlias() + " cluster"
);
} }
private void createInternalTopics() { private void createInternalTopics() {
@ -229,7 +233,11 @@ public class MirrorCheckpointConnector extends SourceConnector {
Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String group) Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String group)
throws InterruptedException, ExecutionException { throws InterruptedException, ExecutionException {
return sourceAdminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get(); return adminCall(
() -> sourceAdminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get(),
() -> String.format("list offsets for consumer group %s on %s cluster", group,
config.sourceClusterAlias())
);
} }
boolean shouldReplicateByGroupFilter(String group) { boolean shouldReplicateByGroupFilter(String group) {

View File

@ -17,7 +17,6 @@
package org.apache.kafka.connect.mirror; package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.KafkaFuture;
@ -48,6 +47,8 @@ import java.util.concurrent.ExecutionException;
import java.time.Duration; import java.time.Duration;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.apache.kafka.connect.mirror.MirrorUtils.adminCall;
/** Emits checkpoints for upstream consumer groups. */ /** Emits checkpoints for upstream consumer groups. */
public class MirrorCheckpointTask extends SourceTask { public class MirrorCheckpointTask extends SourceTask {
@ -228,7 +229,10 @@ public class MirrorCheckpointTask extends SourceTask {
// short circuit if stopping // short circuit if stopping
return Collections.emptyMap(); return Collections.emptyMap();
} }
return sourceAdminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get(); return adminCall(
() -> sourceAdminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get(),
() -> String.format("list offsets for consumer group %s on %s cluster", group, sourceClusterAlias)
);
} }
Optional<Checkpoint> checkpoint(String group, TopicPartition topicPartition, Optional<Checkpoint> checkpoint(String group, TopicPartition topicPartition,
@ -277,9 +281,11 @@ public class MirrorCheckpointTask extends SourceTask {
System.currentTimeMillis() - record.timestamp()); System.currentTimeMillis() - record.timestamp());
} }
private void refreshIdleConsumerGroupOffset() { private void refreshIdleConsumerGroupOffset() throws ExecutionException, InterruptedException {
Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = adminCall(
.describeConsumerGroups(consumerGroups).describedGroups(); () -> targetAdminClient.describeConsumerGroups(consumerGroups).describedGroups(),
() -> String.format("describe consumer groups %s on %s cluster", consumerGroups, targetClusterAlias)
);
for (String group : consumerGroups) { for (String group : consumerGroups) {
try { try {
@ -289,8 +295,13 @@ public class MirrorCheckpointTask extends SourceTask {
// (1) idle: because the consumer at target is not actively consuming the mirrored topic // (1) idle: because the consumer at target is not actively consuming the mirrored topic
// (2) dead: the new consumer that is recently created at source and never existed at target // (2) dead: the new consumer that is recently created at source and never existed at target
if (consumerGroupState == ConsumerGroupState.EMPTY) { if (consumerGroupState == ConsumerGroupState.EMPTY) {
idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group) idleConsumerGroupsOffset.put(
.partitionsToOffsetAndMetadata().get()); group,
adminCall(
() -> targetAdminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get(),
() -> String.format("list offsets for consumer group %s on %s cluster", group, targetClusterAlias)
)
);
} }
// new consumer upstream has state "DEAD" and will be identified during the offset sync-up // new consumer upstream has state "DEAD" and will be identified during the offset sync-up
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
@ -299,7 +310,7 @@ public class MirrorCheckpointTask extends SourceTask {
} }
} }
Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() { Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() throws ExecutionException, InterruptedException {
Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>(); Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
// first, sync offsets for the idle consumers at target // first, sync offsets for the idle consumers at target
@ -360,20 +371,24 @@ public class MirrorCheckpointTask extends SourceTask {
return offsetToSyncAll; return offsetToSyncAll;
} }
void syncGroupOffset(String consumerGroupId, Map<TopicPartition, OffsetAndMetadata> offsetToSync) { void syncGroupOffset(String consumerGroupId, Map<TopicPartition, OffsetAndMetadata> offsetToSync) throws ExecutionException, InterruptedException {
if (targetAdminClient != null) { if (targetAdminClient != null) {
AlterConsumerGroupOffsetsResult result = targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync); adminCall(
result.all().whenComplete((v, throwable) -> { () -> targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync).all()
if (throwable != null) { .whenComplete((v, throwable) -> {
if (throwable.getCause() instanceof UnknownMemberIdException) { if (throwable != null) {
log.warn("Unable to sync offsets for consumer group {}. This is likely caused by consumers currently using this group in the target cluster.", consumerGroupId); if (throwable.getCause() instanceof UnknownMemberIdException) {
} else { log.warn("Unable to sync offsets for consumer group {}. This is likely caused " +
log.error("Unable to sync offsets for consumer group {}.", consumerGroupId, throwable); "by consumers currently using this group in the target cluster.", consumerGroupId);
} } else {
} else { log.error("Unable to sync offsets for consumer group {}.", consumerGroupId, throwable);
log.trace("Sync-ed {} offsets for consumer group {}.", offsetToSync.size(), consumerGroupId); }
} } else {
}); log.trace("Sync-ed {} offsets for consumer group {}.", offsetToSync.size(), consumerGroupId);
}
}),
() -> String.format("alter offsets for consumer group %s on %s cluster", consumerGroupId, targetClusterAlias)
);
} }
} }

View File

@ -76,6 +76,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.kafka.connect.mirror.MirrorSourceConfig.SYNC_TOPIC_ACLS_ENABLED; import static org.apache.kafka.connect.mirror.MirrorSourceConfig.SYNC_TOPIC_ACLS_ENABLED;
import static org.apache.kafka.connect.mirror.MirrorUtils.SOURCE_CLUSTER_KEY; import static org.apache.kafka.connect.mirror.MirrorUtils.SOURCE_CLUSTER_KEY;
import static org.apache.kafka.connect.mirror.MirrorUtils.TOPIC_KEY; import static org.apache.kafka.connect.mirror.MirrorUtils.TOPIC_KEY;
import static org.apache.kafka.connect.mirror.MirrorUtils.adminCall;
/** Replicate data, configuration, and ACLs between clusters. /** Replicate data, configuration, and ACLs between clusters.
* *
@ -139,9 +140,10 @@ public class MirrorSourceConnector extends SourceConnector {
} }
// visible for testing // visible for testing
MirrorSourceConnector(Admin sourceAdminClient, Admin targetAdminClient) { MirrorSourceConnector(Admin sourceAdminClient, Admin targetAdminClient, MirrorSourceConfig config) {
this.sourceAdminClient = sourceAdminClient; this.sourceAdminClient = sourceAdminClient;
this.targetAdminClient = targetAdminClient; this.targetAdminClient = targetAdminClient;
this.config = config;
} }
@Override @Override
@ -495,59 +497,83 @@ public class MirrorSourceConnector extends SourceConnector {
} }
// visible for testing // visible for testing
void createNewTopics(Map<String, NewTopic> newTopics) { void createNewTopics(Map<String, NewTopic> newTopics) throws ExecutionException, InterruptedException {
targetAdminClient.createTopics(newTopics.values(), new CreateTopicsOptions()).values().forEach((k, v) -> v.whenComplete((x, e) -> { adminCall(
if (e != null) { () -> {
log.warn("Could not create topic {}.", k, e); targetAdminClient.createTopics(newTopics.values(), new CreateTopicsOptions()).values()
} else { .forEach((k, v) -> v.whenComplete((x, e) -> {
log.info("Created remote topic {} with {} partitions.", k, newTopics.get(k).numPartitions()); if (e != null) {
} log.warn("Could not create topic {}.", k, e);
})); } else {
log.info("Created remote topic {} with {} partitions.", k, newTopics.get(k).numPartitions());
}
}));
return null;
},
() -> String.format("create topics %s on %s cluster", newTopics, config.targetClusterAlias())
);
} }
void createNewPartitions(Map<String, NewPartitions> newPartitions) { void createNewPartitions(Map<String, NewPartitions> newPartitions) throws ExecutionException, InterruptedException {
targetAdminClient.createPartitions(newPartitions).values().forEach((k, v) -> v.whenComplete((x, e) -> { adminCall(
if (e instanceof InvalidPartitionsException) { () -> {
// swallow, this is normal targetAdminClient.createPartitions(newPartitions).values().forEach((k, v) -> v.whenComplete((x, e) -> {
} else if (e != null) { if (e instanceof InvalidPartitionsException) {
log.warn("Could not create topic-partitions for {}.", k, e); // swallow, this is normal
} else { } else if (e != null) {
log.info("Increased size of {} to {} partitions.", k, newPartitions.get(k).totalCount()); log.warn("Could not create topic-partitions for {}.", k, e);
} } else {
})); log.info("Increased size of {} to {} partitions.", k, newPartitions.get(k).totalCount());
}
}));
return null;
},
() -> String.format("create partitions %s on %s cluster", newPartitions, config.targetClusterAlias())
);
} }
private Set<String> listTopics(Admin adminClient) private Set<String> listTopics(Admin adminClient)
throws InterruptedException, ExecutionException { throws InterruptedException, ExecutionException {
return adminClient.listTopics().names().get(); return adminCall(
() -> adminClient.listTopics().names().get(),
() -> "list topics on " + actualClusterAlias(adminClient) + " cluster"
);
} }
private Optional<Collection<AclBinding>> listTopicAclBindings() private Optional<Collection<AclBinding>> listTopicAclBindings()
throws InterruptedException, ExecutionException { throws InterruptedException, ExecutionException {
Collection<AclBinding> bindings; return adminCall(
try { () -> {
bindings = sourceAdminClient.describeAcls(ANY_TOPIC_ACL).values().get(); Collection<AclBinding> bindings;
} catch (ExecutionException e) { try {
if (e.getCause() instanceof SecurityDisabledException) { bindings = sourceAdminClient.describeAcls(ANY_TOPIC_ACL).values().get();
if (noAclAuthorizer.compareAndSet(false, true)) { } catch (ExecutionException e) {
log.info( if (e.getCause() instanceof SecurityDisabledException) {
"No ACL authorizer is configured on the source Kafka cluster, so no topic ACL syncing will take place. " if (noAclAuthorizer.compareAndSet(false, true)) {
+ "Consider disabling topic ACL syncing by setting " + SYNC_TOPIC_ACLS_ENABLED + " to 'false'." log.info(
); "No ACL authorizer is configured on the source Kafka cluster, so no topic ACL syncing will take place. "
} else { + "Consider disabling topic ACL syncing by setting " + SYNC_TOPIC_ACLS_ENABLED + " to 'false'."
log.debug("Source-side ACL authorizer still not found; skipping topic ACL sync"); );
} } else {
return Optional.empty(); log.debug("Source-side ACL authorizer still not found; skipping topic ACL sync");
} else { }
throw e; return Optional.empty();
} } else {
} throw e;
return Optional.of(bindings); }
}
return Optional.of(bindings);
},
() -> "describe ACLs on " + config.sourceClusterAlias() + " cluster"
);
} }
private static Collection<TopicDescription> describeTopics(Admin adminClient, Collection<String> topics) private Collection<TopicDescription> describeTopics(Admin adminClient, Collection<String> topics)
throws InterruptedException, ExecutionException { throws InterruptedException, ExecutionException {
return adminClient.describeTopics(topics).allTopicNames().get().values(); return adminCall(
() -> adminClient.describeTopics(topics).allTopicNames().get().values(),
() -> String.format("describe topics %s on %s cluster", topics, actualClusterAlias(adminClient))
);
} }
static Map<String, String> configToMap(Config config) { static Map<String, String> configToMap(Config config) {
@ -558,20 +584,26 @@ public class MirrorSourceConnector extends SourceConnector {
// visible for testing // visible for testing
// use deprecated alterConfigs API for broker compatibility back to 0.11.0 // use deprecated alterConfigs API for broker compatibility back to 0.11.0
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
void deprecatedAlterConfigs(Map<String, Config> topicConfigs) { void deprecatedAlterConfigs(Map<String, Config> topicConfigs) throws ExecutionException, InterruptedException {
Map<ConfigResource, Config> configs = topicConfigs.entrySet().stream() Map<ConfigResource, Config> configs = topicConfigs.entrySet().stream()
.collect(Collectors.toMap(x -> .collect(Collectors.toMap(x ->
new ConfigResource(ConfigResource.Type.TOPIC, x.getKey()), Entry::getValue)); new ConfigResource(ConfigResource.Type.TOPIC, x.getKey()), Entry::getValue));
log.trace("Syncing configs for {} topics.", configs.size()); log.trace("Syncing configs for {} topics.", configs.size());
targetAdminClient.alterConfigs(configs).values().forEach((k, v) -> v.whenComplete((x, e) -> { adminCall(
if (e != null) { () -> {
log.warn("Could not alter configuration of topic {}.", k.name(), e); targetAdminClient.alterConfigs(configs).values().forEach((k, v) -> v.whenComplete((x, e) -> {
} if (e != null) {
})); log.warn("Could not alter configuration of topic {}.", k.name(), e);
}
}));
return null;
},
() -> String.format("alter topic configs %s on %s cluster", topicConfigs, config.targetClusterAlias())
);
} }
// visible for testing // visible for testing
void incrementalAlterConfigs(Map<String, Config> topicConfigs) { void incrementalAlterConfigs(Map<String, Config> topicConfigs) throws ExecutionException, InterruptedException {
Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>(); Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>();
for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) { for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) {
Collection<AlterConfigOp> ops = new ArrayList<>(); Collection<AlterConfigOp> ops = new ArrayList<>();
@ -587,36 +619,48 @@ public class MirrorSourceConnector extends SourceConnector {
} }
log.trace("Syncing configs for {} topics.", configOps.size()); log.trace("Syncing configs for {} topics.", configOps.size());
AtomicReference<Boolean> encounteredError = new AtomicReference<>(false); AtomicReference<Boolean> encounteredError = new AtomicReference<>(false);
targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> { adminCall(
if (e != null) { () -> {
if (config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS) targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
&& e instanceof UnsupportedVersionException && !encounteredError.get()) { if (e != null) {
//Fallback logic if (config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS)
log.warn("The target cluster {} is not compatible with IncrementalAlterConfigs API. " && e instanceof UnsupportedVersionException && !encounteredError.get()) {
+ "Therefore using deprecated AlterConfigs API for syncing configs for topic {}", //Fallback logic
sourceAndTarget.target(), k.name(), e); log.warn("The target cluster {} is not compatible with IncrementalAlterConfigs API. "
encounteredError.set(true); + "Therefore using deprecated AlterConfigs API for syncing configs for topic {}",
useIncrementalAlterConfigs = false; sourceAndTarget.target(), k.name(), e);
} else if (config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS) encounteredError.set(true);
&& e instanceof UnsupportedVersionException && !encounteredError.get()) { useIncrementalAlterConfigs = false;
log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e); } else if (config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS)
encounteredError.set(true); && e instanceof UnsupportedVersionException && !encounteredError.get()) {
context.raiseError(new ConnectException("use.incremental.alter.configs was set to \"required\", but the target cluster '" log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e);
+ sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", e)); encounteredError.set(true);
} else { context.raiseError(new ConnectException("use.incremental.alter.configs was set to \"required\", but the target cluster '"
log.warn("Could not alter configuration of topic {}.", k.name(), e); + sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", e));
} } else {
} log.warn("Could not alter configuration of topic {}.", k.name(), e);
})); }
}
}));
return null;
},
() -> String.format("incremental alter topic configs %s on %s cluster", topicConfigs, config.targetClusterAlias())
);
} }
private void updateTopicAcls(List<AclBinding> bindings) { private void updateTopicAcls(List<AclBinding> bindings) throws ExecutionException, InterruptedException {
log.trace("Syncing {} topic ACL bindings.", bindings.size()); log.trace("Syncing {} topic ACL bindings.", bindings.size());
targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> { adminCall(
if (e != null) { () -> {
log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e); targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
} if (e != null) {
})); log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
}
}));
return null;
},
() -> String.format("create ACLs %s on %s cluster", bindings, config.targetClusterAlias())
);
} }
private static Stream<TopicPartition> expandTopicDescription(TopicDescription description) { private static Stream<TopicPartition> expandTopicDescription(TopicDescription description) {
@ -630,8 +674,11 @@ public class MirrorSourceConnector extends SourceConnector {
Set<ConfigResource> resources = topics.stream() Set<ConfigResource> resources = topics.stream()
.map(x -> new ConfigResource(ConfigResource.Type.TOPIC, x)) .map(x -> new ConfigResource(ConfigResource.Type.TOPIC, x))
.collect(Collectors.toSet()); .collect(Collectors.toSet());
return sourceAdminClient.describeConfigs(resources).all().get().entrySet().stream() return adminCall(
.collect(Collectors.toMap(x -> x.getKey().name(), Entry::getValue)); () -> sourceAdminClient.describeConfigs(resources).all().get().entrySet().stream()
.collect(Collectors.toMap(x -> x.getKey().name(), Entry::getValue)),
() -> String.format("describe configs for topics %s on %s cluster", topics, config.sourceClusterAlias())
);
} }
Config targetConfig(Config sourceConfig, boolean incremental) { Config targetConfig(Config sourceConfig, boolean incremental) {
@ -701,4 +748,8 @@ public class MirrorSourceConnector extends SourceConnector {
String formatRemoteTopic(String topic) { String formatRemoteTopic(String topic) {
return replicationPolicy.formatRemoteTopic(sourceAndTarget.source(), topic); return replicationPolicy.formatRemoteTopic(sourceAndTarget.source(), topic);
} }
private String actualClusterAlias(Admin adminClient) {
return adminClient.equals(sourceAdminClient) ? config.sourceClusterAlias() : config.targetClusterAlias();
}
} }

View File

@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TopicAuthorizationException;
@ -42,7 +43,9 @@ import java.util.List;
import java.util.HashMap; import java.util.HashMap;
import java.util.Collections; import java.util.Collections;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static java.util.Collections.singleton; import static java.util.Collections.singleton;
@ -320,4 +323,21 @@ public final class MirrorUtils {
static void createSinglePartitionCompactedTopic(String topicName, short replicationFactor, Admin admin) { static void createSinglePartitionCompactedTopic(String topicName, short replicationFactor, Admin admin) {
createCompactedTopic(topicName, (short) 1, replicationFactor, admin); createCompactedTopic(topicName, (short) 1, replicationFactor, admin);
} }
static <T> T adminCall(Callable<T> callable, Supplier<String> errMsg)
throws ExecutionException, InterruptedException {
try {
return callable.call();
} catch (ExecutionException | InterruptedException e) {
Throwable cause = e.getCause();
if (cause instanceof TopicAuthorizationException ||
cause instanceof ClusterAuthorizationException ||
cause instanceof GroupAuthorizationException) {
log.error("{} occurred while trying to {}", cause.getClass().getSimpleName(), errMsg.get());
}
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
} }

View File

@ -21,6 +21,7 @@ import java.util.Map;
import java.util.Collections; import java.util.Collections;
import java.util.Optional; import java.util.Optional;
import java.util.OptionalLong; import java.util.OptionalLong;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@ -116,7 +117,7 @@ public class MirrorCheckpointTaskTest {
} }
@Test @Test
public void testSyncOffset() { public void testSyncOffset() throws ExecutionException, InterruptedException {
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>(); Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>(); Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
@ -170,7 +171,7 @@ public class MirrorCheckpointTaskTest {
} }
@Test @Test
public void testSyncOffsetForTargetGroupWithNullOffsetAndMetadata() { public void testSyncOffsetForTargetGroupWithNullOffsetAndMetadata() throws ExecutionException, InterruptedException {
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>(); Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>(); Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();

View File

@ -19,6 +19,7 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeAclsResult; import org.apache.kafka.clients.admin.DescribeAclsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AccessControlEntry;
@ -27,6 +28,7 @@ import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.PatternType;
@ -78,9 +80,11 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -183,7 +187,8 @@ public class MirrorSourceConnectorTest {
public void testNoBrokerAclAuthorizer() throws Exception { public void testNoBrokerAclAuthorizer() throws Exception {
Admin sourceAdmin = mock(Admin.class); Admin sourceAdmin = mock(Admin.class);
Admin targetAdmin = mock(Admin.class); Admin targetAdmin = mock(Admin.class);
MirrorSourceConnector connector = new MirrorSourceConnector(sourceAdmin, targetAdmin); MirrorSourceConnector connector = new MirrorSourceConnector(sourceAdmin, targetAdmin,
new MirrorSourceConfig(makeProps()));
ExecutionException describeAclsFailure = new ExecutionException( ExecutionException describeAclsFailure = new ExecutionException(
"Failed to describe ACLs", "Failed to describe ACLs",
@ -224,6 +229,39 @@ public class MirrorSourceConnectorTest {
verifyNoInteractions(targetAdmin); verifyNoInteractions(targetAdmin);
} }
@Test
public void testMissingDescribeConfigsAcl() throws Exception {
Admin sourceAdmin = mock(Admin.class);
MirrorSourceConnector connector = new MirrorSourceConnector(
sourceAdmin,
mock(Admin.class),
new MirrorSourceConfig(makeProps())
);
ExecutionException describeConfigsFailure = new ExecutionException(
"Failed to describe topic configs",
new TopicAuthorizationException("Topic authorization failed")
);
@SuppressWarnings("unchecked")
KafkaFuture<Map<ConfigResource, Config>> describeConfigsFuture = mock(KafkaFuture.class);
when(describeConfigsFuture.get()).thenThrow(describeConfigsFailure);
DescribeConfigsResult describeConfigsResult = mock(DescribeConfigsResult.class);
when(describeConfigsResult.all()).thenReturn(describeConfigsFuture);
when(sourceAdmin.describeConfigs(any())).thenReturn(describeConfigsResult);
try (LogCaptureAppender connectorLogs = LogCaptureAppender.createAndRegister(MirrorUtils.class)) {
connectorLogs.setClassLoggerToTrace(MirrorUtils.class);
Set<String> topics = new HashSet<>();
topics.add("topic1");
topics.add("topic2");
ExecutionException exception = assertThrows(ExecutionException.class, () -> connector.describeTopicConfigs(topics));
assertEquals(
exception.getCause().getClass().getSimpleName() + " occurred while trying to describe configs for topics [topic1, topic2] on source1 cluster",
connectorLogs.getMessages().get(0)
);
}
}
@Test @Test
public void testConfigPropertyFiltering() { public void testConfigPropertyFiltering() {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),