mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property (#14161)
The purpose of this change is to not allow a broker to start up with Tiered Storage disabled (remote.log.storage.system.enable=false) while there are still topics that have 'remote.storage.enable' set. Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>
This commit is contained in:
		
							parent
							
								
									1c5020e142
								
							
						
					
					
						commit
						efec0f5756
					
				|  | @ -871,12 +871,17 @@ class LogManager(logDirs: Seq[File], | ||||||
|    * Update the configuration of the provided topic. |    * Update the configuration of the provided topic. | ||||||
|    */ |    */ | ||||||
|   def updateTopicConfig(topic: String, |   def updateTopicConfig(topic: String, | ||||||
|                         newTopicConfig: Properties): Unit = { |                         newTopicConfig: Properties, | ||||||
|  |                         isRemoteLogStorageSystemEnabled: Boolean): Unit = { | ||||||
|     topicConfigUpdated(topic) |     topicConfigUpdated(topic) | ||||||
|     val logs = logsByTopic(topic) |     val logs = logsByTopic(topic) | ||||||
|     if (logs.nonEmpty) { |  | ||||||
|     // Combine the default properties with the overrides in zk to create the new LogConfig |     // Combine the default properties with the overrides in zk to create the new LogConfig | ||||||
|     val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, newTopicConfig) |     val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, newTopicConfig) | ||||||
|  |     // We would like to validate the configuration no matter whether the logs have materialised on disk or not. | ||||||
|  |     // Otherwise we risk someone creating a tiered-topic, disabling Tiered Storage cluster-wide and the check | ||||||
|  |     // failing since the logs for the topic are non-existent. | ||||||
|  |     LogConfig.validateRemoteStorageOnlyIfSystemEnabled(newLogConfig.values(), isRemoteLogStorageSystemEnabled, true) | ||||||
|  |     if (logs.nonEmpty) { | ||||||
|       logs.foreach { log => |       logs.foreach { log => | ||||||
|         val oldLogConfig = log.updateConfig(newLogConfig) |         val oldLogConfig = log.updateConfig(newLogConfig) | ||||||
|         if (oldLogConfig.compact && !newLogConfig.compact) { |         if (oldLogConfig.compact && !newLogConfig.compact) { | ||||||
|  |  | ||||||
|  | @ -66,9 +66,11 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, | ||||||
|     topicConfig.asScala.forKeyValue { (key, value) => |     topicConfig.asScala.forKeyValue { (key, value) => | ||||||
|       if (!configNamesToExclude.contains(key)) props.put(key, value) |       if (!configNamesToExclude.contains(key)) props.put(key, value) | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|     val logs = logManager.logsByTopic(topic) |     val logs = logManager.logsByTopic(topic) | ||||||
|     val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled()) |     val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled()) | ||||||
|     logManager.updateTopicConfig(topic, props) | 
 | ||||||
|  |     logManager.updateTopicConfig(topic, props, kafkaConfig.isRemoteLogStorageSystemEnabled) | ||||||
|     maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate) |     maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate) | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -21,11 +21,10 @@ import kafka.server.KafkaConfig | ||||||
| import kafka.utils.{TestInfoUtils, TestUtils} | import kafka.utils.{TestInfoUtils, TestUtils} | ||||||
| import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} | import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} | ||||||
| import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} | import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} | ||||||
| import org.apache.kafka.common.config.{ConfigResource, TopicConfig} | import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig} | ||||||
| import org.apache.kafka.common.errors.{InvalidConfigurationException, UnknownTopicOrPartitionException} | import org.apache.kafka.common.errors.{InvalidConfigurationException, UnknownTopicOrPartitionException} | ||||||
| import org.apache.kafka.common.utils.MockTime | import org.apache.kafka.common.utils.MockTime | ||||||
| import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, | import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentState} | ||||||
|   RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentState} |  | ||||||
| import org.junit.jupiter.api.Assertions._ | import org.junit.jupiter.api.Assertions._ | ||||||
| import org.junit.jupiter.api.function.Executable | import org.junit.jupiter.api.function.Executable | ||||||
| import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo} | import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo} | ||||||
|  | @ -299,6 +298,43 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { | ||||||
|       "Remote log segments should be deleted only once by the leader") |       "Remote log segments should be deleted only once by the leader") | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|  |   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) | ||||||
|  |   @ValueSource(strings = Array("zk", "kraft")) | ||||||
|  |   def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(quorum: String): Unit = { | ||||||
|  |     val topicConfig = new Properties() | ||||||
|  |     topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") | ||||||
|  | 
 | ||||||
|  |     TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount, | ||||||
|  |       topicConfig = topicConfig) | ||||||
|  | 
 | ||||||
|  |     val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head | ||||||
|  |     instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps)) | ||||||
|  | 
 | ||||||
|  |     if (isKRaftTest()) { | ||||||
|  |       recreateBrokers(startup = true) | ||||||
|  |       assertTrue(faultHandler.firstException().getCause.isInstanceOf[ConfigException]) | ||||||
|  |       // Normally the exception is thrown as part of the TearDown method of the parent class(es). We would like to not do this. | ||||||
|  |       faultHandler.setIgnore(true) | ||||||
|  |     } else { | ||||||
|  |       assertThrows(classOf[ConfigException], () => recreateBrokers(startup = true)) | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) | ||||||
|  |   @ValueSource(strings = Array("zk", "kraft")) | ||||||
|  |   def testClusterWithoutTieredStorageStartsSuccessfullyIfTopicWithTieringDisabled(quorum: String): Unit = { | ||||||
|  |     val topicConfig = new Properties() | ||||||
|  |     topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, false.toString) | ||||||
|  | 
 | ||||||
|  |     TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount, | ||||||
|  |       topicConfig = topicConfig) | ||||||
|  | 
 | ||||||
|  |     val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head | ||||||
|  |     instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps)) | ||||||
|  | 
 | ||||||
|  |     recreateBrokers(startup = true) | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|   private def assertThrowsException(exceptionType: Class[_ <: Throwable], |   private def assertThrowsException(exceptionType: Class[_ <: Throwable], | ||||||
|                                     executable: Executable, |                                     executable: Executable, | ||||||
|                                     message: String = ""): Throwable = { |                                     message: String = ""): Throwable = { | ||||||
|  |  | ||||||
|  | @ -630,7 +630,7 @@ class LogManagerTest { | ||||||
|     val newProperties = new Properties() |     val newProperties = new Properties() | ||||||
|     newProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) |     newProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) | ||||||
| 
 | 
 | ||||||
|     spyLogManager.updateTopicConfig(topic, newProperties) |     spyLogManager.updateTopicConfig(topic, newProperties, false) | ||||||
| 
 | 
 | ||||||
|     assertTrue(log0.config.delete) |     assertTrue(log0.config.delete) | ||||||
|     assertTrue(log1.config.delete) |     assertTrue(log1.config.delete) | ||||||
|  |  | ||||||
|  | @ -552,19 +552,24 @@ public class LogConfig extends AbstractConfig { | ||||||
|         validateValues(props); |         validateValues(props); | ||||||
|         boolean isRemoteLogStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); |         boolean isRemoteLogStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); | ||||||
|         if (isRemoteLogStorageEnabled) { |         if (isRemoteLogStorageEnabled) { | ||||||
|             validateRemoteStorageOnlyIfSystemEnabled(isRemoteLogStorageSystemEnabled); |             validateRemoteStorageOnlyIfSystemEnabled(props, isRemoteLogStorageSystemEnabled, false); | ||||||
|             validateNoRemoteStorageForCompactedTopic(props); |             validateNoRemoteStorageForCompactedTopic(props); | ||||||
|             validateRemoteStorageRetentionSize(props); |             validateRemoteStorageRetentionSize(props); | ||||||
|             validateRemoteStorageRetentionTime(props); |             validateRemoteStorageRetentionTime(props); | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     private static void validateRemoteStorageOnlyIfSystemEnabled(boolean isRemoteLogStorageSystemEnabled) { |     public static void validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?> props, boolean isRemoteLogStorageSystemEnabled, boolean isReceivingConfigFromStore) { | ||||||
|         if (!isRemoteLogStorageSystemEnabled) { |         boolean isRemoteLogStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); | ||||||
|  |         if (isRemoteLogStorageEnabled && !isRemoteLogStorageSystemEnabled) { | ||||||
|  |             if (isReceivingConfigFromStore) { | ||||||
|  |                 throw new ConfigException("You have to delete all topics with the property remote.storage.enable=true before disabling tiered storage cluster-wide"); | ||||||
|  |             } else { | ||||||
|                 throw new ConfigException("Tiered Storage functionality is disabled in the broker. " + |                 throw new ConfigException("Tiered Storage functionality is disabled in the broker. " + | ||||||
|                         "Topic cannot be configured with remote log storage."); |                         "Topic cannot be configured with remote log storage."); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
|     private static void validateNoRemoteStorageForCompactedTopic(Map<?, ?> props) { |     private static void validateNoRemoteStorageForCompactedTopic(Map<?, ?> props) { | ||||||
|         String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); |         String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue