KAFKA-13397: MirrorMaker should not mirror topics ending with `.internal` (#11431)

When running in dedicated mode, Connect runtimes are configured to use the `.internal` suffix for their topics. 

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>
This commit is contained in:
Lee Dongjin 2021-11-18 02:14:02 +09:00 committed by GitHub
parent 1b4cffdcb7
commit ef94af1e8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 2 deletions

View File

@ -84,7 +84,8 @@ public interface ReplicationPolicy {
/** Internal topics are never replicated. */ /** Internal topics are never replicated. */
default boolean isInternalTopic(String topic) { default boolean isInternalTopic(String topic) {
boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".") || topic.endsWith("-internal"); boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".");
return isMM2InternalTopic(topic) || isKafkaInternalTopic; boolean isDefaultConnectTopic = topic.endsWith("-internal") || topic.endsWith(".internal");
return isMM2InternalTopic(topic) || isKafkaInternalTopic || isDefaultConnectTopic;
} }
} }

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class ReplicationPolicyTest {
private static final DefaultReplicationPolicy DEFAULT_REPLICATION_POLICY = new DefaultReplicationPolicy();
@Test
public void testInternalTopic() {
// starts with '__'
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("__consumer_offsets"));
// starts with '.'
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic(".hiddentopic"));
// ends with '.internal': default DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG in standalone mode.
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets.CLUSTER.internal"));
// ends with '-internal'
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets-CLUSTER-internal"));
// non-internal topic.
assertFalse(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets_CLUSTER_internal"));
}
}

View File

@ -236,6 +236,7 @@ public class MirrorMakerConfigTest {
SourceAndTarget b = new SourceAndTarget("a", "b"); SourceAndTarget b = new SourceAndTarget("a", "b");
Map<String, String> aProps = mirrorConfig.workerConfig(a); Map<String, String> aProps = mirrorConfig.workerConfig(a);
assertEquals("123", aProps.get("offset.storage.replication.factor")); assertEquals("123", aProps.get("offset.storage.replication.factor"));
assertEquals("__", aProps.get("replication.policy.separator"));
Map<String, String> bProps = mirrorConfig.workerConfig(b); Map<String, String> bProps = mirrorConfig.workerConfig(b);
assertEquals("456", bProps.get("status.storage.replication.factor")); assertEquals("456", bProps.get("status.storage.replication.factor"));
assertEquals("client-one", bProps.get("producer.client.id"), assertEquals("client-one", bProps.get("producer.client.id"),
@ -254,6 +255,7 @@ public class MirrorMakerConfigTest {
"security properties should be transformed in worker config"); "security properties should be transformed in worker config");
assertEquals("secret2", bProps.get("producer.ssl.key.password"), assertEquals("secret2", bProps.get("producer.ssl.key.password"),
"security properties should be transformed in worker producer config"); "security properties should be transformed in worker producer config");
assertEquals("__", bProps.get("replication.policy.separator"));
} }
@Test @Test