MINOR: Add javadoc for Connect public packages/classes (#16404)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Mickael Maison 2024-06-21 10:23:35 +02:00 committed by GitHub
parent 8d92535382
commit 0772144e51
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 224 additions and 121 deletions

View File

@ -29,7 +29,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/** Checkpoint records emitted from MirrorCheckpointConnector. Encodes remote consumer group state. */
/**
* Checkpoint records emitted by MirrorCheckpointConnector.
*/
public class Checkpoint {
public static final String TOPIC_KEY = "topic";
public static final String PARTITION_KEY = "partition";

View File

@ -24,7 +24,12 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.regex.Pattern;
/** Defines remote topics like "us-west.topic1". The separator is customizable and defaults to a period. */
/**
* Default implementation of {@link ReplicationPolicy} which prepends the source cluster alias to
* remote topic names.
* For example, if the source cluster alias is "us-west", topics created in the target cluster will be named
* us-west.&lt;TOPIC&gt;. The separator is customizable by setting {@link #SEPARATOR_CONFIG} and defaults to a period.
*/
public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable {
private static final Logger log = LoggerFactory.getLogger(DefaultReplicationPolicy.class);

View File

@ -26,7 +26,9 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
/** Heartbeat message sent from MirrorHeartbeatTask to target cluster. Heartbeats are always replicated. */
/**
* Heartbeat records emitted by MirrorHeartbeatConnector.
*/
public class Heartbeat {
public static final String SOURCE_CLUSTER_ALIAS_KEY = "sourceClusterAlias";
public static final String TARGET_CLUSTER_ALIAS_KEY = "targetClusterAlias";

View File

@ -21,12 +21,12 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
/** IdentityReplicationPolicy does not rename remote topics. This is useful for migrating
* from legacy MM1, or for any use-case involving one-way replication.
/**
* Alternative implementation of {@link ReplicationPolicy} that does not rename remote topics.
* This is useful for migrating from legacy MirrorMaker, or for any use-case involving one-way replication.
* <p>
* N.B. MirrorMaker is not able to prevent cycles when using this class, so take care that
* your replication topology is acyclic. If migrating from MirrorMaker v1, this will likely
* already be the case.
* N.B. MirrorMaker is not able to prevent cycles when using this replication policy, so take care that
* your replication topology is acyclic. If migrating from legacy MirrorMaker, this will likely already be the case.
*/
public class IdentityReplicationPolicy extends DefaultReplicationPolicy {
private static final Logger log = LoggerFactory.getLogger(IdentityReplicationPolicy.class);
@ -44,10 +44,11 @@ public class IdentityReplicationPolicy extends DefaultReplicationPolicy {
}
}
/** Unlike DefaultReplicationPolicy, IdentityReplicationPolicy does not include the source
/**
* Unlike {@link DefaultReplicationPolicy}, IdentityReplicationPolicy does not include the source
* cluster alias in the remote topic name. Instead, topic names are unchanged.
* <p>
* In the special case of heartbeats, we defer to DefaultReplicationPolicy.
* In the special case of heartbeats, we defer to {@link DefaultReplicationPolicy#formatRemoteTopic(String, String)}.
*/
@Override
public String formatRemoteTopic(String sourceClusterAlias, String topic) {
@ -58,11 +59,12 @@ public class IdentityReplicationPolicy extends DefaultReplicationPolicy {
}
}
/** Unlike DefaultReplicationPolicy, IdentityReplicationPolicy cannot know the source of
* a remote topic based on its name alone. If `source.cluster.alias` is provided,
* `topicSource` will return that.
/**
* Unlike {@link DefaultReplicationPolicy}, IdentityReplicationPolicy cannot know the source of
* a remote topic based on its name alone. If <code>source.cluster.alias</code> is provided,
* this method will return that.
* <p>
* In the special case of heartbeats, we defer to DefaultReplicationPolicy.
* In the special case of heartbeats, we defer to {@link DefaultReplicationPolicy#topicSource(String)}.
*/
@Override
public String topicSource(String topic) {
@ -73,9 +75,10 @@ public class IdentityReplicationPolicy extends DefaultReplicationPolicy {
}
}
/** Since any topic may be a "remote topic", this just returns `topic`.
/**
* Since any topic may be a remote topic, this just returns {@code topic}.
* <p>
* In the special case of heartbeats, we defer to DefaultReplicationPolicy.
* In the special case of heartbeats, we defer to {@link DefaultReplicationPolicy#upstreamTopic(String)}.
*/
@Override
public String upstreamTopic(String topic) {

View File

@ -42,16 +42,9 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/** Interprets MM2's internal topics (checkpoints, heartbeats) on a given cluster.
* <p>
* Given a top-level "mm2.properties" configuration file, MirrorClients can be constructed
* for individual clusters as follows:
* </p>
* <pre>
* MirrorMakerConfig mmConfig = new MirrorMakerConfig(props);
* MirrorClientConfig mmClientConfig = mmConfig.clientConfig("some-cluster");
* MirrorClient mmClient = new Mirrorclient(mmClientConfig);
* </pre>
/**
* Client to interact with MirrorMaker internal topics (checkpoints, heartbeats) on a given cluster.
* Whenever possible use the methods from {@link RemoteClusterUtils} instead of directly using MirrorClient.
*/
public class MirrorClient implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(MirrorClient.class);
@ -78,20 +71,25 @@ public class MirrorClient implements AutoCloseable {
this.consumerConfig = consumerConfig;
}
/** Close internal clients. */
/**
* Closes the internal clients held by this MirrorClient.
* <p>
* This closes the internal admin client.
*/
public void close() {
adminClient.close();
}
/** Get the ReplicationPolicy instance used to interpret remote topics. This instance is constructed based on
* relevant configuration properties, including {@code replication.policy.class}. */
/**
* Gets the {@link ReplicationPolicy} instance used to interpret remote topics. This instance is constructed based on
* relevant configuration properties, including {@code replication.policy.class}.
*
* @return the replication policy held by this client
*/
public ReplicationPolicy replicationPolicy() {
return replicationPolicy;
}
/** Compute shortest number of hops from an upstream source cluster.
/**
* Computes the shortest number of hops from an upstream source cluster.
* For example, given replication flow A-&gt;B-&gt;C, there are two hops from A to C.
* Returns -1 if upstream cluster is unreachable.
* Returns -1 if the upstream cluster is unreachable.
*/
public int replicationHops(String upstreamClusterAlias) throws InterruptedException {
return heartbeatTopics().stream()
@ -102,21 +100,27 @@ public class MirrorClient implements AutoCloseable {
.orElse(-1);
}
/** Find all heartbeat topics on this cluster. Heartbeat topics are replicated from other clusters. */
/**
* Finds all heartbeats topics on this cluster. Heartbeats topics are replicated from other clusters.
* <p>
* The result is the subset of the topics listed on this cluster for which {@code isHeartbeatTopic} returns true.
*
* @return the names of the heartbeats topics found on this cluster
* @throws InterruptedException presumably if the thread is interrupted while listing topics
*/
public Set<String> heartbeatTopics() throws InterruptedException {
return listTopics().stream()
.filter(this::isHeartbeatTopic)
.collect(Collectors.toSet());
}
/** Find all checkpoint topics on this cluster. */
/**
* Finds all checkpoints topics on this cluster.
* <p>
* The result is the subset of the topics listed on this cluster for which {@code isCheckpointTopic} returns true.
*
* @return the names of the checkpoints topics found on this cluster
* @throws InterruptedException presumably if the thread is interrupted while listing topics
*/
public Set<String> checkpointTopics() throws InterruptedException {
return listTopics().stream()
.filter(this::isCheckpointTopic)
.collect(Collectors.toSet());
}
/** Find upstream clusters, which may be multiple hops away, based on incoming heartbeats. */
/**
* Finds upstream clusters, which may be multiple hops away, based on incoming heartbeats.
*/
public Set<String> upstreamClusters() throws InterruptedException {
return listTopics().stream()
.filter(this::isHeartbeatTopic)
@ -124,14 +128,18 @@ public class MirrorClient implements AutoCloseable {
.collect(Collectors.toSet());
}
/** Find all remote topics on this cluster. This does not include internal topics (heartbeats, checkpoints). */
/**
* Finds all remote topics on this cluster. This does not include internal topics (heartbeats, checkpoints).
* <p>
* The result is the subset of the topics listed on this cluster for which {@code isRemoteTopic} returns true.
*
* @return the names of the remote topics found on this cluster
* @throws InterruptedException presumably if the thread is interrupted while listing topics
*/
public Set<String> remoteTopics() throws InterruptedException {
return listTopics().stream()
.filter(this::isRemoteTopic)
.collect(Collectors.toSet());
}
/** Find all remote topics that have been replicated directly from the given source cluster. */
/**
* Finds all remote topics that have been replicated directly from the given source cluster.
*/
public Set<String> remoteTopics(String source) throws InterruptedException {
return listTopics().stream()
.filter(this::isRemoteTopic)
@ -139,11 +147,12 @@ public class MirrorClient implements AutoCloseable {
.collect(Collectors.toSet());
}
/** Translate a remote consumer group's offsets into corresponding local offsets. Topics are automatically
/**
* Translates a remote consumer group's offsets into corresponding local offsets. Topics are automatically
* renamed according to the ReplicationPolicy.
* @param consumerGroupId group ID of remote consumer group
* @param remoteClusterAlias alias of remote cluster
* @param timeout timeout
* @param consumerGroupId The group ID of the remote consumer group
* @param remoteClusterAlias The alias of the remote cluster
* @param timeout The maximum time to block when consuming from the checkpoints topic
*/
public Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets(String consumerGroupId,
String remoteClusterAlias, Duration timeout) {

View File

@ -17,7 +17,9 @@
package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ForwardingAdmin;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
@ -31,24 +33,19 @@ import java.util.Map;
import static org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
/** Configuration required for MirrorClient to talk to a given target cluster.
/**
* Configuration required for {@link MirrorClient} to talk to a given target cluster.
* <p>
* Generally, these properties come from an mm2.properties configuration file
* (@see MirrorMakerConfig.clientConfig):
* </p>
* <pre>
* MirrorMakerConfig mmConfig = new MirrorMakerConfig(props);
* MirrorClientConfig mmClientConfig = mmConfig.clientConfig("some-cluster");
* </pre>
* <p>
* In addition to the properties defined here, sub-configs are supported for Admin, Consumer, and Producer clients.
* For example:
* </p>
* This needs to contain at least the connection details for the target cluster (<code>bootstrap.servers</code> and
* any required TLS/SASL configuration), as well as {@link #REPLICATION_POLICY_CLASS} when not using the default
* replication policy. It can also include {@link AdminClientConfig} and {@link ConsumerConfig} to customize the
* internal clients this uses. For example:
* <pre>
* bootstrap.servers = host1:9092
* consumer.client.id = mm2-client
* replication.policy.separator = __
* </pre>
*/
public class MirrorClientConfig extends AbstractConfig {
public static final String REPLICATION_POLICY_CLASS = "replication.policy.class";
@ -110,8 +107,7 @@ public class MirrorClientConfig extends AbstractConfig {
}
private Map<String, Object> clientConfig(String prefix) {
Map<String, Object> props = new HashMap<>();
props.putAll(valuesWithPrefixOverride(prefix));
Map<String, Object> props = new HashMap<>(valuesWithPrefixOverride(prefix));
props.keySet().retainAll(CLIENT_CONFIG_DEF.names());
props.entrySet().removeIf(x -> x.getValue() == null);
return props;

View File

@ -17,6 +17,8 @@
package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@ -26,9 +28,10 @@ import java.util.Set;
import java.util.concurrent.TimeoutException;
/** Convenience methods for multi-cluster environments. Wraps {@link MirrorClient}
/**
* Convenience tool for multi-cluster environments. Wraps {@link MirrorClient}.
* <p>
* Properties passed to these methods are used to construct internal Admin and Consumer clients.
* Properties passed to these methods are used to construct internal {@link Admin} and {@link Consumer} clients.
* Sub-configs like "admin.xyz" are also supported. For example:
* </p>
* <pre>
@ -44,8 +47,10 @@ public final class RemoteClusterUtils {
// utility class
private RemoteClusterUtils() {}
/** Find shortest number of hops from an upstream cluster.
* Returns -1 if the cluster is unreachable */
/**
* Finds the shortest number of hops from an upstream cluster.
* Returns -1 if the cluster is unreachable.
*/
public static int replicationHops(Map<String, Object> properties, String upstreamClusterAlias)
throws InterruptedException, TimeoutException {
try (MirrorClient client = new MirrorClient(properties)) {
@ -53,7 +58,9 @@ public final class RemoteClusterUtils {
}
}
/** Find all heartbeat topics */
/**
* Finds all heartbeats topics.
*/
public static Set<String> heartbeatTopics(Map<String, Object> properties)
throws InterruptedException, TimeoutException {
try (MirrorClient client = new MirrorClient(properties)) {
@ -61,7 +68,9 @@ public final class RemoteClusterUtils {
}
}
/** Find all checkpoint topics */
/**
* Finds all checkpoints topics.
*/
public static Set<String> checkpointTopics(Map<String, Object> properties)
throws InterruptedException, TimeoutException {
try (MirrorClient client = new MirrorClient(properties)) {
@ -69,7 +78,9 @@ public final class RemoteClusterUtils {
}
}
/** Find all upstream clusters */
/**
* Finds all upstream clusters.
*/
public static Set<String> upstreamClusters(Map<String, Object> properties)
throws InterruptedException, TimeoutException {
try (MirrorClient client = new MirrorClient(properties)) {
@ -77,12 +88,13 @@ public final class RemoteClusterUtils {
}
}
/** Translate a remote consumer group's offsets into corresponding local offsets. Topics are automatically
* renamed according to the ReplicationPolicy.
* @param properties {@link MirrorClientConfig} properties to instantiate a {@link MirrorClient}
* @param consumerGroupId group ID of remote consumer group
* @param remoteClusterAlias alias of remote cluster
* @param timeout timeout
/**
* Translates a remote consumer group's offsets into corresponding local offsets. Topics are automatically
* renamed according to the configured {@link ReplicationPolicy}.
* @param properties Map of properties to instantiate a {@link MirrorClient}
* @param remoteClusterAlias The alias of the remote cluster
* @param consumerGroupId The group ID of the remote consumer group
* @param timeout The maximum time to block when consuming from the checkpoints topic
*/
public static Map<TopicPartition, OffsetAndMetadata> translateOffsets(Map<String, Object> properties,
String remoteClusterAlias, String consumerGroupId, Duration timeout)

View File

@ -19,28 +19,34 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.common.annotation.InterfaceStability;
/** Defines which topics are "remote topics". e.g. "us-west.topic1". */
/**
* An interface used by the MirrorMaker connectors to manage topic names between source and target clusters.
*/
@InterfaceStability.Evolving
public interface ReplicationPolicy {
/** How to rename remote topics; generally should be like us-west.topic1. */
/**
* Returns the remote topic name for the given topic and source cluster alias.
*/
String formatRemoteTopic(String sourceClusterAlias, String topic);
/** Source cluster alias of given remote topic, e.g. "us-west" for "us-west.topic1".
* Returns null if not a remote topic.
/**
* Returns the source cluster alias of the given topic.
* Returns null if the given topic is not a remote topic.
*/
String topicSource(String topic);
/** Name of topic on the source cluster, e.g. "topic1" for "us-west.topic1".
/**
* Returns the name of the given topic on the source cluster.
* <p>
* Topics may be replicated multiple hops, so the immediately upstream topic
* may itself be a remote topic.
* Topics may be replicated multiple hops, so the immediately upstream topic may itself be a remote topic.
* <p>
* Returns null if not a remote topic.
* Returns null if the given topic is not a remote topic.
*/
String upstreamTopic(String topic);
/** The name of the original source-topic, which may have been replicated multiple hops.
/**
* Returns the name of the original topic, which may have been replicated multiple hops.
* Returns the topic if it is not a remote topic.
*/
default String originalTopic(String topic) {
@ -52,37 +58,52 @@ public interface ReplicationPolicy {
}
}
/** Returns heartbeats topic name.*/
/**
* Returns the name of the heartbeats topic.
* <p>
* The default implementation returns {@code "heartbeats"}.
*
* @return the heartbeats topic name
*/
default String heartbeatsTopic() {
return "heartbeats";
}
/** Returns the offset-syncs topic for given cluster alias. */
/**
* Returns the name of the offset-syncs topic for the given cluster alias.
* <p>
* The default implementation returns {@code mm2-offset-syncs.<clusterAlias>.internal}.
*
* @param clusterAlias the cluster alias embedded in the topic name
* @return the offset-syncs topic name
*/
default String offsetSyncsTopic(String clusterAlias) {
return "mm2-offset-syncs." + clusterAlias + ".internal";
}
/** Returns the name checkpoint topic for given cluster alias. */
/**
* Returns the name of the checkpoints topic for the given cluster alias.
* <p>
* The default implementation returns {@code <clusterAlias>.checkpoints.internal}.
*
* @param clusterAlias the cluster alias used as the topic name prefix
* @return the checkpoints topic name
*/
default String checkpointsTopic(String clusterAlias) {
return clusterAlias + ".checkpoints.internal";
}
/** check if topic is a heartbeat topic, e.g heartbeats, us-west.heartbeats. */
/**
* Returns true if the topic is a heartbeats topic.
* <p>
* The default implementation compares {@link #originalTopic(String)} of the given topic with
* {@link #heartbeatsTopic()}, so a replicated heartbeats topic (for example "us-west.heartbeats"
* when remote topics are prefixed with the source cluster alias) is recognized as well as the
* local "heartbeats" topic.
*
* @param topic the topic name to check
* @return true if the original name of the topic equals the heartbeats topic name
*/
default boolean isHeartbeatsTopic(String topic) {
return heartbeatsTopic().equals(originalTopic(topic));
}
/** check if topic is a checkpoint topic. */
/**
* Returns true if the topic is a checkpoints topic.
* <p>
* The default implementation returns true when the topic name ends with {@code .checkpoints.internal}.
*
* @param topic the topic name to check
* @return true if the topic name has the checkpoints topic suffix
*/
default boolean isCheckpointsTopic(String topic) {
return topic.endsWith(".checkpoints.internal");
}
/** Check topic is one of MM2 internal topic, this is used to make sure the topic doesn't need to be replicated.*/
/**
* Returns true if the topic is one of the MirrorMaker internal topics.
* This is used to make sure the topic doesn't need to be replicated.
* <p>
* The default implementation returns true when the topic name ends with {@code .internal}.
*
* @param topic the topic name to check
* @return true if the topic name has the {@code .internal} suffix
*/
default boolean isMM2InternalTopic(String topic) {
return topic.endsWith(".internal");
}
/** Internal topics are never replicated. */
/**
* Returns true if the topic is considered an internal topic.
*/
default boolean isInternalTopic(String topic) {
boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".");
boolean isDefaultConnectTopic = topic.endsWith("-internal") || topic.endsWith(".internal");

View File

@ -16,7 +16,9 @@
*/
package org.apache.kafka.connect.mirror;
/** Directional pair of clusters, where source is replicated to target. */
/**
* Directional pair of clusters, where source is mirrored to target.
*/
public class SourceAndTarget {
private final String source;
private final String target;

View File

@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Provides APIs for the MirrorMaker connectors and utilities to manage MirrorMaker resources.
*/
package org.apache.kafka.connect.mirror;

View File

@ -26,6 +26,9 @@ import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Map;
/**
* Task implementation for {@link MockSinkConnector}.
*/
public class MockSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(MockSinkTask.class);

View File

@ -27,6 +27,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Task implementation for {@link MockSourceConnector}.
*/
public class MockSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(MockSourceTask.class);

View File

@ -31,6 +31,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Task implementation for {@link SchemaSourceConnector}.
*/
public class SchemaSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(SchemaSourceTask.class);

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
/**
* A connector primarily intended for system tests.
* @see VerifiableSinkTask
*/
public class VerifiableSinkConnector extends SinkConnector {

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
/**
* A connector primarily intended for system tests.
* @see VerifiableSourceTask
*/
public class VerifiableSourceConnector extends SourceConnector {

View File

@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Provides source and sink connector implementations used for testing.
*/
package org.apache.kafka.connect.tools;