mirror of https://github.com/apache/kafka.git
KAFKA-18809 Set min in sync replicas for __share_group_state. (#18922)
- The share.coordinator.state.topic.min.isr config defined in ShareCoordinatorConfig was not being used in the AutoTopicCreationManager. - The AutoTopicCreationManager calls the ShareCoordinatorService.shareGroupStateTopicConfigs to configs for the topic to create. - The method ShareCoordinatorService.shareGroupStateTopicConfigs was not setting the supplied config value for share.coordinator.state.topic.min.isr to min.insync.replicas. - In this PR, we remedy the situation by setting the value - A test has been added to ShareCoordinatorServiceTest so that this is not repeated for any configs. Reviewers: poorv Mittal <apoorvmittal10@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
d0e516a872
commit
5235e11d4d
|
@ -235,6 +235,7 @@ public class ShareCoordinatorService implements ShareCoordinator {
|
||||||
properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); // as defined in KIP-932
|
properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); // as defined in KIP-932
|
||||||
properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.PRODUCER.name);
|
properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.PRODUCER.name);
|
||||||
properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, config.shareCoordinatorStateTopicSegmentBytes());
|
properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, config.shareCoordinatorStateTopicSegmentBytes());
|
||||||
|
properties.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, config.shareCoordinatorStateTopicMinIsr());
|
||||||
return properties;
|
return properties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.kafka.common.MetricName;
|
||||||
import org.apache.kafka.common.TopicIdPartition;
|
import org.apache.kafka.common.TopicIdPartition;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.apache.kafka.common.Uuid;
|
import org.apache.kafka.common.Uuid;
|
||||||
|
import org.apache.kafka.common.config.TopicConfig;
|
||||||
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
|
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
|
||||||
import org.apache.kafka.common.internals.Topic;
|
import org.apache.kafka.common.internals.Topic;
|
||||||
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
|
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
|
||||||
|
@ -55,6 +56,7 @@ import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.Properties;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
@ -1643,6 +1645,36 @@ class ShareCoordinatorServiceTest {
|
||||||
service.shutdown();
|
service.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testShareStateTopicConfigs() {
|
||||||
|
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
|
||||||
|
MockTime time = new MockTime();
|
||||||
|
MockTimer timer = new MockTimer(time);
|
||||||
|
PartitionWriter writer = mock(PartitionWriter.class);
|
||||||
|
|
||||||
|
Metrics metrics = new Metrics();
|
||||||
|
ShareCoordinatorService service = spy(new ShareCoordinatorService(
|
||||||
|
new LogContext(),
|
||||||
|
ShareCoordinatorTestConfig.testConfig(),
|
||||||
|
runtime,
|
||||||
|
new ShareCoordinatorMetrics(metrics),
|
||||||
|
time,
|
||||||
|
timer,
|
||||||
|
writer
|
||||||
|
));
|
||||||
|
|
||||||
|
List<String> propNames = List.of(
|
||||||
|
TopicConfig.CLEANUP_POLICY_CONFIG,
|
||||||
|
TopicConfig.COMPRESSION_TYPE_CONFIG,
|
||||||
|
TopicConfig.SEGMENT_BYTES_CONFIG,
|
||||||
|
TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG
|
||||||
|
);
|
||||||
|
Properties actual = service.shareGroupStateTopicConfigs();
|
||||||
|
propNames.forEach(actual::contains);
|
||||||
|
|
||||||
|
service.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
private void checkMetrics(Metrics metrics) {
|
private void checkMetrics(Metrics metrics) {
|
||||||
Set<MetricName> usualMetrics = new HashSet<>(Arrays.asList(
|
Set<MetricName> usualMetrics = new HashSet<>(Arrays.asList(
|
||||||
metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP),
|
metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP),
|
||||||
|
|
Loading…
Reference in New Issue