From 5235e11d4d54d3b06daac434aa9c8a631a73e3e3 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Mon, 17 Feb 2025 01:52:48 +0530 Subject: [PATCH] KAFKA-18809 Set min in sync replicas for __share_group_state. (#18922) - The share.coordinator.state.topic.min.isr config defined in ShareCoordinatorConfig was not being used in the AutoTopicCreationManager. - The AutoTopicCreationManager calls the ShareCoordinatorService.shareGroupStateTopicConfigs to configs for the topic to create. - The method ShareCoordinatorService.shareGroupStateTopicConfigs was not setting the supplied config value for share.coordinator.state.topic.min.isr to min.insync.replicas. - In this PR, we remedy the situation by setting the value - A test has been added to ShareCoordinatorServiceTest so that this is not repeated for any configs. Reviewers: poorv Mittal , Chia-Ping Tsai --- .../share/ShareCoordinatorService.java | 1 + .../share/ShareCoordinatorServiceTest.java | 32 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java index 3139e2ef07d..18b7f0bf051 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java @@ -235,6 +235,7 @@ public class ShareCoordinatorService implements ShareCoordinator { properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); // as defined in KIP-932 properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.PRODUCER.name); properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, config.shareCoordinatorStateTopicSegmentBytes()); + properties.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, config.shareCoordinatorStateTopicMinIsr()); return properties; } diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java index 331595119b6..8738db4dd9a 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; @@ -55,6 +56,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -1643,6 +1645,36 @@ class ShareCoordinatorServiceTest { service.shutdown(); } + @Test + public void testShareStateTopicConfigs() { + CoordinatorRuntime runtime = mockRuntime(); + MockTime time = new MockTime(); + MockTimer timer = new MockTimer(time); + PartitionWriter writer = mock(PartitionWriter.class); + + Metrics metrics = new Metrics(); + ShareCoordinatorService service = spy(new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.testConfig(), + runtime, + new ShareCoordinatorMetrics(metrics), + time, + timer, + writer + )); + + List propNames = List.of( + TopicConfig.CLEANUP_POLICY_CONFIG, + TopicConfig.COMPRESSION_TYPE_CONFIG, + TopicConfig.SEGMENT_BYTES_CONFIG, + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG + ); + Properties actual = service.shareGroupStateTopicConfigs(); + propNames.forEach(actual::contains); + + service.shutdown(); + } + private void checkMetrics(Metrics metrics) { Set usualMetrics = new HashSet<>(Arrays.asList( metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP),