mirror of https://github.com/apache/kafka.git
MINOR: Add retention prop to share group state topic. (#20013)
CI / build (push) Waiting to run
Details
CI / build (push) Waiting to run
Details
* https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka states the `retention.ms` property for the `__share_group_state` to be `-1`. * This PR makes it explicit when defining the values of those configs. * Existing test has been updated. ``` $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic __share_group_state Topic: __share_group_state TopicId: XCwzZjEGSjm5lUc_BeCrqA PartitionCount: 50 ReplicationFactor: 1 Configs: compression.type=producer, min.insync.replicas=1, cleanup.policy=delete, segment.bytes=104857600, retention.ms=-1 ... ``` Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
3404f65cdb
commit
56a6ba2d2e
|
@ -253,10 +253,12 @@ public class ShareCoordinatorService implements ShareCoordinator {
|
||||||
@Override
|
@Override
|
||||||
public Properties shareGroupStateTopicConfigs() {
|
public Properties shareGroupStateTopicConfigs() {
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); // as defined in KIP-932
|
// As defined in KIP-932.
|
||||||
|
properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
|
||||||
properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.PRODUCER.name);
|
properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.PRODUCER.name);
|
||||||
properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, config.shareCoordinatorStateTopicSegmentBytes());
|
properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, config.shareCoordinatorStateTopicSegmentBytes());
|
||||||
properties.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, config.shareCoordinatorStateTopicMinIsr());
|
properties.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, config.shareCoordinatorStateTopicMinIsr());
|
||||||
|
properties.put(TopicConfig.RETENTION_MS_CONFIG, -1);
|
||||||
return properties;
|
return properties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2158,10 +2158,12 @@ class ShareCoordinatorServiceTest {
|
||||||
TopicConfig.CLEANUP_POLICY_CONFIG,
|
TopicConfig.CLEANUP_POLICY_CONFIG,
|
||||||
TopicConfig.COMPRESSION_TYPE_CONFIG,
|
TopicConfig.COMPRESSION_TYPE_CONFIG,
|
||||||
TopicConfig.SEGMENT_BYTES_CONFIG,
|
TopicConfig.SEGMENT_BYTES_CONFIG,
|
||||||
TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG
|
TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
|
||||||
|
TopicConfig.RETENTION_MS_CONFIG
|
||||||
);
|
);
|
||||||
Properties actual = service.shareGroupStateTopicConfigs();
|
Properties actual = service.shareGroupStateTopicConfigs();
|
||||||
propNames.forEach(actual::contains);
|
propNames.forEach(actual::remove);
|
||||||
|
assertTrue(actual.isEmpty());
|
||||||
|
|
||||||
service.shutdown();
|
service.shutdown();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue