KAFKA-17200: Allow the replication of user internal topics (#17815)

Reviewers: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
This commit is contained in:
Patrik Marton 2024-12-06 15:23:58 +01:00 committed by GitHub
parent ccca9f146e
commit 0bbed823e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 14 additions and 8 deletions

View File

@ -115,6 +115,6 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
@Override
public boolean isMM2InternalTopic(String topic) {
return topic.endsWith(internalSuffix());
return topic.startsWith("mm2") && topic.endsWith(internalSuffix()) || isCheckpointsTopic(topic);
}
}

View File

@ -98,7 +98,7 @@ public interface ReplicationPolicy {
* This is used to make sure the topic doesn't need to be replicated.
*/
default boolean isMM2InternalTopic(String topic) {
return topic.endsWith(".internal");
return topic.startsWith("mm2") && topic.endsWith(".internal") || isCheckpointsTopic(topic);
}
/**
@ -106,7 +106,6 @@ public interface ReplicationPolicy {
*/
default boolean isInternalTopic(String topic) {
boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".");
boolean isDefaultConnectTopic = topic.endsWith("-internal") || topic.endsWith(".internal");
return isMM2InternalTopic(topic) || isKafkaInternalTopic || isDefaultConnectTopic;
return isMM2InternalTopic(topic) || isKafkaInternalTopic;
}
}

View File

@ -38,15 +38,17 @@ public class ReplicationPolicyTest {
@Test
public void testInternalTopic() {
Map<String, Object> config = new HashMap<>();
config.put(MirrorClientConfig.REPLICATION_POLICY_SEPARATOR, ".");
DEFAULT_REPLICATION_POLICY.configure(config);
// starts with '__'
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("__consumer_offsets"));
// starts with '.'
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic(".hiddentopic"));
// ends with '.internal': default DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG in standalone mode.
// starts with 'mm2' and ends with '.internal': default DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG in standalone mode.
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets.CLUSTER.internal"));
// ends with '-internal'
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets-CLUSTER-internal"));
// non-internal topic.
assertFalse(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets_CLUSTER_internal"));
}

View File

@ -33,7 +33,7 @@ public class DefaultTopicFilter implements TopicFilter {
public static final String TOPICS_EXCLUDE_CONFIG = "topics.exclude";
private static final String TOPICS_EXCLUDE_DOC = "List of topics and/or regexes that should not be replicated.";
public static final String TOPICS_EXCLUDE_DEFAULT = ".*[\\-\\.]internal, .*\\.replica, __.*";
public static final String TOPICS_EXCLUDE_DEFAULT = "mm2.*\\.internal, .*\\.replica, __.*";
private Pattern includePattern;
private Pattern excludePattern;

View File

@ -204,6 +204,11 @@
index. This API is used when the consumers are enabled with isolation level as READ_COMMITTED.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058:+Txn+consumer+exerts+pressure+on+remote+storage+when+collecting+aborted+transactions">KIP-1058</a> for more details.
</li>
<li>
The criteria for identifying internal topics in ReplicationPolicy and DefaultReplicationPolicy have
been updated to enable the replication of topics that appear to be internal but aren't truly internal to Kafka and Mirror Maker 2.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1074%3A+Allow+the+replication+of+user+internal+topics">KIP-1074</a> for more details.
</li>
</ul>
</li>
</ul>