diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java index 603f09df84c..3e0a2ee6177 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java @@ -29,7 +29,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; -/** Checkpoint records emitted from MirrorCheckpointConnector. Encodes remote consumer group state. */ +/** + * Checkpoint records emitted by MirrorCheckpointConnector. + */ public class Checkpoint { public static final String TOPIC_KEY = "topic"; public static final String PARTITION_KEY = "partition"; diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java index fa2c5a75b24..7733ccf3fd5 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java @@ -24,7 +24,12 @@ import org.slf4j.LoggerFactory; import java.util.Map; import java.util.regex.Pattern; -/** Defines remote topics like "us-west.topic1". The separator is customizable and defaults to a period. */ +/** + * Default implementation of {@link ReplicationPolicy} which prepends the source cluster alias to + * remote topic names. + * For example, if the source cluster alias is "us-west", topics created in the target cluster will be named + * us-west.<TOPIC>. The separator is customizable by setting {@link #SEPARATOR_CONFIG} and defaults to a period. + */ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable { private static final Logger log = LoggerFactory.getLogger(DefaultReplicationPolicy.class); diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java index ab88e60439a..d63dfa70ff8 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java @@ -26,7 +26,9 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; -/** Heartbeat message sent from MirrorHeartbeatTask to target cluster. Heartbeats are always replicated. */ +/** + * Heartbeat records emitted by MirrorHeartbeatConnector. + */ public class Heartbeat { public static final String SOURCE_CLUSTER_ALIAS_KEY = "sourceClusterAlias"; public static final String TARGET_CLUSTER_ALIAS_KEY = "targetClusterAlias"; diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java index 16a3dfa11ff..1206becd5ee 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java @@ -21,13 +21,13 @@ import org.slf4j.LoggerFactory; import java.util.Map; -/** IdentityReplicationPolicy does not rename remote topics. This is useful for migrating - * from legacy MM1, or for any use-case involving one-way replication. - *
- * N.B. MirrorMaker is not able to prevent cycles when using this class, so take care that - * your replication topology is acyclic. If migrating from MirrorMaker v1, this will likely - * already be the case. - */ +/** + * Alternative implementation of {@link ReplicationPolicy} that does not rename remote topics. + * This is useful for migrating from legacy MirrorMaker, or for any use-case involving one-way replication. + *
+ * N.B. MirrorMaker is not able to prevent cycles when using this replication policy, so take care that + * your replication topology is acyclic. If migrating from legacy MirrorMaker, this will likely already be the case. + */ public class IdentityReplicationPolicy extends DefaultReplicationPolicy { private static final Logger log = LoggerFactory.getLogger(IdentityReplicationPolicy.class); @@ -44,11 +44,12 @@ public class IdentityReplicationPolicy extends DefaultReplicationPolicy { } } - /** Unlike DefaultReplicationPolicy, IdentityReplicationPolicy does not include the source - * cluster alias in the remote topic name. Instead, topic names are unchanged. - *
- * In the special case of heartbeats, we defer to DefaultReplicationPolicy. - */ + /** + * Unlike {@link DefaultReplicationPolicy}, IdentityReplicationPolicy does not include the source + * cluster alias in the remote topic name. Instead, topic names are unchanged. + *
+ * In the special case of heartbeats, we defer to {@link DefaultReplicationPolicy#formatRemoteTopic(String, String)}. + */ @Override public String formatRemoteTopic(String sourceClusterAlias, String topic) { if (looksLikeHeartbeat(topic)) { @@ -58,12 +59,13 @@ public class IdentityReplicationPolicy extends DefaultReplicationPolicy { } } - /** Unlike DefaultReplicationPolicy, IdentityReplicationPolicy cannot know the source of - * a remote topic based on its name alone. If `source.cluster.alias` is provided, - * `topicSource` will return that. - *
- * In the special case of heartbeats, we defer to DefaultReplicationPolicy.
- */
+ /**
+ * Unlike {@link DefaultReplicationPolicy}, IdentityReplicationPolicy cannot know the source of
+ * a remote topic based on its name alone. If source.cluster.alias
is provided,
+ * this method will return that.
+ *
+ * In the special case of heartbeats, we defer to {@link DefaultReplicationPolicy#topicSource(String)}. + */ @Override public String topicSource(String topic) { if (looksLikeHeartbeat(topic)) { @@ -73,10 +75,11 @@ public class IdentityReplicationPolicy extends DefaultReplicationPolicy { } } - /** Since any topic may be a "remote topic", this just returns `topic`. - *
- * In the special case of heartbeats, we defer to DefaultReplicationPolicy. - */ + /** + * Since any topic may be a remote topic, this just returns `topic`. + *
+ * In the special case of heartbeats, we defer to {@link DefaultReplicationPolicy#upstreamTopic(String)}. + */ @Override public String upstreamTopic(String topic) { if (looksLikeHeartbeat(topic)) { diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java index a071521aa88..0b74b64ebbb 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java @@ -42,16 +42,9 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; -/** Interprets MM2's internal topics (checkpoints, heartbeats) on a given cluster. - *
- * Given a top-level "mm2.properties" configuration file, MirrorClients can be constructed - * for individual clusters as follows: - *
- *- * MirrorMakerConfig mmConfig = new MirrorMakerConfig(props); - * MirrorClientConfig mmClientConfig = mmConfig.clientConfig("some-cluster"); - * MirrorClient mmClient = new Mirrorclient(mmClientConfig); - *+/** + * Client to interact with MirrorMaker internal topics (checkpoints, heartbeats) on a given cluster. + * Whenever possible use the methods from {@link RemoteClusterUtils} instead of directly using MirrorClient. */ public class MirrorClient implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(MirrorClient.class); @@ -78,20 +71,25 @@ public class MirrorClient implements AutoCloseable { this.consumerConfig = consumerConfig; } - /** Close internal clients. */ + /** + * Closes internal clients. + */ public void close() { adminClient.close(); } - /** Get the ReplicationPolicy instance used to interpret remote topics. This instance is constructed based on - * relevant configuration properties, including {@code replication.policy.class}. */ + /** + * Gets the {@link ReplicationPolicy} instance used to interpret remote topics. This instance is constructed based on + * relevant configuration properties, including {@code replication.policy.class}. + */ public ReplicationPolicy replicationPolicy() { return replicationPolicy; } - /** Compute shortest number of hops from an upstream source cluster. - * For example, given replication flow A->B->C, there are two hops from A to C. - * Returns -1 if upstream cluster is unreachable. + /** + * Computes the shortest number of hops from an upstream source cluster. + * For example, given replication flow A->B->C, there are two hops from A to C. + * Returns -1 if the upstream cluster is unreachable. */ public int replicationHops(String upstreamClusterAlias) throws InterruptedException { return heartbeatTopics().stream() @@ -102,21 +100,27 @@ public class MirrorClient implements AutoCloseable { .orElse(-1); } - /** Find all heartbeat topics on this cluster. Heartbeat topics are replicated from other clusters. */ + /** + * Finds all heartbeats topics on this cluster. Heartbeats topics are replicated from other clusters. + */ public Set
- * Generally, these properties come from an mm2.properties configuration file - * (@see MirrorMakerConfig.clientConfig): - *
- *- * MirrorMakerConfig mmConfig = new MirrorMakerConfig(props); - * MirrorClientConfig mmClientConfig = mmConfig.clientConfig("some-cluster"); - *- *
- * In addition to the properties defined here, sub-configs are supported for Admin, Consumer, and Producer clients. - * For example: - *
- *+/** + * Configuration required for {@link MirrorClient} to talk to a given target cluster. + *+ * */ public class MirrorClientConfig extends AbstractConfig { public static final String REPLICATION_POLICY_CLASS = "replication.policy.class"; @@ -110,8 +107,7 @@ public class MirrorClientConfig extends AbstractConfig { } private Map+ * This needs to contain at least the connection details for the target cluster (
bootstrap.servers
and + * any required TLS/SASL configuration), as well as {@link #REPLICATION_POLICY_CLASS} when not using the default + * replication policy. It can also include {@link AdminClientConfig} and {@link ConsumerConfig} to customize the + * internal clients this uses. For example: + ** bootstrap.servers = host1:9092 * consumer.client.id = mm2-client * replication.policy.separator = __ - *+ *
- * Properties passed to these methods are used to construct internal Admin and Consumer clients. - * Sub-configs like "admin.xyz" are also supported. For example: - *
- *- * bootstrap.servers = host1:9092 - * consumer.client.id = mm2-client - *- *
- * @see MirrorClientConfig for additional properties used by the internal MirrorClient. - *
+/** + * Convenience tool for multi-cluster environments. Wraps {@link MirrorClient} + *+ * Properties passed to these methods are used to construct internal {@link Admin} and {@link Consumer} clients. + * Sub-configs like "admin.xyz" are also supported. For example: + *
+ *+ * bootstrap.servers = host1:9092 + * consumer.client.id = mm2-client + *+ *
+ * @see MirrorClientConfig for additional properties used by the internal MirrorClient. + *
*/ public final class RemoteClusterUtils { // utility class private RemoteClusterUtils() {} - /** Find shortest number of hops from an upstream cluster. - * Returns -1 if the cluster is unreachable */ + /** + * Finds the shortest number of hops from an upstream cluster. + * Returns -1 if the cluster is unreachable. + */ public static int replicationHops(Map- * Topics may be replicated multiple hops, so the immediately upstream topic - * may itself be a remote topic. - *
- * Returns null if not a remote topic. + /** + * Return the name of the given topic on the source cluster. + *
+ * Topics may be replicated multiple hops, so the immediately upstream topic may itself be a remote topic. + *
+ * Returns null if the given topic is not a remote topic. */ String upstreamTopic(String topic); - /** The name of the original source-topic, which may have been replicated multiple hops. - * Returns the topic if it is not a remote topic. + /** + * Returns the name of the original topic, which may have been replicated multiple hops. + * Returns the topic if it is not a remote topic. */ default String originalTopic(String topic) { String upstream = upstreamTopic(topic); @@ -52,37 +58,52 @@ public interface ReplicationPolicy { } } - /** Returns heartbeats topic name.*/ + /** + * Returns the name of heartbeats topic. + */ default String heartbeatsTopic() { return "heartbeats"; } - /** Returns the offset-syncs topic for given cluster alias. */ + /** + * Returns the name of the offset-syncs topic for given cluster alias. + */ default String offsetSyncsTopic(String clusterAlias) { return "mm2-offset-syncs." + clusterAlias + ".internal"; } - /** Returns the name checkpoint topic for given cluster alias. */ + /** + * Returns the name of the checkpoints topic for given cluster alias. + */ default String checkpointsTopic(String clusterAlias) { return clusterAlias + ".checkpoints.internal"; } - /** check if topic is a heartbeat topic, e.g heartbeats, us-west.heartbeats. */ + /** + * Returns true if the topic is a heartbeats topic + */ default boolean isHeartbeatsTopic(String topic) { return heartbeatsTopic().equals(originalTopic(topic)); } - /** check if topic is a checkpoint topic. */ + /** + * Returns true if the topic is a checkpoints topic. + */ default boolean isCheckpointsTopic(String topic) { return topic.endsWith(".checkpoints.internal"); } - /** Check topic is one of MM2 internal topic, this is used to make sure the topic doesn't need to be replicated.*/ + /** + * Returns true if the topic is one of MirrorMaker internal topics. + * This is used to make sure the topic doesn't need to be replicated. + */ default boolean isMM2InternalTopic(String topic) { return topic.endsWith(".internal"); } - /** Internal topics are never replicated. */ + /** + * Returns true if the topic is considered an internal topic. + */ default boolean isInternalTopic(String topic) { boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith("."); boolean isDefaultConnectTopic = topic.endsWith("-internal") || topic.endsWith(".internal"); diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java index f9793aceed9..7bafd52244e 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.connect.mirror; -/** Directional pair of clusters, where source is replicated to target. */ +/** + * Directional pair of clusters, where source is mirrored to target. + */ public class SourceAndTarget { private final String source; private final String target; diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/package-info.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/package-info.java new file mode 100644 index 00000000000..48ed522ae34 --- /dev/null +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Provides APIs for the MirrorMaker connectors and utilities to manage MirrorMaker resources. + */ +package org.apache.kafka.connect.mirror; \ No newline at end of file diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java index 392c58fb01f..1c97ced61ef 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java @@ -26,6 +26,9 @@ import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Map; +/** + * Task implementation for {@link MockSinkConnector}. + */ public class MockSinkTask extends SinkTask { private static final Logger log = LoggerFactory.getLogger(MockSinkTask.class); diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java index c09fa6a5319..f69c58b99ab 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java @@ -27,6 +27,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; +/** + * Task implementation for {@link MockSourceConnector}. + */ public class MockSourceTask extends SourceTask { private static final Logger log = LoggerFactory.getLogger(MockSourceTask.class); diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java index 7895dbbef4f..c40e0932e53 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java @@ -31,6 +31,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; +/** + * Task implementation for {@link SchemaSourceConnector}. + */ public class SchemaSourceTask extends SourceTask { private static final Logger log = LoggerFactory.getLogger(SchemaSourceTask.class); diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java index fbe29e13ed4..b198dafc535 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; /** + * A connector primarily intended for system tests. * @see VerifiableSinkTask */ public class VerifiableSinkConnector extends SinkConnector { diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java index 6262cc3b0bb..e23992dfe1b 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; /** + * A connector primarily intended for system tests. * @see VerifiableSourceTask */ public class VerifiableSourceConnector extends SourceConnector { diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/package-info.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/package-info.java new file mode 100644 index 00000000000..7a5ef74c51a --- /dev/null +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Provides source and sink connector implementations used for testing + */ +package org.apache.kafka.connect.tools; \ No newline at end of file