mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-17292: Introduce share coordinator protocol config (#16847)
Add the "share" group coordinator rebalance protocol as the way to enable KIP-932. It is also necessary to turn on the new group coordinator. Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
		
							parent
							
								
									fac8cae388
								
							
						
					
					
						commit
						deab703e43
					
				|  | @ -579,6 +579,14 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) | ||||||
|       warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is enabled along with the new group coordinator. " + |       warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is enabled along with the new group coordinator. " + | ||||||
|         "This is part of the preview of KIP-848 and MUST NOT be used in production.") |         "This is part of the preview of KIP-848 and MUST NOT be used in production.") | ||||||
|     } |     } | ||||||
|  |     if (protocols.contains(GroupType.SHARE)) { | ||||||
|  |       // The CONSUMER protocol enables the new group coordinator, and that's a prerequisite for share groups. | ||||||
|  |       if (!protocols.contains(GroupType.CONSUMER)) { | ||||||
|  |         throw new ConfigException(s"Enabling the new '${GroupType.SHARE}' rebalance protocol requires '${GroupType.CONSUMER}' to be enabled also.") | ||||||
|  |       } | ||||||
|  |       warn(s"Share groups and the new '${GroupType.SHARE}' rebalance protocol are enabled. " + | ||||||
|  |         "This is part of the early access of KIP-932 and MUST NOT be used in production.") | ||||||
|  |     } | ||||||
|     protocols |     protocols | ||||||
|   } |   } | ||||||
|   // The new group coordinator is enabled in two cases: 1) The internal configuration to enable |   // The new group coordinator is enabled in two cases: 1) The internal configuration to enable | ||||||
|  |  | ||||||
|  | @ -1941,7 +1941,7 @@ class KafkaConfigTest { | ||||||
|     // Setting KRaft's properties. |     // Setting KRaft's properties. | ||||||
|     props.putAll(kraftProps()) |     props.putAll(kraftProps()) | ||||||
| 
 | 
 | ||||||
|     // Only classic and consumer are supported. |     // Only classic, consumer and share are supported. | ||||||
|     props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "foo") |     props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "foo") | ||||||
|     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) |     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) | ||||||
| 
 | 
 | ||||||
|  | @ -1951,9 +1951,21 @@ class KafkaConfigTest { | ||||||
| 
 | 
 | ||||||
|     // This is OK. |     // This is OK. | ||||||
|     props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer") |     props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer") | ||||||
|     val config = KafkaConfig.fromProps(props) |     var config = KafkaConfig.fromProps(props) | ||||||
|     assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER), config.groupCoordinatorRebalanceProtocols) |     assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER), config.groupCoordinatorRebalanceProtocols) | ||||||
|     assertTrue(config.isNewGroupCoordinatorEnabled) |     assertTrue(config.isNewGroupCoordinatorEnabled) | ||||||
|  |     assertFalse(config.shareGroupConfig.isShareGroupEnabled) | ||||||
|  | 
 | ||||||
|  |     // This is OK. | ||||||
|  |     props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,share") | ||||||
|  |     config = KafkaConfig.fromProps(props) | ||||||
|  |     assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), config.groupCoordinatorRebalanceProtocols) | ||||||
|  |     assertTrue(config.isNewGroupCoordinatorEnabled) | ||||||
|  |     assertTrue(config.shareGroupConfig.isShareGroupEnabled) | ||||||
|  | 
 | ||||||
|  |     // If you set share, you must also set consumer. | ||||||
|  |     props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,share") | ||||||
|  |     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   @Test |   @Test | ||||||
|  |  | ||||||
|  | @ -76,7 +76,8 @@ public class GroupCoordinatorConfig { | ||||||
|     public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = "group.coordinator.rebalance.protocols"; |     public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = "group.coordinator.rebalance.protocols"; | ||||||
|     public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " + |     public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " + | ||||||
|             Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". " + |             Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". " + | ||||||
|             "The " + Group.GroupType.CONSUMER + " rebalance protocol is in early access and therefore must not be used in production."; |             "The " + Group.GroupType.CONSUMER + " rebalance protocol is in preview and therefore must not be used in production. " + | ||||||
|  |             "The " + Group.GroupType.SHARE + " rebalance protocol is in early access and therefore must not be used in production."; | ||||||
|     public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString()); |     public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString()); | ||||||
|     public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms"; |     public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms"; | ||||||
|     public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " + |     public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " + | ||||||
|  |  | ||||||
|  | @ -16,9 +16,14 @@ | ||||||
|  */ |  */ | ||||||
| package org.apache.kafka.server.config; | package org.apache.kafka.server.config; | ||||||
| 
 | 
 | ||||||
|  | import org.apache.kafka.common.GroupType; | ||||||
| import org.apache.kafka.common.config.AbstractConfig; | import org.apache.kafka.common.config.AbstractConfig; | ||||||
| import org.apache.kafka.common.config.ConfigDef; | import org.apache.kafka.common.config.ConfigDef; | ||||||
| import org.apache.kafka.common.utils.Utils; | import org.apache.kafka.common.utils.Utils; | ||||||
|  | import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; | ||||||
|  | 
 | ||||||
|  | import java.util.Set; | ||||||
|  | import java.util.stream.Collectors; | ||||||
| 
 | 
 | ||||||
| import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; | import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; | ||||||
| import static org.apache.kafka.common.config.ConfigDef.Range.between; | import static org.apache.kafka.common.config.ConfigDef.Range.between; | ||||||
|  | @ -58,7 +63,7 @@ public class ShareGroupConfig { | ||||||
|     public static final int SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT = 60000; |     public static final int SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT = 60000; | ||||||
|     public static final String SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC = "The record acquisition lock maximum duration in milliseconds for share groups."; |     public static final String SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC = "The record acquisition lock maximum duration in milliseconds for share groups."; | ||||||
| 
 | 
 | ||||||
|     public static final ConfigDef CONFIG_DEF =  new ConfigDef() |     public static final ConfigDef CONFIG_DEF = new ConfigDef() | ||||||
|             .defineInternal(SHARE_GROUP_ENABLE_CONFIG, BOOLEAN, SHARE_GROUP_ENABLE_DEFAULT, null, MEDIUM, SHARE_GROUP_ENABLE_DOC) |             .defineInternal(SHARE_GROUP_ENABLE_CONFIG, BOOLEAN, SHARE_GROUP_ENABLE_DEFAULT, null, MEDIUM, SHARE_GROUP_ENABLE_DOC) | ||||||
|             .define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC) |             .define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC) | ||||||
|             .define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 60000), MEDIUM, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC) |             .define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 60000), MEDIUM, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC) | ||||||
|  | @ -76,7 +81,12 @@ public class ShareGroupConfig { | ||||||
|     private final int shareGroupMinRecordLockDurationMs; |     private final int shareGroupMinRecordLockDurationMs; | ||||||
| 
 | 
 | ||||||
|     public ShareGroupConfig(AbstractConfig config) { |     public ShareGroupConfig(AbstractConfig config) { | ||||||
|         isShareGroupEnabled = config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG); |         // Share groups are enabled in two cases: 1) The internal configuration to enable it is | ||||||
|  |         // explicitly set; or 2) the share rebalance protocol is enabled. | ||||||
|  |         Set<String> protocols = config.getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG) | ||||||
|  |             .stream().map(String::toUpperCase).collect(Collectors.toSet()); | ||||||
|  |         isShareGroupEnabled = config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG) || | ||||||
|  |             protocols.contains(GroupType.SHARE.name()); | ||||||
|         shareGroupPartitionMaxRecordLocks = config.getInt(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG); |         shareGroupPartitionMaxRecordLocks = config.getInt(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG); | ||||||
|         shareGroupDeliveryCountLimit = config.getInt(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG); |         shareGroupDeliveryCountLimit = config.getInt(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG); | ||||||
|         shareGroupMaxGroups = config.getShort(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG); |         shareGroupMaxGroups = config.getShort(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG); | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue