From 4bba2c8a32a3e35f6870cf3f738c0eef8bb652d2 Mon Sep 17 00:00:00 2001 From: Nikolay Date: Fri, 21 Jul 2023 18:30:28 +0300 Subject: [PATCH] KAFKA-14591: Move DeleteRecordsCommand to tools (#13278) Reviewers: Mickael Maison , Federico Valeri --- bin/kafka-delete-records.sh | 2 +- bin/windows/kafka-delete-records.bat | 2 +- .../main/scala/kafka/admin/AdminUtils.scala | 2 +- .../kafka/admin/DeleteRecordsCommand.scala | 137 ------------- .../kafka/admin/LeaderElectionCommand.scala | 2 +- .../admin/ReassignPartitionsCommand.scala | 2 +- .../main/scala/kafka/admin/TopicCommand.scala | 2 +- .../kafka/controller/KafkaController.scala | 3 +- .../scala/kafka/server/ZkAdminManager.scala | 3 +- .../main/scala/kafka/zk/AdminZkClient.scala | 3 +- .../unit/kafka/admin/AddPartitionsTest.scala | 1 + .../LeaderElectionCommandErrorTest.scala | 2 +- .../admin/LeaderElectionCommandTest.scala | 3 +- .../admin/ReassignPartitionsUnitTest.scala | 2 +- .../unit/kafka/admin/TopicCommandTest.scala | 2 +- .../unit/kafka/server/DynamicConfigTest.scala | 2 +- .../unit/kafka/zk/AdminZkClientTest.scala | 1 + .../common/AdminCommandFailedException.java | 21 +- .../common/AdminOperationException.java | 19 +- .../kafka/tools/DeleteRecordsCommand.java | 183 ++++++++++++++++++ .../kafka/tools/DeleteRecordsCommandTest.java | 182 +++++++++++++++++ 21 files changed, 408 insertions(+), 168 deletions(-) delete mode 100644 core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala rename core/src/main/scala/kafka/admin/AdminOperationException.scala => server-common/src/main/java/org/apache/kafka/server/common/AdminCommandFailedException.java (60%) rename core/src/main/scala/kafka/common/AdminCommandFailedException.scala => server-common/src/main/java/org/apache/kafka/server/common/AdminOperationException.java (61%) create mode 100644 tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java create mode 100644 tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java diff --git a/bin/kafka-delete-records.sh b/bin/kafka-delete-records.sh index 8726f919992..e9db8f95c58 100755 --- a/bin/kafka-delete-records.sh +++ b/bin/kafka-delete-records.sh @@ -14,4 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -exec $(dirname $0)/kafka-run-class.sh kafka.admin.DeleteRecordsCommand "$@" +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.DeleteRecordsCommand "$@" diff --git a/bin/windows/kafka-delete-records.bat b/bin/windows/kafka-delete-records.bat index d07e05f88a2..a883ec707e9 100644 --- a/bin/windows/kafka-delete-records.bat +++ b/bin/windows/kafka-delete-records.bat @@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. rem See the License for the specific language governing permissions and rem limitations under the License. -"%~dp0kafka-run-class.bat" kafka.admin.DeleteRecordsCommand %* +"%~dp0kafka-run-class.bat" org.apache.kafka.tools.DeleteRecordsCommand %* diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index a37e3d68e64..5ac09ab5348 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -18,9 +18,9 @@ package kafka.admin import java.util.Random - import kafka.utils.Logging import org.apache.kafka.common.errors.{InvalidPartitionsException, InvalidReplicationFactorException} +import org.apache.kafka.server.common.AdminOperationException import collection.{Map, mutable, _} diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala deleted file mode 100644 index b1747313708..00000000000 --- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.admin - -import java.io.PrintStream -import java.util.Properties -import kafka.common.AdminCommandFailedException -import kafka.utils.json.JsonValue -import kafka.utils.{CoreUtils, Json} -import org.apache.kafka.clients.admin.{Admin, RecordsToDelete} -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} - -import scala.jdk.CollectionConverters._ -import scala.collection.Seq - -/** - * A command for delete records of the given partitions down to the specified offset. - */ -object DeleteRecordsCommand { - - private[admin] val EarliestVersion = 1 - - def main(args: Array[String]): Unit = { - execute(args, System.out) - } - - def parseOffsetJsonStringWithoutDedup(jsonData: String): Seq[(TopicPartition, Long)] = { - Json.parseFull(jsonData) match { - case Some(js) => - val version = js.asJsonObject.get("version") match { - case Some(jsonValue) => jsonValue.to[Int] - case None => EarliestVersion - } - parseJsonData(version, js) - case None => throw new AdminOperationException("The input string is not a valid JSON") - } - } - - def parseJsonData(version: Int, js: JsonValue): Seq[(TopicPartition, Long)] = { - version match { - case 1 => - js.asJsonObject.get("partitions") match { - case Some(partitions) => - partitions.asJsonArray.iterator.map(_.asJsonObject).map { partitionJs => - val topic = partitionJs("topic").to[String] - val partition = partitionJs("partition").to[Int] - val offset = partitionJs("offset").to[Long] - new TopicPartition(topic, partition) -> offset - }.toBuffer - case _ => throw new AdminOperationException("Missing partitions field"); - } - case _ => throw new AdminOperationException(s"Not supported version field value $version") - } - } - - def execute(args: Array[String], out: PrintStream): Unit = { - val opts = new DeleteRecordsCommandOptions(args) - val adminClient = createAdminClient(opts) - val offsetJsonFile = opts.options.valueOf(opts.offsetJsonFileOpt) - val offsetJsonString = Utils.readFileAsString(offsetJsonFile) - val offsetSeq = parseOffsetJsonStringWithoutDedup(offsetJsonString) - - val duplicatePartitions = CoreUtils.duplicates(offsetSeq.map { case (tp, _) => tp }) - if (duplicatePartitions.nonEmpty) - throw new AdminCommandFailedException("Offset json file contains duplicate topic partitions: %s".format(duplicatePartitions.mkString(","))) - - val recordsToDelete = offsetSeq.map { case (topicPartition, offset) => - (topicPartition, RecordsToDelete.beforeOffset(offset)) - }.toMap.asJava - - out.println("Executing records delete operation") - val deleteRecordsResult = adminClient.deleteRecords(recordsToDelete) - out.println("Records delete operation completed:") - - deleteRecordsResult.lowWatermarks.forEach { (tp, partitionResult) => - try out.println(s"partition: $tp\tlow_watermark: ${partitionResult.get.lowWatermark}") - catch { - case e: Exception => out.println(s"partition: $tp\terror: ${e.getMessage}") - } - } - - adminClient.close() - } - - private def createAdminClient(opts: DeleteRecordsCommandOptions): Admin = { - val props = if (opts.options.has(opts.commandConfigOpt)) - Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) - else - new Properties() - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)) - Admin.create(props) - } - - class DeleteRecordsCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) { - val BootstrapServerDoc = "REQUIRED: The server to connect to." - val offsetJsonFileDoc = "REQUIRED: The JSON file with offset per partition. The format to use is:\n" + - "{\"partitions\":\n [{\"topic\": \"foo\", \"partition\": 1, \"offset\": 1}],\n \"version\":1\n}" - val CommandConfigDoc = "A property file containing configs to be passed to Admin Client." - - val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc) - .withRequiredArg - .describedAs("server(s) to use for bootstrapping") - .ofType(classOf[String]) - val offsetJsonFileOpt = parser.accepts("offset-json-file", offsetJsonFileDoc) - .withRequiredArg - .describedAs("Offset json file path") - .ofType(classOf[String]) - val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc) - .withRequiredArg - .describedAs("command config property file path") - .ofType(classOf[String]) - - options = parser.parse(args : _*) - - CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to delete records of the given partitions down to the specified offset.") - - CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, offsetJsonFileOpt) - } -} diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala index 140f4b70177..868c54916e9 100644 --- a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala @@ -19,7 +19,6 @@ package kafka.admin import java.util.Properties import java.util.concurrent.ExecutionException import joptsimple.util.EnumConverter -import kafka.common.AdminCommandFailedException import kafka.utils.CoreUtils import kafka.utils.Implicits._ import kafka.utils.Json @@ -31,6 +30,7 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException import org.apache.kafka.common.errors.ElectionNotNeededException import org.apache.kafka.common.errors.TimeoutException import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException} import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import scala.jdk.CollectionConverters._ diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index d1b75c483eb..40f688c7085 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -19,7 +19,6 @@ package kafka.admin import java.util import java.util.Optional import java.util.concurrent.ExecutionException -import kafka.common.AdminCommandFailedException import kafka.server.DynamicConfig import kafka.utils.{CoreUtils, Exit, Json, Logging} import kafka.utils.Implicits._ @@ -30,6 +29,7 @@ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors.{ReplicaNotAvailableException, UnknownTopicOrPartitionException} import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{KafkaException, KafkaFuture, TopicPartition, TopicPartitionReplica} +import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException} import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.kafka.storage.internals.log.LogConfig diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index b4c74d35cdf..568dd1798a1 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -20,7 +20,6 @@ package kafka.admin import java.util import java.util.{Collections, Optional, Properties} import joptsimple._ -import kafka.common.AdminCommandFailedException import kafka.utils._ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin.CreatePartitionsOptions @@ -33,6 +32,7 @@ import org.apache.kafka.common.config.{ConfigResource, TopicConfig} import org.apache.kafka.common.errors.{ClusterAuthorizationException, TopicExistsException, UnsupportedVersionException} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException} import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.kafka.storage.internals.log.LogConfig import org.apache.kafka.server.util.TopicFilter.IncludeList diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index c3f76e83d12..fa2575d9d8b 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -19,7 +19,6 @@ package kafka.controller import com.yammer.metrics.core.Timer import java.util.concurrent.TimeUnit -import kafka.admin.AdminOperationException import kafka.api._ import kafka.common._ import kafka.cluster.Broker @@ -45,7 +44,7 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse} import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.metadata.LeaderRecoveryState -import org.apache.kafka.server.common.ProducerIdsBlock +import org.apache.kafka.server.common.{AdminOperationException, ProducerIdsBlock} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.util.KafkaScheduler import org.apache.zookeeper.KeeperException diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala index 97cf2b2f713..31ab40430d7 100644 --- a/core/src/main/scala/kafka/server/ZkAdminManager.scala +++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala @@ -18,7 +18,7 @@ package kafka.server import java.util import java.util.Properties -import kafka.admin.{AdminOperationException, AdminUtils} +import kafka.admin.AdminUtils import kafka.common.TopicAlreadyMarkedForDeletionException import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs, toLoggableProps} import kafka.server.DynamicConfig.QuotaConfigs @@ -47,6 +47,7 @@ import org.apache.kafka.common.requests.CreateTopicsRequest._ import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError} import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter} import org.apache.kafka.common.utils.Sanitizer +import org.apache.kafka.server.common.AdminOperationException import org.apache.kafka.storage.internals.log.LogConfig import scala.collection.{Map, mutable, _} diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index c735d53eba9..07ca35cc9ae 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -17,7 +17,7 @@ package kafka.zk import java.util.Properties -import kafka.admin.{AdminOperationException, AdminUtils, BrokerMetadata, RackAwareMode} +import kafka.admin.{AdminUtils, BrokerMetadata, RackAwareMode} import kafka.common.TopicAlreadyMarkedForDeletionException import kafka.controller.ReplicaAssignment import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig} @@ -26,6 +26,7 @@ import kafka.utils.Implicits._ import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic +import org.apache.kafka.server.common.AdminOperationException import org.apache.kafka.storage.internals.log.LogConfig import org.apache.zookeeper.KeeperException.NodeExistsException diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 07d00963d52..ea57207e278 100755 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -26,6 +26,7 @@ import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic} import org.apache.kafka.common.errors.InvalidReplicaAssignmentException import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse} +import org.apache.kafka.server.common.AdminOperationException import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{BeforeEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala index eaef9367a1d..6d36120b136 100644 --- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala +++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala @@ -16,8 +16,8 @@ */ package kafka.admin -import kafka.common.AdminCommandFailedException import org.apache.kafka.common.errors.TimeoutException +import org.apache.kafka.server.common.AdminCommandFailedException import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala index 278bcf0e68c..ff6cd2cad60 100644 --- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala @@ -18,8 +18,6 @@ package kafka.admin import java.nio.charset.StandardCharsets import java.nio.file.{Files, Path} - -import kafka.common.AdminCommandFailedException import kafka.server.IntegrationTestUtils.createTopic import kafka.server.{KafkaConfig, KafkaServer} import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type} @@ -29,6 +27,7 @@ import kafka.utils.TestUtils import org.apache.kafka.clients.admin.AdminClientConfig import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.UnknownTopicOrPartitionException +import org.apache.kafka.server.common.AdminCommandFailedException import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.{BeforeEach, Tag} diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala index 71bbcb91601..fab7907e17b 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala @@ -20,12 +20,12 @@ package kafka.admin import java.util.concurrent.ExecutionException import java.util.{Arrays, Collections} import kafka.admin.ReassignPartitionsCommand._ -import kafka.common.AdminCommandFailedException import kafka.utils.Exit import org.apache.kafka.clients.admin.{Config, MockAdminClient, PartitionReassignment} import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors.{InvalidReplicationFactorException, UnknownTopicOrPartitionException} import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo, TopicPartitionReplica} +import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout} diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index 59def64ec85..088c9dd0205 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -17,13 +17,13 @@ package kafka.admin import kafka.admin.TopicCommand.{PartitionDescription, TopicCommandOptions, TopicService} -import kafka.common.AdminCommandFailedException import kafka.utils.Exit import org.apache.kafka.clients.admin.{Admin, AdminClientTestUtils, CreatePartitionsOptions, CreateTopicsOptions, DeleteTopicsOptions, NewPartitions, NewTopic, PartitionReassignment, TopicDescription} import org.apache.kafka.common.Node import org.apache.kafka.common.TopicPartitionInfo import org.apache.kafka.common.errors.ThrottlingQuotaExceededException import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.mockito.ArgumentMatcher diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala index 621c94f67e6..ddcb6377357 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala @@ -16,11 +16,11 @@ */ package kafka.server -import kafka.admin.AdminOperationException import kafka.utils.CoreUtils._ import kafka.server.QuorumTestHarness import org.apache.kafka.common.config._ import org.apache.kafka.common.config.internals.QuotaConfigs +import org.apache.kafka.server.common.AdminOperationException import org.junit.jupiter.api.Assertions.assertThrows import org.junit.jupiter.api.Test diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala index 833fd930414..ba0a2583598 100644 --- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala @@ -31,6 +31,7 @@ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.config.internals.QuotaConfigs import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException} import org.apache.kafka.common.metrics.Quota +import org.apache.kafka.server.common.AdminOperationException import org.apache.kafka.storage.internals.log.LogConfig import org.apache.kafka.test.{TestUtils => JTestUtils} import org.junit.jupiter.api.Assertions._ diff --git a/core/src/main/scala/kafka/admin/AdminOperationException.scala b/server-common/src/main/java/org/apache/kafka/server/common/AdminCommandFailedException.java similarity index 60% rename from core/src/main/scala/kafka/admin/AdminOperationException.scala rename to server-common/src/main/java/org/apache/kafka/server/common/AdminCommandFailedException.java index a45b3f7e93a..62bb8425546 100644 --- a/core/src/main/scala/kafka/admin/AdminOperationException.scala +++ b/server-common/src/main/java/org/apache/kafka/server/common/AdminCommandFailedException.java @@ -1,10 +1,10 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -15,9 +15,14 @@ * limitations under the License. */ -package kafka.admin +package org.apache.kafka.server.common; -class AdminOperationException(val error: String, cause: Throwable) extends RuntimeException(error, cause) { - def this(error: Throwable) = this(error.getMessage, error) - def this(msg: String) = this(msg, null) -} \ No newline at end of file +public class AdminCommandFailedException extends RuntimeException { + public AdminCommandFailedException(String message) { + super(message); + } + + public AdminCommandFailedException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/core/src/main/scala/kafka/common/AdminCommandFailedException.scala b/server-common/src/main/java/org/apache/kafka/server/common/AdminOperationException.java similarity index 61% rename from core/src/main/scala/kafka/common/AdminCommandFailedException.scala rename to server-common/src/main/java/org/apache/kafka/server/common/AdminOperationException.java index 94e28641dd0..03826d1ac00 100644 --- a/core/src/main/scala/kafka/common/AdminCommandFailedException.scala +++ b/server-common/src/main/java/org/apache/kafka/server/common/AdminOperationException.java @@ -1,10 +1,10 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -15,9 +15,14 @@ * limitations under the License. */ -package kafka.common +package org.apache.kafka.server.common; -class AdminCommandFailedException(message: String, cause: Throwable) extends RuntimeException(message, cause) { - def this(message: String) = this(message, null) - def this() = this(null, null) +public class AdminOperationException extends RuntimeException { + public AdminOperationException(String message) { + super(message); + } + + public AdminOperationException(Throwable cause) { + super(cause.getMessage(), cause); + } } diff --git a/tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java b/tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java new file mode 100644 index 00000000000..5e44865b200 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import joptsimple.OptionSpec; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DeleteRecordsResult; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.AdminCommandFailedException; +import org.apache.kafka.server.common.AdminOperationException; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.Json; +import org.apache.kafka.server.util.json.DecodeJson; +import org.apache.kafka.server.util.json.JsonObject; +import org.apache.kafka.server.util.json.JsonValue; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.StringJoiner; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * A command for delete records of the given partitions down to the specified offset. + */ +public class DeleteRecordsCommand { + private static final int EARLIEST_VERSION = 1; + + private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger(); + + private static final DecodeJson.DecodeLong LONG = new DecodeJson.DecodeLong(); + + private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString(); + + public static void main(String[] args) throws Exception { + execute(args, System.out); + } + + static Map> parseOffsetJsonStringWithoutDedup(String jsonData) throws JsonProcessingException { + JsonValue js = Json.parseFull(jsonData) + .orElseThrow(() -> new AdminOperationException("The input string is not a valid JSON")); + + Optional version = js.asJsonObject().get("version"); + + return parseJsonData(version.isPresent() ? version.get().to(INT) : EARLIEST_VERSION, js); + } + + private static Map> parseJsonData(int version, JsonValue js) throws JsonMappingException { + if (version == 1) { + JsonValue partitions = js.asJsonObject().get("partitions") + .orElseThrow(() -> new AdminOperationException("Missing partitions field")); + + Map> res = new HashMap<>(); + + Iterator iterator = partitions.asJsonArray().iterator(); + + while (iterator.hasNext()) { + JsonObject partitionJs = iterator.next().asJsonObject(); + + String topic = partitionJs.apply("topic").to(STRING); + int partition = partitionJs.apply("partition").to(INT); + long offset = partitionJs.apply("offset").to(LONG); + + res.computeIfAbsent(new TopicPartition(topic, partition), k -> new ArrayList<>()).add(offset); + } + + return res; + } + + throw new AdminOperationException("Not supported version field value " + version); + } + + public static void execute(String[] args, PrintStream out) throws IOException { + DeleteRecordsCommandOptions opts = new DeleteRecordsCommandOptions(args); + + try (Admin adminClient = createAdminClient(opts)) { + execute(adminClient, Utils.readFileAsString(opts.options.valueOf(opts.offsetJsonFileOpt)), out); + } + } + + static void execute(Admin adminClient, String offsetJsonString, PrintStream out) throws JsonProcessingException { + Map> offsetSeq = parseOffsetJsonStringWithoutDedup(offsetJsonString); + + Set duplicatePartitions = offsetSeq.entrySet().stream() + .filter(e -> e.getValue().size() > 1) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + + if (!duplicatePartitions.isEmpty()) { + StringJoiner duplicates = new StringJoiner(","); + duplicatePartitions.forEach(tp -> duplicates.add(tp.toString())); + throw new AdminCommandFailedException( + String.format("Offset json file contains duplicate topic partitions: %s", duplicates) + ); + } + + Map recordsToDelete = new HashMap<>(); + + for (Map.Entry> e : offsetSeq.entrySet()) + recordsToDelete.put(e.getKey(), RecordsToDelete.beforeOffset(e.getValue().get(0))); + + out.println("Executing records delete operation"); + DeleteRecordsResult deleteRecordsResult = adminClient.deleteRecords(recordsToDelete); + out.println("Records delete operation completed:"); + + deleteRecordsResult.lowWatermarks().forEach((tp, partitionResult) -> { + try { + out.printf("partition: %s\tlow_watermark: %s%n", tp, partitionResult.get().lowWatermark()); + } catch (InterruptedException | ExecutionException e) { + out.printf("partition: %s\terror: %s%n", tp, e.getMessage()); + } + }); + } + + private static Admin createAdminClient(DeleteRecordsCommandOptions opts) throws IOException { + Properties props = opts.options.has(opts.commandConfigOpt) + ? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) + : new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)); + return Admin.create(props); + } + + private static class DeleteRecordsCommandOptions extends CommandDefaultOptions { + private final OptionSpec bootstrapServerOpt; + private final OptionSpec offsetJsonFileOpt; + private final OptionSpec commandConfigOpt; + + public DeleteRecordsCommandOptions(String[] args) { + super(args); + + bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The server to connect to.") + .withRequiredArg() + .describedAs("server(s) to use for bootstrapping") + .ofType(String.class); + + offsetJsonFileOpt = parser.accepts("offset-json-file", "REQUIRED: The JSON file with offset per partition. " + + "The format to use is:\n" + + "{\"partitions\":\n [{\"topic\": \"foo\", \"partition\": 1, \"offset\": 1}],\n \"version\":1\n}") + .withRequiredArg() + .describedAs("Offset json file path") + .ofType(String.class); + + commandConfigOpt = parser.accepts("command-config", "A property file containing configs to be passed to Admin Client.") + .withRequiredArg() + .describedAs("command config property file path") + .ofType(String.class); + + options = parser.parse(args); + + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to delete records of the given partitions down to the specified offset."); + + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, offsetJsonFileOpt); + } + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java new file mode 100644 index 00000000000..2c06fb66ba0 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.server.common.AdminCommandFailedException; +import org.apache.kafka.server.common.AdminOperationException; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL) +@Tag("integration") +public class DeleteRecordsCommandTest { + + private final ClusterInstance cluster; + public DeleteRecordsCommandTest(ClusterInstance cluster) { + this.cluster = cluster; + } + + @ClusterTest + public void testCommand() throws Exception { + Properties adminProps = new Properties(); + + adminProps.put(AdminClientConfig.RETRIES_CONFIG, 1); + + try (Admin admin = cluster.createAdminClient(adminProps)) { + assertThrows( + AdminCommandFailedException.class, + () -> DeleteRecordsCommand.execute(admin, "{\"partitions\":[" + + "{\"topic\":\"t\", \"partition\":0, \"offset\":1}," + + "{\"topic\":\"t\", \"partition\":0, \"offset\":1}]" + + "}", System.out), + "Offset json file contains duplicate topic partitions: t-0" + ); + + admin.createTopics(Collections.singleton(new NewTopic("t", 1, (short) 1))).all().get(); + + Properties props = new Properties(); + + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + try (KafkaProducer producer = new KafkaProducer<>(props)) { + producer.send(new ProducerRecord<>("t", "1")).get(); + producer.send(new ProducerRecord<>("t", "2")).get(); + producer.send(new ProducerRecord<>("t", "3")).get(); + } + + executeAndAssertOutput( + "{\"partitions\":[{\"topic\":\"t\", \"partition\":0, \"offset\":1}]}", + "partition: t-0\tlow_watermark: 1", + admin + ); + + executeAndAssertOutput( + "{\"partitions\":[{\"topic\":\"t\", \"partition\":42, \"offset\":42}]}", + "partition: t-42\terror", + admin + ); + } + } + + private static void executeAndAssertOutput(String json, String expOut, Admin admin) { + String output = ToolsTestUtils.captureStandardOut(() -> { + try { + DeleteRecordsCommand.execute(admin, json, System.out); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }); + assertTrue(output.contains(expOut)); + } +} + +/** + * Unit test of {@link DeleteRecordsCommand} tool. + */ +class DeleteRecordsCommandUnitTest { + @Test + public void testOffsetFileNotExists() { + assertThrows(IOException.class, () -> DeleteRecordsCommand.main(new String[]{ + "--bootstrap-server", "localhost:9092", + "--offset-json-file", "/not/existing/file" + })); + } + + @Test + public void testCommandConfigNotExists() { + assertThrows(NoSuchFileException.class, () -> DeleteRecordsCommand.main(new String[] { + "--bootstrap-server", "localhost:9092", + "--offset-json-file", "/not/existing/file", + "--command-config", "/another/not/existing/file" + })); + } + + @Test + public void testWrongVersion() { + assertCommandThrows(JsonProcessingException.class, "{\"version\":\"string\"}"); + assertCommandThrows(AdminOperationException.class, "{\"version\":2}"); + } + + @Test + public void testWrongPartitions() { + assertCommandThrows(AdminOperationException.class, "{\"version\":1}"); + assertCommandThrows(JsonProcessingException.class, "{\"partitions\":2}"); + assertCommandThrows(JsonProcessingException.class, "{\"partitions\":{}}"); + assertCommandThrows(JsonProcessingException.class, "{\"partitions\":[{}]}"); + assertCommandThrows(JsonProcessingException.class, "{\"partitions\":[{\"topic\":\"t\"}]}"); + assertCommandThrows(JsonProcessingException.class, "{\"partitions\":[{\"topic\":\"t\", \"partition\": \"\"}]}"); + assertCommandThrows(JsonProcessingException.class, "{\"partitions\":[{\"topic\":\"t\", \"partition\": 0}]}"); + assertCommandThrows(JsonProcessingException.class, "{\"partitions\":[{\"topic\":\"t\", \"offset\":0}]}"); + } + + @Test + public void testParse() throws Exception { + Map> res = DeleteRecordsCommand.parseOffsetJsonStringWithoutDedup( + "{\"partitions\":[" + + "{\"topic\":\"t\", \"partition\":0, \"offset\":0}," + + "{\"topic\":\"t\", \"partition\":1, \"offset\":1, \"ignored\":\"field\"}," + + "{\"topic\":\"t\", \"partition\":0, \"offset\":2}," + + "{\"topic\":\"t\", \"partition\":0, \"offset\":0}" + + "]}" + ); + + assertEquals(2, res.size()); + assertEquals(Arrays.asList(0L, 2L, 0L), res.get(new TopicPartition("t", 0))); + assertEquals(Collections.singletonList(1L), res.get(new TopicPartition("t", 1))); + } + + /** + * Asserts that {@link DeleteRecordsCommand#parseOffsetJsonStringWithoutDedup(String)} throws {@link AdminOperationException}. + * @param jsonData Data to check. + */ + private static void assertCommandThrows(Class expectedException, String jsonData) { + assertThrows( + expectedException, + () -> DeleteRecordsCommand.parseOffsetJsonStringWithoutDedup(jsonData) + ); + } +} \ No newline at end of file