KAFKA-6955: Use Java AdminClient in DeleteRecordsCommand (#5088)

- Removed internal kafka.admin.AdminClient.deleteRecordsBefore since it's
no longer used.
- Removed redundant tests and rewrote non redundant ones to use the Java
AdminClient.

Reviewers: Viktor Somogyi <viktor.somogyi@cloudera.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
Vahid Hashemian 2018-06-02 12:41:55 -07:00 committed by Ismael Juma
parent 73e2cbbd8f
commit 341d5db260
5 changed files with 87 additions and 214 deletions

View File

@ -18,7 +18,6 @@ import java.util.{Collections, Properties}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ConcurrentLinkedQueue, Future, TimeUnit}
import kafka.admin.AdminClient.DeleteRecordsResult
import kafka.common.KafkaException
import kafka.coordinator.group.GroupOverview
import kafka.utils.Logging
@ -216,73 +215,6 @@ class AdminClient(val time: Time,
broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
}.toMap
/*
* Remove all the messages whose offset is smaller than the given offset of the corresponding partition
*
* DeleteRecordsResult contains either lowWatermark of the partition or exception. We list the possible exception
* and their interpretations below:
*
* - DisconnectException if leader node of the partition is not available. Need retry by user.
* - PolicyViolationException if the topic is configured as non-deletable.
* - TopicAuthorizationException if the topic doesn't exist and the user doesn't have the authority to create the topic
* - TimeoutException if response is not available within the timeout specified by either Future's timeout or AdminClient's request timeout
* - UnknownTopicOrPartitionException if the partition doesn't exist or if the user doesn't have the authority to describe the topic
* - NotLeaderForPartitionException if broker is not leader of the partition. Need retry by user.
* - OffsetOutOfRangeException if the offset is larger than high watermark of this partition
*
*/
def deleteRecordsBefore(offsets: Map[TopicPartition, Long]): Future[Map[TopicPartition, DeleteRecordsResult]] = {
val metadataRequest = new MetadataRequest.Builder(offsets.keys.map(_.topic).toSet.toList.asJava, true)
val response = sendAnyNode(ApiKeys.METADATA, metadataRequest).asInstanceOf[MetadataResponse]
val errors = response.errors
if (!errors.isEmpty)
error(s"Metadata request contained errors: $errors")
val (partitionsWithoutError, partitionsWithError) = offsets.partition{ partitionAndOffset =>
!response.errors().containsKey(partitionAndOffset._1.topic())}
val (partitionsWithLeader, partitionsWithoutLeader) = partitionsWithoutError.partition{ partitionAndOffset =>
response.cluster().leaderFor(partitionAndOffset._1) != null}
val partitionsWithErrorResults = partitionsWithError.keys.map( partition =>
partition -> DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, response.errors().get(partition.topic()).exception())).toMap
val partitionsWithoutLeaderResults = partitionsWithoutLeader.mapValues( _ =>
DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.LEADER_NOT_AVAILABLE.exception()))
val partitionsGroupByLeader = partitionsWithLeader.groupBy(partitionAndOffset =>
response.cluster().leaderFor(partitionAndOffset._1))
// prepare requests and generate Future objects
val futures = partitionsGroupByLeader.map{ case (node, partitionAndOffsets) =>
val convertedMap: java.util.Map[TopicPartition, java.lang.Long] = partitionAndOffsets.mapValues(_.asInstanceOf[java.lang.Long]).asJava
val future = client.send(node, new DeleteRecordsRequest.Builder(requestTimeoutMs, convertedMap))
pendingFutures.add(future)
future.compose(new RequestFutureAdapter[ClientResponse, Map[TopicPartition, DeleteRecordsResult]]() {
override def onSuccess(response: ClientResponse, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) {
val deleteRecordsResponse = response.responseBody().asInstanceOf[DeleteRecordsResponse]
val result = deleteRecordsResponse.responses().asScala.mapValues(v => DeleteRecordsResult(v.lowWatermark, v.error.exception())).toMap
future.complete(result)
pendingFutures.remove(future)
}
override def onFailure(e: RuntimeException, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) {
val result = partitionAndOffsets.mapValues(_ => DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, e))
future.complete(result)
pendingFutures.remove(future)
}
})
}
// default output if not receiving DeleteRecordsResponse before timeout
val defaultResults = offsets.mapValues(_ =>
DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.REQUEST_TIMED_OUT.exception())) ++ partitionsWithErrorResults ++ partitionsWithoutLeaderResults
new CompositeFuture(time, defaultResults, futures.toList)
}
/**
* Case class used to represent a consumer of a consumer group
*/
@ -473,8 +405,6 @@ object AdminClient {
config
}
case class DeleteRecordsResult(lowWatermark: Long, error: Exception)
class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
def createSimplePlaintext(brokerUrl: String): AdminClient = {

View File

@ -20,14 +20,17 @@ package kafka.admin
import java.io.PrintStream
import java.util.Properties
import kafka.admin.AdminClient.DeleteRecordsResult
import kafka.common.AdminCommandFailedException
import kafka.utils.{CoreUtils, Json, CommandLineUtils}
import kafka.utils.{CommandLineUtils, CoreUtils, Json}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.clients.admin
import org.apache.kafka.clients.admin.RecordsToDelete
import org.apache.kafka.clients.CommonClientConfigs
import joptsimple._
import scala.collection.JavaConverters._
/**
* A command for delete records of the given partitions down to the specified offset.
*/
@ -61,26 +64,31 @@ object DeleteRecordsCommand {
if (duplicatePartitions.nonEmpty)
throw new AdminCommandFailedException("Offset json file contains duplicate topic partitions: %s".format(duplicatePartitions.mkString(",")))
val recordsToDelete = offsetSeq.map { case (topicPartition, offset) =>
(topicPartition, RecordsToDelete.beforeOffset(offset))
}.toMap.asJava
out.println("Executing records delete operation")
val deleteRecordsResult: Map[TopicPartition, DeleteRecordsResult] = adminClient.deleteRecordsBefore(offsetSeq.toMap).get()
val deleteRecordsResult = adminClient.deleteRecords(recordsToDelete)
out.println("Records delete operation completed:")
deleteRecordsResult.foreach{ case (tp, partitionResult) => {
if (partitionResult.error == null)
out.println(s"partition: $tp\tlow_watermark: ${partitionResult.lowWatermark}")
else
out.println(s"partition: $tp\terror: ${partitionResult.error.toString}")
deleteRecordsResult.lowWatermarks.asScala.foreach { case (tp, partitionResult) => {
try out.println(s"partition: $tp\tlow_watermark: ${partitionResult.get.lowWatermark}")
catch {
case e: Exception => out.println(s"partition: $tp\terror: ${e.getMessage}")
}
}}
adminClient.close()
}
private def createAdminClient(opts: DeleteRecordsCommandOptions): AdminClient = {
private def createAdminClient(opts: DeleteRecordsCommandOptions): admin.AdminClient = {
val props = if (opts.options.has(opts.commandConfigOpt))
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
else
new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
AdminClient.create(props)
admin.AdminClient.create(props)
}
class DeleteRecordsCommandOptions(args: Array[String]) {

View File

@ -49,8 +49,6 @@ import scala.collection.JavaConverters._
import java.lang.{Long => JLong}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.internals.Topic
import org.scalatest.Assertions.intercept
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
@ -291,8 +289,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
)
}
}
client.close()
}
@Test
@ -746,7 +742,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(0L, consumer.position(topicPartition))
val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
val lowWatermark = result.lowWatermarks().get(topicPartition).get().lowWatermark()
val lowWatermark = result.lowWatermarks().get(topicPartition).get.lowWatermark
assertEquals(5L, lowWatermark)
consumer.seekToBeginning(Collections.singletonList(topicPartition))
@ -755,7 +751,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
consumer.seek(topicPartition, 7L)
assertEquals(7L, consumer.position(topicPartition))
client.close()
client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(DeleteRecordsRequest.HIGH_WATERMARK)).asJava).all.get
consumer.seekToBeginning(Collections.singletonList(topicPartition))
assertEquals(10L, consumer.position(topicPartition))
}
@Test
@ -794,7 +792,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
e.getCause.isInstanceOf[NotLeaderForPartitionException] => false
}
}, s"Expected low watermark of the partition to be 5 but got ${lowWatermark.getOrElse("no response within the timeout")}")
client.close()
}
@Test
@ -807,13 +804,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
sendRecords(producers.head, 10, topicPartition)
val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
val lowWatermark = result.lowWatermarks().get(topicPartition).get().lowWatermark()
val lowWatermark = result.lowWatermarks.get(topicPartition).get.lowWatermark
assertEquals(3L, lowWatermark)
for (i <- 0 until serverCount)
assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset)
client.close()
}
@Test
@ -829,14 +824,68 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(0L, consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition).offset())
var result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
result.all().get()
result.all.get
assertEquals(5L, consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition).offset())
result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(DeleteRecordsRequest.HIGH_WATERMARK)).asJava)
result.all().get()
result.all.get
assertNull(consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition))
}
client.close()
@Test
def testConsumeAfterDeleteRecords(): Unit = {
val consumer = consumers.head
subscribeAndWaitForAssignment(topic, consumer)
client = AdminClient.create(createConfig)
sendRecords(producers.head, 10, topicPartition)
var messageCount = 0
TestUtils.waitUntilTrue(() => {
messageCount += consumer.poll(0).count
messageCount == 10
}, "Expected 10 messages", 3000L)
client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava).all.get
consumer.seek(topicPartition, 1)
messageCount = 0
TestUtils.waitUntilTrue(() => {
messageCount += consumer.poll(0).count
messageCount == 7
}, "Expected 7 messages", 3000L)
client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(8L)).asJava).all.get
consumer.seek(topicPartition, 1)
messageCount = 0
TestUtils.waitUntilTrue(() => {
messageCount += consumer.poll(0).count
messageCount == 2
}, "Expected 2 messages", 3000L)
}
@Test
def testDeleteRecordsWithException(): Unit = {
subscribeAndWaitForAssignment(topic, consumers.head)
client = AdminClient.create(createConfig)
sendRecords(producers.head, 10, topicPartition)
assertEquals(5L, client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
.lowWatermarks.get(topicPartition).get.lowWatermark)
// OffsetOutOfRangeException if offset > high_watermark
var cause = intercept[ExecutionException] {
client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(20L)).asJava).lowWatermarks.get(topicPartition).get
}.getCause
assertEquals(classOf[OffsetOutOfRangeException], cause.getClass)
val nonExistPartition = new TopicPartition(topic, 3)
// LeaderNotAvailableException if non existent partition
cause = intercept[ExecutionException] {
client.deleteRecords(Map(nonExistPartition -> RecordsToDelete.beforeOffset(20L)).asJava).lowWatermarks.get(nonExistPartition).get
}.getCause
assertEquals(classOf[LeaderNotAvailableException], cause.getClass)
}
@Test
@ -856,8 +905,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val describeResult2 = client.describeConfigs(Collections.singletonList(invalidTopic))
assertTrue(intercept[ExecutionException](describeResult2.values.get(invalidTopic).get).getCause.isInstanceOf[InvalidTopicException])
client.close()
}
private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit = {
@ -902,7 +949,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
classOf[SecurityDisabledException])
assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(ACL1.toFilter())).all(),
classOf[SecurityDisabledException])
client.close()
}
/**
@ -955,7 +1001,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
val endTimeMs = Time.SYSTEM.milliseconds()
assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs);
client.close()
}
/**
@ -973,7 +1018,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val future2 = client.createTopics(Seq("mytopic3", "mytopic4").map(new NewTopic(_, 1, 1)).asJava,
new CreateTopicsOptions().validateOnly(true)).all()
future2.get
client.close()
assertEquals(1, factory.failuresInjected)
}
@ -1091,6 +1135,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
Utils.closeQuietly(client, "adminClient")
}
}
}
object AdminClientIntegrationTest {

View File

@ -17,20 +17,19 @@
package kafka.api
import java.util.Collections
import java.util.concurrent.TimeUnit
import kafka.admin.AdminClient
import kafka.admin.AdminClient.DeleteRecordsResult
import kafka.server.KafkaConfig
import java.lang.{Long => JLong}
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, ProducerConfig}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{Errors, ApiKeys}
import org.apache.kafka.common.requests.DeleteRecordsRequest
import org.apache.kafka.common.protocol.ApiKeys
import org.junit.{After, Before, Test}
import org.junit.Assert._
import scala.collection.JavaConverters._
/**
@ -78,122 +77,12 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
super.tearDown()
}
@Test
def testSeekToBeginningAfterDeleteRecords() {
val consumer = consumers.head
subscribeAndWaitForAssignment(topic, consumer)
sendRecords(producers.head, 10, tp)
consumer.seekToBeginning(Collections.singletonList(tp))
assertEquals(0L, consumer.position(tp))
client.deleteRecordsBefore(Map((tp, 5L))).get()
consumer.seekToBeginning(Collections.singletonList(tp))
assertEquals(5L, consumer.position(tp))
client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
consumer.seekToBeginning(Collections.singletonList(tp))
assertEquals(10L, consumer.position(tp))
}
@Test
def testConsumeAfterDeleteRecords() {
val consumer = consumers.head
subscribeAndWaitForAssignment(topic, consumer)
sendRecords(producers.head, 10, tp)
var messageCount = 0
TestUtils.waitUntilTrue(() => {
messageCount += consumer.poll(0).count()
messageCount == 10
}, "Expected 10 messages", 3000L)
client.deleteRecordsBefore(Map((tp, 3L))).get()
consumer.seek(tp, 1)
messageCount = 0
TestUtils.waitUntilTrue(() => {
messageCount += consumer.poll(0).count()
messageCount == 7
}, "Expected 7 messages", 3000L)
client.deleteRecordsBefore(Map((tp, 8L))).get()
consumer.seek(tp, 1)
messageCount = 0
TestUtils.waitUntilTrue(() => {
messageCount += consumer.poll(0).count()
messageCount == 2
}, "Expected 2 messages", 3000L)
}
@Test
def testLogStartOffsetCheckpoint() {
subscribeAndWaitForAssignment(topic, consumers.head)
sendRecords(producers.head, 10, tp)
assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
for (i <- 0 until serverCount)
killBroker(i)
restartDeadBrokers()
client.close()
brokerList = TestUtils.bootstrapServers(servers, listenerName)
client = AdminClient.createSimplePlaintext(brokerList)
TestUtils.waitUntilTrue(() => {
// Need to retry if leader is not available for the partition
client.deleteRecordsBefore(Map((tp, 0L))).get(1000L, TimeUnit.MILLISECONDS)(tp).equals(DeleteRecordsResult(5L, null))
}, "Expected low watermark of the partition to be 5L")
}
@Test
def testLogStartOffsetAfterDeleteRecords() {
subscribeAndWaitForAssignment(topic, consumers.head)
sendRecords(producers.head, 10, tp)
client.deleteRecordsBefore(Map((tp, 3L))).get()
for (i <- 0 until serverCount)
assertEquals(3, servers(i).replicaManager.getReplica(tp).get.logStartOffset)
}
@Test
def testOffsetsForTimesWhenOffsetNotFound() {
val consumer = consumers.head
assertNull(consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp))
}
@Test
def testOffsetsForTimesAfterDeleteRecords() {
val consumer = consumers.head
subscribeAndWaitForAssignment(topic, consumer)
sendRecords(producers.head, 10, tp)
assertEquals(0L, consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp).offset())
client.deleteRecordsBefore(Map((tp, 5L))).get()
assertEquals(5L, consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp).offset())
client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
assertNull(consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp))
}
@Test
def testDeleteRecordsWithException() {
subscribeAndWaitForAssignment(topic, consumers.head)
sendRecords(producers.head, 10, tp)
// Should get success result
assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
// OffsetOutOfRangeException if offset > high_watermark
assertEquals(DeleteRecordsResult(-1L, Errors.OFFSET_OUT_OF_RANGE.exception()), client.deleteRecordsBefore(Map((tp, 20))).get()(tp))
val nonExistPartition = new TopicPartition(topic, 3)
// UnknownTopicOrPartitionException if user tries to delete records of a non-existent partition
assertEquals(DeleteRecordsResult(-1L, Errors.LEADER_NOT_AVAILABLE.exception()),
client.deleteRecordsBefore(Map((nonExistPartition, 20))).get()(nonExistPartition))
}
@Test
def testListGroups() {
subscribeAndWaitForAssignment(topic, consumers.head)

View File

@ -95,6 +95,7 @@
timeout behavior for blocking APIs. In particular, a new <code>poll(Duration)</code> API has been added which
does not block for dynamic partition assignment. The old <code>poll(long)</code> API has been deprecated and
will be removed in a future version.</li>
<li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
</ul>
<h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5>