From 0772144e510b490fbfbd7fa96e926bdba95c8b00 Mon Sep 17 00:00:00 2001
From: Mickael Maison
- * N.B. MirrorMaker is not able to prevent cycles when using this class, so take care that
- * your replication topology is acyclic. If migrating from MirrorMaker v1, this will likely
- * already be the case.
- */
+/**
+ * Alternative implementation of {@link ReplicationPolicy} that does not rename remote topics.
+ * This is useful for migrating from legacy MirrorMaker, or for any use-case involving one-way replication.
+ *
+ * N.B. MirrorMaker is not able to prevent cycles when using this replication policy, so take care that
+ * your replication topology is acyclic. If migrating from legacy MirrorMaker, this will likely already be the case.
+ */
public class IdentityReplicationPolicy extends DefaultReplicationPolicy {
private static final Logger log = LoggerFactory.getLogger(IdentityReplicationPolicy.class);
@@ -44,11 +44,12 @@ public class IdentityReplicationPolicy extends DefaultReplicationPolicy {
}
}
- /** Unlike DefaultReplicationPolicy, IdentityReplicationPolicy does not include the source
- * cluster alias in the remote topic name. Instead, topic names are unchanged.
- *
- * In the special case of heartbeats, we defer to DefaultReplicationPolicy.
- */
+ /**
+ * Unlike {@link DefaultReplicationPolicy}, IdentityReplicationPolicy does not include the source
+ * cluster alias in the remote topic name. Instead, topic names are unchanged.
+ *
+ * In the special case of heartbeats, we defer to {@link DefaultReplicationPolicy#formatRemoteTopic(String, String)}.
+ */
@Override
public String formatRemoteTopic(String sourceClusterAlias, String topic) {
if (looksLikeHeartbeat(topic)) {
@@ -58,12 +59,13 @@ public class IdentityReplicationPolicy extends DefaultReplicationPolicy {
}
}
- /** Unlike DefaultReplicationPolicy, IdentityReplicationPolicy cannot know the source of
- * a remote topic based on its name alone. If `source.cluster.alias` is provided,
- * `topicSource` will return that.
- *
- * In the special case of heartbeats, we defer to DefaultReplicationPolicy.
- */
+ /**
+ * Unlike {@link DefaultReplicationPolicy}, IdentityReplicationPolicy cannot know the source of
+ * a remote topic based on its name alone. If
+ * In the special case of heartbeats, we defer to {@link DefaultReplicationPolicy#topicSource(String)}.
+ */
@Override
public String topicSource(String topic) {
if (looksLikeHeartbeat(topic)) {
@@ -73,10 +75,11 @@ public class IdentityReplicationPolicy extends DefaultReplicationPolicy {
}
}
- /** Since any topic may be a "remote topic", this just returns `topic`.
- *
- * In the special case of heartbeats, we defer to DefaultReplicationPolicy.
- */
+ /**
+ * Since any topic may be a remote topic, this just returns `topic`.
+ *
+ * In the special case of heartbeats, we defer to {@link DefaultReplicationPolicy#upstreamTopic(String)}.
+ */
@Override
public String upstreamTopic(String topic) {
if (looksLikeHeartbeat(topic)) {
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
index a071521aa88..0b74b64ebbb 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
@@ -42,16 +42,9 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
-/** Interprets MM2's internal topics (checkpoints, heartbeats) on a given cluster.
- *
- * Given a top-level "mm2.properties" configuration file, MirrorClients can be constructed
- * for individual clusters as follows:
- *
- * Generally, these properties come from an mm2.properties configuration file
- * (@see MirrorMakerConfig.clientConfig):
- *
- * In addition to the properties defined here, sub-configs are supported for Admin, Consumer, and Producer clients.
- * For example:
- *
+ * This needs to contain at least the connection details for the target cluster (source.cluster.alias
is provided,
+ * this method will return that.
+ *
- * MirrorMakerConfig mmConfig = new MirrorMakerConfig(props);
- * MirrorClientConfig mmClientConfig = mmConfig.clientConfig("some-cluster");
- * MirrorClient mmClient = new Mirrorclient(mmClientConfig);
- *
+/**
+ * Client to interact with MirrorMaker internal topics (checkpoints, heartbeats) on a given cluster.
+ * Whenever possible use the methods from {@link RemoteClusterUtils} instead of directly using MirrorClient.
*/
public class MirrorClient implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(MirrorClient.class);
@@ -78,20 +71,25 @@ public class MirrorClient implements AutoCloseable {
this.consumerConfig = consumerConfig;
}
- /** Close internal clients. */
+ /**
+ * Closes internal clients.
+ */
public void close() {
adminClient.close();
}
- /** Get the ReplicationPolicy instance used to interpret remote topics. This instance is constructed based on
- * relevant configuration properties, including {@code replication.policy.class}. */
+ /**
+ * Gets the {@link ReplicationPolicy} instance used to interpret remote topics. This instance is constructed based on
+ * relevant configuration properties, including {@code replication.policy.class}.
+ */
public ReplicationPolicy replicationPolicy() {
return replicationPolicy;
}
- /** Compute shortest number of hops from an upstream source cluster.
- * For example, given replication flow A->B->C, there are two hops from A to C.
- * Returns -1 if upstream cluster is unreachable.
+ /**
+ * Computes the shortest number of hops from an upstream source cluster.
+ * For example, given replication flow A->B->C, there are two hops from A to C.
+ * Returns -1 if the upstream cluster is unreachable.
*/
public int replicationHops(String upstreamClusterAlias) throws InterruptedException {
return heartbeatTopics().stream()
@@ -102,21 +100,27 @@ public class MirrorClient implements AutoCloseable {
.orElse(-1);
}
- /** Find all heartbeat topics on this cluster. Heartbeat topics are replicated from other clusters. */
+ /**
+ * Finds all heartbeats topics on this cluster. Heartbeats topics are replicated from other clusters.
+ */
public Set
- * MirrorMakerConfig mmConfig = new MirrorMakerConfig(props);
- * MirrorClientConfig mmClientConfig = mmConfig.clientConfig("some-cluster");
- *
- *
+/**
+ * Configuration required for {@link MirrorClient} to talk to a given target cluster.
+ *
+ * bootstrap.servers
and
+ * any required TLS/SASL configuration), as well as {@link #REPLICATION_POLICY_CLASS} when not using the default
+ * replication policy. It can also include {@link AdminClientConfig} and {@link ConsumerConfig} to customize the
+ * internal clients this uses. For example:
+ *
* bootstrap.servers = host1:9092
* consumer.client.id = mm2-client
* replication.policy.separator = __
- *
+ *
- * Properties passed to these methods are used to construct internal Admin and Consumer clients. - * Sub-configs like "admin.xyz" are also supported. For example: - *
- *- * bootstrap.servers = host1:9092 - * consumer.client.id = mm2-client - *- *
- * @see MirrorClientConfig for additional properties used by the internal MirrorClient. - *
+/** + * Convenience tool for multi-cluster environments. Wraps {@link MirrorClient} + *+ * Properties passed to these methods are used to construct internal {@link Admin} and {@link Consumer} clients. + * Sub-configs like "admin.xyz" are also supported. For example: + *
+ *+ * bootstrap.servers = host1:9092 + * consumer.client.id = mm2-client + *+ *
+ * @see MirrorClientConfig for additional properties used by the internal MirrorClient. + *
*/ public final class RemoteClusterUtils { // utility class private RemoteClusterUtils() {} - /** Find shortest number of hops from an upstream cluster. - * Returns -1 if the cluster is unreachable */ + /** + * Finds the shortest number of hops from an upstream cluster. + * Returns -1 if the cluster is unreachable. + */ public static int replicationHops(Map- * Topics may be replicated multiple hops, so the immediately upstream topic - * may itself be a remote topic. - *
- * Returns null if not a remote topic. + /** + * Return the name of the given topic on the source cluster. + *
+ * Topics may be replicated multiple hops, so the immediately upstream topic may itself be a remote topic. + *
+ * Returns null if the given topic is not a remote topic. */ String upstreamTopic(String topic); - /** The name of the original source-topic, which may have been replicated multiple hops. - * Returns the topic if it is not a remote topic. + /** + * Returns the name of the original topic, which may have been replicated multiple hops. + * Returns the topic if it is not a remote topic. */ default String originalTopic(String topic) { String upstream = upstreamTopic(topic); @@ -52,37 +58,52 @@ public interface ReplicationPolicy { } } - /** Returns heartbeats topic name.*/ + /** + * Returns the name of heartbeats topic. + */ default String heartbeatsTopic() { return "heartbeats"; } - /** Returns the offset-syncs topic for given cluster alias. */ + /** + * Returns the name of the offset-syncs topic for given cluster alias. + */ default String offsetSyncsTopic(String clusterAlias) { return "mm2-offset-syncs." + clusterAlias + ".internal"; } - /** Returns the name checkpoint topic for given cluster alias. */ + /** + * Returns the name of the checkpoints topic for given cluster alias. + */ default String checkpointsTopic(String clusterAlias) { return clusterAlias + ".checkpoints.internal"; } - /** check if topic is a heartbeat topic, e.g heartbeats, us-west.heartbeats. */ + /** + * Returns true if the topic is a heartbeats topic + */ default boolean isHeartbeatsTopic(String topic) { return heartbeatsTopic().equals(originalTopic(topic)); } - /** check if topic is a checkpoint topic. */ + /** + * Returns true if the topic is a checkpoints topic. + */ default boolean isCheckpointsTopic(String topic) { return topic.endsWith(".checkpoints.internal"); } - /** Check topic is one of MM2 internal topic, this is used to make sure the topic doesn't need to be replicated.*/ + /** + * Returns true if the topic is one of MirrorMaker internal topics. + * This is used to make sure the topic doesn't need to be replicated. + */ default boolean isMM2InternalTopic(String topic) { return topic.endsWith(".internal"); } - /** Internal topics are never replicated. */ + /** + * Returns true if the topic is considered an internal topic. + */ default boolean isInternalTopic(String topic) { boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith("."); boolean isDefaultConnectTopic = topic.endsWith("-internal") || topic.endsWith(".internal"); diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java index f9793aceed9..7bafd52244e 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.connect.mirror; -/** Directional pair of clusters, where source is replicated to target. */ +/** + * Directional pair of clusters, where source is mirrored to target. + */ public class SourceAndTarget { private final String source; private final String target; diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/package-info.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/package-info.java new file mode 100644 index 00000000000..48ed522ae34 --- /dev/null +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Provides APIs for the MirrorMaker connectors and utilities to manage MirrorMaker resources. + */ +package org.apache.kafka.connect.mirror; \ No newline at end of file diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java index 392c58fb01f..1c97ced61ef 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java @@ -26,6 +26,9 @@ import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Map; +/** + * Task implementation for {@link MockSinkConnector}. + */ public class MockSinkTask extends SinkTask { private static final Logger log = LoggerFactory.getLogger(MockSinkTask.class); diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java index c09fa6a5319..f69c58b99ab 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java @@ -27,6 +27,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; +/** + * Task implementation for {@link MockSourceConnector}. + */ public class MockSourceTask extends SourceTask { private static final Logger log = LoggerFactory.getLogger(MockSourceTask.class); diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java index 7895dbbef4f..c40e0932e53 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java @@ -31,6 +31,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; +/** + * Task implementation for {@link SchemaSourceConnector}. + */ public class SchemaSourceTask extends SourceTask { private static final Logger log = LoggerFactory.getLogger(SchemaSourceTask.class); diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java index fbe29e13ed4..b198dafc535 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; /** + * A connector primarily intended for system tests. * @see VerifiableSinkTask */ public class VerifiableSinkConnector extends SinkConnector { diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java index 6262cc3b0bb..e23992dfe1b 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; /** + * A connector primarily intended for system tests. * @see VerifiableSourceTask */ public class VerifiableSourceConnector extends SourceConnector { diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/package-info.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/package-info.java new file mode 100644 index 00000000000..7a5ef74c51a --- /dev/null +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Provides source and sink connector implementations used for testing + */ +package org.apache.kafka.connect.tools; \ No newline at end of file