KAFKA-10357: Add missing repartition topic validation (#10305)

Reviewers: Rohan Desai <rohan@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Bruno Cadonna 2021-03-12 17:59:41 +01:00 committed by GitHub
parent 4e60ad72fb
commit b519117b22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 331 additions and 201 deletions

View File

@ -219,8 +219,7 @@ public class InternalTopicManager {
} catch (final ExecutionException executionException) { } catch (final ExecutionException executionException) {
final Throwable cause = executionException.getCause(); final Throwable cause = executionException.getCause();
if (cause instanceof UnknownTopicOrPartitionException) { if (cause instanceof UnknownTopicOrPartitionException) {
log.info("Internal topic {} is missing", log.info("Internal topic {} is missing", topicName);
topicName);
validationResult.addMissingTopic(topicName); validationResult.addMissingTopic(topicName);
topicsStillToValidate.remove(topicName); topicsStillToValidate.remove(topicName);
} else if (cause instanceof LeaderNotAvailableException) { } else if (cause instanceof LeaderNotAvailableException) {
@ -296,6 +295,10 @@ public class InternalTopicManager {
validateCleanupPolicyForUnwindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig); validateCleanupPolicyForUnwindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
} else if (topicConfig instanceof WindowedChangelogTopicConfig) { } else if (topicConfig instanceof WindowedChangelogTopicConfig) {
validateCleanupPolicyForWindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig); validateCleanupPolicyForWindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
} else if (topicConfig instanceof RepartitionTopicConfig) {
validateCleanupPolicyForRepartitionTopic(validationResult, topicConfig, brokerSideTopicConfig);
} else {
throw new IllegalStateException("Internal topic " + topicConfig.name() + " has unknown type.");
} }
} }
@ -307,7 +310,8 @@ public class InternalTopicManager {
if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) { if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
validationResult.addMisconfiguration( validationResult.addMisconfiguration(
topicName, topicName,
"Cleanup policy of existing internal topic " + topicName + " should not contain \"" "Cleanup policy (" + TopicConfig.CLEANUP_POLICY_CONFIG + ") of existing internal topic "
+ topicName + " should not contain \""
+ TopicConfig.CLEANUP_POLICY_DELETE + "\"." + TopicConfig.CLEANUP_POLICY_DELETE + "\"."
); );
} }
@ -327,8 +331,8 @@ public class InternalTopicManager {
if (brokerSideRetentionMs < streamsSideRetentionMs) { if (brokerSideRetentionMs < streamsSideRetentionMs) {
validationResult.addMisconfiguration( validationResult.addMisconfiguration(
topicName, topicName,
"Retention time of existing internal topic " + topicName + " is " + brokerSideRetentionMs + "Retention time (" + TopicConfig.RETENTION_MS_CONFIG + ") of existing internal topic "
" but should be " + streamsSideRetentionMs + " or larger." + topicName + " is " + brokerSideRetentionMs + " but should be " + streamsSideRetentionMs + " or larger."
); );
} }
final String brokerSideRetentionBytes = final String brokerSideRetentionBytes =
@ -336,7 +340,41 @@ public class InternalTopicManager {
if (brokerSideRetentionBytes != null) { if (brokerSideRetentionBytes != null) {
validationResult.addMisconfiguration( validationResult.addMisconfiguration(
topicName, topicName,
"Retention byte of existing internal topic " + topicName + " is set but it should be unset." "Retention byte (" + TopicConfig.RETENTION_BYTES_CONFIG + ") of existing internal topic "
+ topicName + " is set but it should be unset."
);
}
}
}
private void validateCleanupPolicyForRepartitionTopic(final ValidationResult validationResult,
final InternalTopicConfig topicConfig,
final Config brokerSideTopicConfig) {
final String topicName = topicConfig.name();
final String cleanupPolicy = getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, topicName);
if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
validationResult.addMisconfiguration(
topicName,
"Cleanup policy (" + TopicConfig.CLEANUP_POLICY_CONFIG + ") of existing internal topic "
+ topicName + " should not contain \"" + TopicConfig.CLEANUP_POLICY_COMPACT + "\"."
);
} else if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
final long brokerSideRetentionMs =
Long.parseLong(getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.RETENTION_MS_CONFIG, topicName));
if (brokerSideRetentionMs != -1) {
validationResult.addMisconfiguration(
topicName,
"Retention time (" + TopicConfig.RETENTION_MS_CONFIG + ") of existing internal topic "
+ topicName + " is " + brokerSideRetentionMs + " but should be -1."
);
}
final String brokerSideRetentionBytes =
getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.RETENTION_BYTES_CONFIG, topicName);
if (brokerSideRetentionBytes != null) {
validationResult.addMisconfiguration(
topicName,
"Retention byte (" + TopicConfig.RETENTION_BYTES_CONFIG + ") of existing internal topic "
+ topicName + " is set but it should be unset."
); );
} }
} }

View File

@ -56,6 +56,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkMap;
@ -468,16 +469,15 @@ public class InternalTopicManagerTest {
@Test @Test
public void shouldValidateSuccessfully() { public void shouldValidateSuccessfully() {
mockAdminClient.addTopic( setupTopicInMockAdminClient(topic1, repartitionTopicConfig());
false, setupTopicInMockAdminClient(topic2, repartitionTopicConfig());
topic1, final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 1);
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())), final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(topic2, 1);
null
);
final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());
internalTopicConfig.setNumberOfPartitions(1);
final ValidationResult validationResult = internalTopicManager.validate(Collections.singletonMap(topic1, internalTopicConfig)); final ValidationResult validationResult = internalTopicManager.validate(mkMap(
mkEntry(topic1, internalTopicConfig1),
mkEntry(topic2, internalTopicConfig2)
));
assertThat(validationResult.missingTopics(), empty()); assertThat(validationResult.missingTopics(), empty());
assertThat(validationResult.misconfigurationsForTopics(), anEmptyMap()); assertThat(validationResult.misconfigurationsForTopics(), anEmptyMap());
@ -485,12 +485,7 @@ public class InternalTopicManagerTest {
@Test @Test
public void shouldValidateSuccessfullyWithEmptyInternalTopics() { public void shouldValidateSuccessfullyWithEmptyInternalTopics() {
mockAdminClient.addTopic( setupTopicInMockAdminClient(topic1, repartitionTopicConfig());
false,
topic1,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
null
);
final ValidationResult validationResult = internalTopicManager.validate(Collections.emptyMap()); final ValidationResult validationResult = internalTopicManager.validate(Collections.emptyMap());
@ -502,18 +497,10 @@ public class InternalTopicManagerTest {
public void shouldReportMissingTopics() { public void shouldReportMissingTopics() {
final String missingTopic1 = "missingTopic1"; final String missingTopic1 = "missingTopic1";
final String missingTopic2 = "missingTopic2"; final String missingTopic2 = "missingTopic2";
mockAdminClient.addTopic( setupTopicInMockAdminClient(topic1, repartitionTopicConfig());
false, final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 1);
topic1, final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(missingTopic1, 1);
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())), final InternalTopicConfig internalTopicConfig3 = setupRepartitionTopicConfig(missingTopic2, 1);
null
);
final InternalTopicConfig internalTopicConfig1 = new RepartitionTopicConfig(topic1, Collections.emptyMap());
internalTopicConfig1.setNumberOfPartitions(1);
final InternalTopicConfig internalTopicConfig2 = new RepartitionTopicConfig(missingTopic1, Collections.emptyMap());
internalTopicConfig2.setNumberOfPartitions(1);
final InternalTopicConfig internalTopicConfig3 = new RepartitionTopicConfig(missingTopic2, Collections.emptyMap());
internalTopicConfig3.setNumberOfPartitions(1);
final ValidationResult validationResult = internalTopicManager.validate(mkMap( final ValidationResult validationResult = internalTopicManager.validate(mkMap(
mkEntry(topic1, internalTopicConfig1), mkEntry(topic1, internalTopicConfig1),
@ -530,30 +517,12 @@ public class InternalTopicManagerTest {
@Test @Test
public void shouldReportMisconfigurationsOfPartitionCount() { public void shouldReportMisconfigurationsOfPartitionCount() {
mockAdminClient.addTopic( setupTopicInMockAdminClient(topic1, repartitionTopicConfig());
false, setupTopicInMockAdminClient(topic2, repartitionTopicConfig());
topic1, setupTopicInMockAdminClient(topic3, repartitionTopicConfig());
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())), final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 2);
null final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(topic2, 3);
); final InternalTopicConfig internalTopicConfig3 = setupRepartitionTopicConfig(topic3, 1);
mockAdminClient.addTopic(
false,
topic2,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
null
);
mockAdminClient.addTopic(
false,
topic3,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
null
);
final InternalTopicConfig internalTopicConfig1 = new RepartitionTopicConfig(topic1, Collections.emptyMap());
internalTopicConfig1.setNumberOfPartitions(2);
final InternalTopicConfig internalTopicConfig2 = new RepartitionTopicConfig(topic2, Collections.emptyMap());
internalTopicConfig2.setNumberOfPartitions(3);
final InternalTopicConfig internalTopicConfig3 = new RepartitionTopicConfig(topic3, Collections.emptyMap());
internalTopicConfig3.setNumberOfPartitions(1);
final ValidationResult validationResult = internalTopicManager.validate(mkMap( final ValidationResult validationResult = internalTopicManager.validate(mkMap(
mkEntry(topic1, internalTopicConfig1), mkEntry(topic1, internalTopicConfig1),
@ -581,27 +550,19 @@ public class InternalTopicManagerTest {
@Test @Test
public void shouldReportMisconfigurationsOfCleanupPolicyForUnwindowedChangelogTopics() { public void shouldReportMisconfigurationsOfCleanupPolicyForUnwindowedChangelogTopics() {
mockAdminClient.addTopic( final Map<String, String> unwindowedChangelogConfigWithDeleteCleanupPolicy = unwindowedChangelogConfig();
false, unwindowedChangelogConfigWithDeleteCleanupPolicy.put(
topic1, TopicConfig.CLEANUP_POLICY_CONFIG,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())), TopicConfig.CLEANUP_POLICY_DELETE
mkMap(mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE))
); );
mockAdminClient.addTopic( setupTopicInMockAdminClient(topic1, unwindowedChangelogConfigWithDeleteCleanupPolicy);
false, final Map<String, String> unwindowedChangelogConfigWithDeleteCompactCleanupPolicy = unwindowedChangelogConfig();
topic2, unwindowedChangelogConfigWithDeleteCompactCleanupPolicy.put(
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
mkMap(mkEntry(
TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE
))
);
mockAdminClient.addTopic(
false,
topic3,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
mkMap(mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT))
); );
setupTopicInMockAdminClient(topic2, unwindowedChangelogConfigWithDeleteCompactCleanupPolicy);
setupTopicInMockAdminClient(topic3, unwindowedChangelogConfig());
final InternalTopicConfig internalTopicConfig1 = setupUnwindowedChangelogTopicConfig(topic1, 1); final InternalTopicConfig internalTopicConfig1 = setupUnwindowedChangelogTopicConfig(topic1, 1);
final InternalTopicConfig internalTopicConfig2 = setupUnwindowedChangelogTopicConfig(topic2, 1); final InternalTopicConfig internalTopicConfig2 = setupUnwindowedChangelogTopicConfig(topic2, 1);
final InternalTopicConfig internalTopicConfig3 = setupUnwindowedChangelogTopicConfig(topic3, 1); final InternalTopicConfig internalTopicConfig3 = setupUnwindowedChangelogTopicConfig(topic3, 1);
@ -619,81 +580,34 @@ public class InternalTopicManagerTest {
assertThat(misconfigurationsForTopics.get(topic1).size(), is(1)); assertThat(misconfigurationsForTopics.get(topic1).size(), is(1));
assertThat( assertThat(
misconfigurationsForTopics.get(topic1).get(0), misconfigurationsForTopics.get(topic1).get(0),
is("Cleanup policy of existing internal topic " + topic1 + " should not contain \"" is("Cleanup policy (" + TopicConfig.CLEANUP_POLICY_CONFIG + ") of existing internal topic " + topic1 + " should not contain \""
+ TopicConfig.CLEANUP_POLICY_DELETE + "\".") + TopicConfig.CLEANUP_POLICY_DELETE + "\".")
); );
assertThat(misconfigurationsForTopics, hasKey(topic2)); assertThat(misconfigurationsForTopics, hasKey(topic2));
assertThat(misconfigurationsForTopics.get(topic2).size(), is(1)); assertThat(misconfigurationsForTopics.get(topic2).size(), is(1));
assertThat( assertThat(
misconfigurationsForTopics.get(topic2).get(0), misconfigurationsForTopics.get(topic2).get(0),
is("Cleanup policy of existing internal topic " + topic2 + " should not contain \"" is("Cleanup policy (" + TopicConfig.CLEANUP_POLICY_CONFIG + ") of existing internal topic " + topic2 + " should not contain \""
+ TopicConfig.CLEANUP_POLICY_DELETE + "\".") + TopicConfig.CLEANUP_POLICY_DELETE + "\".")
); );
assertThat(misconfigurationsForTopics, not(hasKey(topic3))); assertThat(misconfigurationsForTopics, not(hasKey(topic3)));
} }
private InternalTopicConfig setupUnwindowedChangelogTopicConfig(final String topicName,
final int partitionCount) {
final InternalTopicConfig internalTopicConfig =
new UnwindowedChangelogTopicConfig(topicName, Collections.emptyMap());
internalTopicConfig.setNumberOfPartitions(partitionCount);
return internalTopicConfig;
}
@Test @Test
public void shouldReportMisconfigurationsOfCleanupPolicyForWindowedChangelogTopics() { public void shouldReportMisconfigurationsOfCleanupPolicyForWindowedChangelogTopics() {
final long retentionMs = 1000; final long retentionMs = 1000;
final long shorterRetentionMs = 900; final long shorterRetentionMs = 900;
mockAdminClient.addTopic( setupTopicInMockAdminClient(topic1, windowedChangelogConfig(retentionMs));
false, setupTopicInMockAdminClient(topic2, windowedChangelogConfig(shorterRetentionMs));
topic1, final Map<String, String> windowedChangelogConfigOnlyCleanupPolicyCompact = windowedChangelogConfig(retentionMs);
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())), windowedChangelogConfigOnlyCleanupPolicyCompact.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
mkMap( setupTopicInMockAdminClient(topic3, windowedChangelogConfigOnlyCleanupPolicyCompact);
mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE), final Map<String, String> windowedChangelogConfigOnlyCleanupPolicyDelete = windowedChangelogConfig(shorterRetentionMs);
mkEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs)), windowedChangelogConfigOnlyCleanupPolicyDelete.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
mkEntry(TopicConfig.RETENTION_BYTES_CONFIG, null) setupTopicInMockAdminClient(topic4, windowedChangelogConfigOnlyCleanupPolicyDelete);
) final Map<String, String> windowedChangelogConfigWithRetentionBytes = windowedChangelogConfig(retentionMs);
); windowedChangelogConfigWithRetentionBytes.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024");
mockAdminClient.addTopic( setupTopicInMockAdminClient(topic5, windowedChangelogConfigWithRetentionBytes);
false,
topic2,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
mkMap(
mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE),
mkEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(shorterRetentionMs)),
mkEntry(TopicConfig.RETENTION_BYTES_CONFIG, null)
)
);
mockAdminClient.addTopic(
false,
topic3,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
mkMap(
mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT),
mkEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(shorterRetentionMs)),
mkEntry(TopicConfig.RETENTION_BYTES_CONFIG, null)
)
);
mockAdminClient.addTopic(
false,
topic4,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
mkMap(
mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
mkEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(shorterRetentionMs)),
mkEntry(TopicConfig.RETENTION_BYTES_CONFIG, null)
)
);
mockAdminClient.addTopic(
false,
topic5,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
mkMap(
mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
mkEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs)),
mkEntry(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
)
);
final InternalTopicConfig internalTopicConfig1 = setupWindowedChangelogTopicConfig(topic1, 1, retentionMs); final InternalTopicConfig internalTopicConfig1 = setupWindowedChangelogTopicConfig(topic1, 1, retentionMs);
final InternalTopicConfig internalTopicConfig2 = setupWindowedChangelogTopicConfig(topic2, 1, retentionMs); final InternalTopicConfig internalTopicConfig2 = setupWindowedChangelogTopicConfig(topic2, 1, retentionMs);
final InternalTopicConfig internalTopicConfig3 = setupWindowedChangelogTopicConfig(topic3, 1, retentionMs); final InternalTopicConfig internalTopicConfig3 = setupWindowedChangelogTopicConfig(topic3, 1, retentionMs);
@ -715,26 +629,127 @@ public class InternalTopicManagerTest {
assertThat(misconfigurationsForTopics.get(topic2).size(), is(1)); assertThat(misconfigurationsForTopics.get(topic2).size(), is(1));
assertThat( assertThat(
misconfigurationsForTopics.get(topic2).get(0), misconfigurationsForTopics.get(topic2).get(0),
is("Retention time of existing internal topic " + topic2 + " is " + shorterRetentionMs + is("Retention time (" + TopicConfig.RETENTION_MS_CONFIG + ") of existing internal topic " +
" but should be " + retentionMs + " or larger.") topic2 + " is " + shorterRetentionMs + " but should be " + retentionMs + " or larger.")
); );
assertThat(misconfigurationsForTopics, hasKey(topic4)); assertThat(misconfigurationsForTopics, hasKey(topic4));
assertThat(misconfigurationsForTopics.get(topic4).size(), is(1)); assertThat(misconfigurationsForTopics.get(topic4).size(), is(1));
assertThat( assertThat(
misconfigurationsForTopics.get(topic4).get(0), misconfigurationsForTopics.get(topic4).get(0),
is("Retention time of existing internal topic " + topic4 + " is " + shorterRetentionMs + is("Retention time (" + TopicConfig.RETENTION_MS_CONFIG + ") of existing internal topic " +
" but should be " + retentionMs + " or larger.") topic4 + " is " + shorterRetentionMs + " but should be " + retentionMs + " or larger.")
); );
assertThat(misconfigurationsForTopics, hasKey(topic5)); assertThat(misconfigurationsForTopics, hasKey(topic5));
assertThat(misconfigurationsForTopics.get(topic5).size(), is(1)); assertThat(misconfigurationsForTopics.get(topic5).size(), is(1));
assertThat( assertThat(
misconfigurationsForTopics.get(topic5).get(0), misconfigurationsForTopics.get(topic5).get(0),
is("Retention byte of existing internal topic " + topic5 + " is set but it should be unset.") is("Retention byte (" + TopicConfig.RETENTION_BYTES_CONFIG + ") of existing internal topic " +
topic5 + " is set but it should be unset.")
); );
assertThat(misconfigurationsForTopics, not(hasKey(topic1))); assertThat(misconfigurationsForTopics, not(hasKey(topic1)));
assertThat(misconfigurationsForTopics, not(hasKey(topic3))); assertThat(misconfigurationsForTopics, not(hasKey(topic3)));
} }
@Test
public void shouldReportMisconfigurationsOfCleanupPolicyForRepartitionTopics() {
final long retentionMs = 1000;
mockAdminClient.addTopic(
false,
topic1,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
mkMap(
mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
mkEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(-1)),
mkEntry(TopicConfig.RETENTION_BYTES_CONFIG, null)
)
);
mockAdminClient.addTopic(
false,
topic2,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
mkMap(
mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT),
mkEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(-1)),
mkEntry(TopicConfig.RETENTION_BYTES_CONFIG, null)
)
);
mockAdminClient.addTopic(
false,
topic3,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
mkMap(
mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE),
mkEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(-1)),
mkEntry(TopicConfig.RETENTION_BYTES_CONFIG, null)
)
);
mockAdminClient.addTopic(
false,
topic4,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
mkMap(
mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
mkEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs)),
mkEntry(TopicConfig.RETENTION_BYTES_CONFIG, null)
)
);
mockAdminClient.addTopic(
false,
topic5,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
mkMap(
mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
mkEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(-1)),
mkEntry(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
)
);
final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 1);
final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(topic2, 1);
final InternalTopicConfig internalTopicConfig3 = setupRepartitionTopicConfig(topic3, 1);
final InternalTopicConfig internalTopicConfig4 = setupRepartitionTopicConfig(topic4, 1);
final InternalTopicConfig internalTopicConfig5 = setupRepartitionTopicConfig(topic5, 1);
final ValidationResult validationResult = internalTopicManager.validate(mkMap(
mkEntry(topic1, internalTopicConfig1),
mkEntry(topic2, internalTopicConfig2),
mkEntry(topic3, internalTopicConfig3),
mkEntry(topic4, internalTopicConfig4),
mkEntry(topic5, internalTopicConfig5)
));
final Map<String, List<String>> misconfigurationsForTopics = validationResult.misconfigurationsForTopics();
assertThat(validationResult.missingTopics(), empty());
assertThat(misconfigurationsForTopics.size(), is(4));
assertThat(misconfigurationsForTopics, hasKey(topic2));
assertThat(misconfigurationsForTopics.get(topic2).size(), is(1));
assertThat(
misconfigurationsForTopics.get(topic2).get(0),
is("Cleanup policy (" + TopicConfig.CLEANUP_POLICY_CONFIG + ") of existing internal topic "
+ topic2 + " should not contain \"" + TopicConfig.CLEANUP_POLICY_COMPACT + "\".")
);
assertThat(misconfigurationsForTopics, hasKey(topic3));
assertThat(misconfigurationsForTopics.get(topic3).size(), is(1));
assertThat(
misconfigurationsForTopics.get(topic3).get(0),
is("Cleanup policy (" + TopicConfig.CLEANUP_POLICY_CONFIG + ") of existing internal topic "
+ topic3 + " should not contain \"" + TopicConfig.CLEANUP_POLICY_COMPACT + "\".")
);
assertThat(misconfigurationsForTopics, hasKey(topic4));
assertThat(misconfigurationsForTopics.get(topic4).size(), is(1));
assertThat(
misconfigurationsForTopics.get(topic4).get(0),
is("Retention time (" + TopicConfig.RETENTION_MS_CONFIG + ") of existing internal topic "
+ topic4 + " is " + retentionMs + " but should be -1.")
);
assertThat(misconfigurationsForTopics, hasKey(topic5));
assertThat(misconfigurationsForTopics.get(topic5).size(), is(1));
assertThat(
misconfigurationsForTopics.get(topic5).get(0),
is("Retention byte (" + TopicConfig.RETENTION_BYTES_CONFIG + ") of existing internal topic "
+ topic5 + " is set but it should be unset.")
);
}
@Test @Test
public void shouldReportMultipleMisconfigurationsForSameTopic() { public void shouldReportMultipleMisconfigurationsForSameTopic() {
final long retentionMs = 1000; final long retentionMs = 1000;
@ -762,33 +777,23 @@ public class InternalTopicManagerTest {
assertThat(misconfigurationsForTopics.get(topic1).size(), is(2)); assertThat(misconfigurationsForTopics.get(topic1).size(), is(2));
assertThat( assertThat(
misconfigurationsForTopics.get(topic1).get(0), misconfigurationsForTopics.get(topic1).get(0),
is("Retention time of existing internal topic " + topic1 + " is " + shorterRetentionMs + is("Retention time (" + TopicConfig.RETENTION_MS_CONFIG + ") of existing internal topic " +
" but should be " + retentionMs + " or larger.") topic1 + " is " + shorterRetentionMs + " but should be " + retentionMs + " or larger.")
); );
assertThat( assertThat(
misconfigurationsForTopics.get(topic1).get(1), misconfigurationsForTopics.get(topic1).get(1),
is("Retention byte of existing internal topic " + topic1 + " is set but it should be unset.") is("Retention byte (" + TopicConfig.RETENTION_BYTES_CONFIG + ") of existing internal topic " +
topic1 + " is set but it should be unset.")
); );
} }
private InternalTopicConfig setupWindowedChangelogTopicConfig(final String topicName,
final int partitionCount,
final long retentionMs) {
final InternalTopicConfig internalTopicConfig = new WindowedChangelogTopicConfig(
topicName,
mkMap(mkEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs)))
);
internalTopicConfig.setNumberOfPartitions(partitionCount);
return internalTopicConfig;
}
@Test @Test
public void shouldThrowWhenPartitionCountUnknown() { public void shouldThrowWhenPartitionCountUnknown() {
mockAdminClient.addTopic( mockAdminClient.addTopic(
false, false,
topic1, topic1,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())), Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
null mkMap(mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE))
); );
final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap()); final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());
@ -804,11 +809,14 @@ public class InternalTopicManagerTest {
false, false,
topic1, topic1,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())), Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
null mkMap(
mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"),
mkEntry(TopicConfig.RETENTION_BYTES_CONFIG, null)
)
); );
mockAdminClient.timeoutNextRequest(2); mockAdminClient.timeoutNextRequest(2);
final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap()); final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);
internalTopicConfig.setNumberOfPartitions(1);
final ValidationResult validationResult = internalTopicManager.validate(Collections.singletonMap(topic1, internalTopicConfig)); final ValidationResult validationResult = internalTopicManager.validate(Collections.singletonMap(topic1, internalTopicConfig));
@ -836,13 +844,15 @@ public class InternalTopicManagerTest {
.andReturn(new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionFailFuture)))) .andReturn(new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionFailFuture))))
.andReturn(new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionSuccessfulFuture)))); .andReturn(new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionSuccessfulFuture))));
final KafkaFutureImpl<Config> topicConfigSuccessfulFuture = new KafkaFutureImpl<>(); final KafkaFutureImpl<Config> topicConfigSuccessfulFuture = new KafkaFutureImpl<>();
topicConfigSuccessfulFuture.complete(new Config(Collections.emptySet())); topicConfigSuccessfulFuture.complete(
new Config(repartitionTopicConfig().entrySet().stream()
.map(entry -> new ConfigEntry(entry.getKey(), entry.getValue())).collect(Collectors.toSet()))
);
final ConfigResource topicResource = new ConfigResource(Type.TOPIC, topic1); final ConfigResource topicResource = new ConfigResource(Type.TOPIC, topic1);
EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource))) EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource)))
.andReturn(new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigSuccessfulFuture)))); .andReturn(new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigSuccessfulFuture))));
EasyMock.replay(admin); EasyMock.replay(admin);
final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap()); final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);
internalTopicConfig.setNumberOfPartitions(1);
final ValidationResult validationResult = topicManager.validate(Collections.singletonMap(topic1, internalTopicConfig)); final ValidationResult validationResult = topicManager.validate(Collections.singletonMap(topic1, internalTopicConfig));
@ -870,14 +880,16 @@ public class InternalTopicManagerTest {
final KafkaFutureImpl<Config> topicConfigsFailFuture = new KafkaFutureImpl<>(); final KafkaFutureImpl<Config> topicConfigsFailFuture = new KafkaFutureImpl<>();
topicConfigsFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!")); topicConfigsFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
final KafkaFutureImpl<Config> topicConfigSuccessfulFuture = new KafkaFutureImpl<>(); final KafkaFutureImpl<Config> topicConfigSuccessfulFuture = new KafkaFutureImpl<>();
topicConfigSuccessfulFuture.complete(new Config(Collections.emptySet())); topicConfigSuccessfulFuture.complete(
new Config(repartitionTopicConfig().entrySet().stream()
.map(entry -> new ConfigEntry(entry.getKey(), entry.getValue())).collect(Collectors.toSet()))
);
final ConfigResource topicResource = new ConfigResource(Type.TOPIC, topic1); final ConfigResource topicResource = new ConfigResource(Type.TOPIC, topic1);
EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource))) EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource)))
.andReturn(new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigsFailFuture)))) .andReturn(new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigsFailFuture))))
.andReturn(new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigSuccessfulFuture)))); .andReturn(new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigSuccessfulFuture))));
EasyMock.replay(admin); EasyMock.replay(admin);
final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap()); final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);
internalTopicConfig.setNumberOfPartitions(1);
final ValidationResult validationResult = topicManager.validate(Collections.singletonMap(topic1, internalTopicConfig)); final ValidationResult validationResult = topicManager.validate(Collections.singletonMap(topic1, internalTopicConfig));
@ -899,8 +911,7 @@ public class InternalTopicManagerTest {
EasyMock.expect(admin.describeTopics(Collections.singleton(topic1))) EasyMock.expect(admin.describeTopics(Collections.singleton(topic1)))
.andStubAnswer(() -> new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionFailFuture)))); .andStubAnswer(() -> new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionFailFuture))));
EasyMock.replay(admin); EasyMock.replay(admin);
final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap()); final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);
internalTopicConfig.setNumberOfPartitions(1);
assertThrows(Throwable.class, () -> topicManager.validate(Collections.singletonMap(topic1, internalTopicConfig))); assertThrows(Throwable.class, () -> topicManager.validate(Collections.singletonMap(topic1, internalTopicConfig)));
} }
@ -919,8 +930,7 @@ public class InternalTopicManagerTest {
EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource))) EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource)))
.andStubAnswer(() -> new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, configDescriptionFailFuture)))); .andStubAnswer(() -> new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, configDescriptionFailFuture))));
EasyMock.replay(admin); EasyMock.replay(admin);
final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap()); final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);
internalTopicConfig.setNumberOfPartitions(1);
assertThrows(Throwable.class, () -> topicManager.validate(Collections.singletonMap(topic1, internalTopicConfig))); assertThrows(Throwable.class, () -> topicManager.validate(Collections.singletonMap(topic1, internalTopicConfig)));
} }
@ -947,8 +957,7 @@ public class InternalTopicManagerTest {
EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource))) EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource)))
.andStubAnswer(() -> new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigSuccessfulFuture)))); .andStubAnswer(() -> new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigSuccessfulFuture))));
EasyMock.replay(admin); EasyMock.replay(admin);
final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap()); final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);
internalTopicConfig.setNumberOfPartitions(1);
assertThrows( assertThrows(
IllegalStateException.class, IllegalStateException.class,
@ -979,8 +988,7 @@ public class InternalTopicManagerTest {
EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource1))) EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource1)))
.andStubAnswer(() -> new MockDescribeConfigsResult(mkMap(mkEntry(topicResource2, topicConfigSuccessfulFuture)))); .andStubAnswer(() -> new MockDescribeConfigsResult(mkMap(mkEntry(topicResource2, topicConfigSuccessfulFuture))));
EasyMock.replay(admin); EasyMock.replay(admin);
final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap()); final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);
internalTopicConfig.setNumberOfPartitions(1);
assertThrows( assertThrows(
IllegalStateException.class, IllegalStateException.class,
@ -992,42 +1000,65 @@ public class InternalTopicManagerTest {
public void shouldThrowWhenConfigDescriptionsDoNotCleanupPolicyForUnwindowedConfigDuringValidation() { public void shouldThrowWhenConfigDescriptionsDoNotCleanupPolicyForUnwindowedConfigDuringValidation() {
shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation( shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(
setupUnwindowedChangelogTopicConfig(topic1, 1), setupUnwindowedChangelogTopicConfig(topic1, 1),
new Config(Collections.emptySet()) configWithoutKey(unwindowedChangelogConfig(), TopicConfig.CLEANUP_POLICY_CONFIG)
); );
} }
@Test @Test
public void shouldThrowWhenConfigDescriptionsDoNotCleanupPolicyForWindowedConfigDuringValidation() { public void shouldThrowWhenConfigDescriptionsDoNotContainCleanupPolicyForWindowedConfigDuringValidation() {
final long retentionMs = 1000; final long retentionMs = 1000;
shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation( shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(
setupWindowedChangelogTopicConfig(topic1, 1, retentionMs), setupWindowedChangelogTopicConfig(topic1, 1, retentionMs),
new Config(mkSet( configWithoutKey(windowedChangelogConfig(retentionMs), TopicConfig.CLEANUP_POLICY_CONFIG)
new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs)),
new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG, "1000"))
)
); );
} }
@Test @Test
public void shouldThrowWhenConfigDescriptionsDoNotRetentionMsForWindowedConfigDuringValidation() { public void shouldThrowWhenConfigDescriptionsDoNotContainRetentionMsForWindowedConfigDuringValidation() {
shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(
setupWindowedChangelogTopicConfig(topic1, 1, 1000),
new Config(mkSet(
new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG, "1000"))
)
);
}
@Test
public void shouldThrowWhenConfigDescriptionsDoNotRetentionBytesForWindowedConfigDuringValidation() {
final long retentionMs = 1000; final long retentionMs = 1000;
shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation( shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(
setupWindowedChangelogTopicConfig(topic1, 1, retentionMs), setupWindowedChangelogTopicConfig(topic1, 1, retentionMs),
new Config(mkSet( configWithoutKey(windowedChangelogConfig(retentionMs), TopicConfig.RETENTION_MS_CONFIG)
new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE), );
new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs))) }
)
@Test
public void shouldThrowWhenConfigDescriptionsDoNotContainRetentionBytesForWindowedConfigDuringValidation() {
final long retentionMs = 1000;
shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(
setupWindowedChangelogTopicConfig(topic1, 1, retentionMs),
configWithoutKey(windowedChangelogConfig(retentionMs), TopicConfig.RETENTION_BYTES_CONFIG)
);
}
@Test
public void shouldThrowWhenConfigDescriptionsDoNotContainCleanupPolicyForRepartitionConfigDuringValidation() {
shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(
setupRepartitionTopicConfig(topic1, 1),
configWithoutKey(repartitionTopicConfig(), TopicConfig.CLEANUP_POLICY_CONFIG)
);
}
@Test
public void shouldThrowWhenConfigDescriptionsDoNotContainRetentionMsForRepartitionConfigDuringValidation() {
shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(
setupRepartitionTopicConfig(topic1, 1),
configWithoutKey(repartitionTopicConfig(), TopicConfig.RETENTION_MS_CONFIG)
);
}
@Test
public void shouldThrowWhenConfigDescriptionsDoNotContainRetentionBytesForRepartitionConfigDuringValidation() {
shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(
setupRepartitionTopicConfig(topic1, 1),
configWithoutKey(repartitionTopicConfig(), TopicConfig.RETENTION_BYTES_CONFIG)
);
}
private Config configWithoutKey(final Map<String, String> config, final String key) {
return new Config(config.entrySet().stream()
.filter(entry -> !entry.getKey().equals(key))
.map(entry -> new ConfigEntry(entry.getKey(), entry.getValue())).collect(Collectors.toSet())
); );
} }
@ -1076,13 +1107,15 @@ public class InternalTopicManagerTest {
EasyMock.expect(admin.describeTopics(Collections.singleton(topic1))) EasyMock.expect(admin.describeTopics(Collections.singleton(topic1)))
.andStubAnswer(() -> new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionFailFuture)))); .andStubAnswer(() -> new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionFailFuture))));
final KafkaFutureImpl<Config> topicConfigSuccessfulFuture = new KafkaFutureImpl<>(); final KafkaFutureImpl<Config> topicConfigSuccessfulFuture = new KafkaFutureImpl<>();
topicConfigSuccessfulFuture.complete(new Config(Collections.emptySet())); topicConfigSuccessfulFuture.complete(
new Config(repartitionTopicConfig().entrySet().stream()
.map(entry -> new ConfigEntry(entry.getKey(), entry.getValue())).collect(Collectors.toSet()))
);
final ConfigResource topicResource = new ConfigResource(Type.TOPIC, topic1); final ConfigResource topicResource = new ConfigResource(Type.TOPIC, topic1);
EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource))) EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource)))
.andStubAnswer(() -> new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigSuccessfulFuture)))); .andStubAnswer(() -> new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigSuccessfulFuture))));
EasyMock.replay(admin); EasyMock.replay(admin);
final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap()); final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);
internalTopicConfig.setNumberOfPartitions(1);
assertThrows( assertThrows(
TimeoutException.class, TimeoutException.class,
@ -1105,13 +1138,15 @@ public class InternalTopicManagerTest {
EasyMock.expect(admin.describeTopics(Collections.singleton(topic1))) EasyMock.expect(admin.describeTopics(Collections.singleton(topic1)))
.andStubAnswer(() -> new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionFutureThatNeverCompletes)))); .andStubAnswer(() -> new MockDescribeTopicsResult(mkMap(mkEntry(topic1, topicDescriptionFutureThatNeverCompletes))));
final KafkaFutureImpl<Config> topicConfigSuccessfulFuture = new KafkaFutureImpl<>(); final KafkaFutureImpl<Config> topicConfigSuccessfulFuture = new KafkaFutureImpl<>();
topicConfigSuccessfulFuture.complete(new Config(Collections.emptySet())); topicConfigSuccessfulFuture.complete(
new Config(repartitionTopicConfig().entrySet().stream()
.map(entry -> new ConfigEntry(entry.getKey(), entry.getValue())).collect(Collectors.toSet()))
);
final ConfigResource topicResource = new ConfigResource(Type.TOPIC, topic1); final ConfigResource topicResource = new ConfigResource(Type.TOPIC, topic1);
EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource))) EasyMock.expect(admin.describeConfigs(Collections.singleton(topicResource)))
.andStubAnswer(() -> new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigSuccessfulFuture)))); .andStubAnswer(() -> new MockDescribeConfigsResult(mkMap(mkEntry(topicResource, topicConfigSuccessfulFuture))));
EasyMock.replay(admin); EasyMock.replay(admin);
final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap()); final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);
internalTopicConfig.setNumberOfPartitions(1);
assertThrows( assertThrows(
TimeoutException.class, TimeoutException.class,
@ -1119,6 +1154,63 @@ public class InternalTopicManagerTest {
); );
} }
private Map<String, String> repartitionTopicConfig() {
return mkMap(
mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"),
mkEntry(TopicConfig.RETENTION_BYTES_CONFIG, null)
);
}
private Map<String, String> unwindowedChangelogConfig() {
return mkMap(
mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
);
}
private Map<String, String> windowedChangelogConfig(final long retentionMs) {
return mkMap(
mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE),
mkEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs)),
mkEntry(TopicConfig.RETENTION_BYTES_CONFIG, null)
);
}
private void setupTopicInMockAdminClient(final String topic, final Map<String, String> topicConfig) {
mockAdminClient.addTopic(
false,
topic,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
topicConfig
);
}
private InternalTopicConfig setupUnwindowedChangelogTopicConfig(final String topicName,
final int partitionCount) {
final InternalTopicConfig internalTopicConfig =
new UnwindowedChangelogTopicConfig(topicName, Collections.emptyMap());
internalTopicConfig.setNumberOfPartitions(partitionCount);
return internalTopicConfig;
}
private InternalTopicConfig setupWindowedChangelogTopicConfig(final String topicName,
final int partitionCount,
final long retentionMs) {
final InternalTopicConfig internalTopicConfig = new WindowedChangelogTopicConfig(
topicName,
mkMap(mkEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs)))
);
internalTopicConfig.setNumberOfPartitions(partitionCount);
return internalTopicConfig;
}
private InternalTopicConfig setupRepartitionTopicConfig(final String topicName,
final int partitionCount) {
final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topicName, Collections.emptyMap());
internalTopicConfig.setNumberOfPartitions(partitionCount);
return internalTopicConfig;
}
private static class MockCreateTopicsResult extends CreateTopicsResult { private static class MockCreateTopicsResult extends CreateTopicsResult {
MockCreateTopicsResult(final Map<String, KafkaFuture<TopicMetadataAndConfig>> futures) { MockCreateTopicsResult(final Map<String, KafkaFuture<TopicMetadataAndConfig>> futures) {