diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java index 3139e2ef07d..18b7f0bf051 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java @@ -235,6 +235,7 @@ public class ShareCoordinatorService implements ShareCoordinator { properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); // as defined in KIP-932 properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.PRODUCER.name); properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, config.shareCoordinatorStateTopicSegmentBytes()); + properties.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, config.shareCoordinatorStateTopicMinIsr()); return properties; } diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java index 331595119b6..8738db4dd9a 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; @@ -55,6 +56,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -1643,6 +1645,36 @@ class ShareCoordinatorServiceTest { service.shutdown(); } + @Test + public void testShareStateTopicConfigs() { + CoordinatorRuntime runtime = mockRuntime(); + MockTime time = new MockTime(); + MockTimer timer = new MockTimer(time); + PartitionWriter writer = mock(PartitionWriter.class); + + Metrics metrics = new Metrics(); + ShareCoordinatorService service = spy(new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.testConfig(), + runtime, + new ShareCoordinatorMetrics(metrics), + time, + timer, + writer + )); + + List propNames = List.of( + TopicConfig.CLEANUP_POLICY_CONFIG, + TopicConfig.COMPRESSION_TYPE_CONFIG, + TopicConfig.SEGMENT_BYTES_CONFIG, + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG + ); + Properties actual = service.shareGroupStateTopicConfigs(); + propNames.forEach(actual::contains); + + service.shutdown(); + } + private void checkMetrics(Metrics metrics) { Set usualMetrics = new HashSet<>(Arrays.asList( metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP),