KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734) (#10760)

This patch implements KIP-734 as described in https://cwiki.apache.org/confluence/display/KAFKA/KIP-734%3A+Improve+AdminClient.listOffsets+to+return+timestamp+and+offset+for+the+record+with+the+largest+timestamp.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
thomaskwscott 2021-06-25 13:29:12 +01:00 committed by GitHub
parent 2beaf9a720
commit bd72ef1bf1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 442 additions and 48 deletions

View File

@ -4209,11 +4209,7 @@ public class KafkaAdminClient extends AdminClient {
OffsetSpec offsetSpec = entry.getValue();
TopicPartition tp = entry.getKey();
KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
long offsetQuery = (offsetSpec instanceof TimestampSpec)
? ((TimestampSpec) offsetSpec).timestamp()
: (offsetSpec instanceof OffsetSpec.EarliestSpec)
? ListOffsetsRequest.EARLIEST_TIMESTAMP
: ListOffsetsRequest.LATEST_TIMESTAMP;
long offsetQuery = getOffsetFromOffsetSpec(offsetSpec);
// avoid sending listOffsets request for topics with errors
if (!mr.errors().containsKey(tp.topic())) {
Node node = mr.cluster().leaderFor(tp);
@ -4236,10 +4232,12 @@ public class KafkaAdminClient extends AdminClient {
final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
private boolean supportsMaxTimestamp = true;
@Override
ListOffsetsRequest.Builder createRequest(int timeoutMs) {
return ListOffsetsRequest.Builder
.forConsumer(true, context.options().isolationLevel())
.forConsumer(true, context.options().isolationLevel(), supportsMaxTimestamp)
.setTargetTimes(partitionsToQuery);
}
@ -4298,6 +4296,36 @@ public class KafkaAdminClient extends AdminClient {
}
}
}
@Override
boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
if (supportsMaxTimestamp) {
supportsMaxTimestamp = false;
// fail any unsupported futures and remove partitions from the downgraded retry
boolean foundMaxTimestampPartition = false;
Iterator<ListOffsetsTopic> topicIterator = partitionsToQuery.iterator();
while (topicIterator.hasNext()) {
ListOffsetsTopic topic = topicIterator.next();
Iterator<ListOffsetsPartition> partitionIterator = topic.partitions().iterator();
while (partitionIterator.hasNext()) {
ListOffsetsPartition partition = partitionIterator.next();
if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) {
foundMaxTimestampPartition = true;
futures.get(new TopicPartition(topic.name(), partition.partitionIndex()))
.completeExceptionally(new UnsupportedVersionException(
"Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec"));
partitionIterator.remove();
}
}
if (topic.partitions().isEmpty()) {
topicIterator.remove();
}
}
return foundMaxTimestampPartition && !partitionsToQuery.isEmpty();
}
return false;
}
});
}
return calls;
@ -4834,6 +4862,17 @@ public class KafkaAdminClient extends AdminClient {
};
}
private long getOffsetFromOffsetSpec(OffsetSpec offsetSpec) {
if (offsetSpec instanceof TimestampSpec) {
return ((TimestampSpec) offsetSpec).timestamp();
} else if (offsetSpec instanceof OffsetSpec.EarliestSpec) {
return ListOffsetsRequest.EARLIEST_TIMESTAMP;
} else if (offsetSpec instanceof OffsetSpec.MaxTimestampSpec) {
return ListOffsetsRequest.MAX_TIMESTAMP;
}
return ListOffsetsRequest.LATEST_TIMESTAMP;
}
/**
* Get a sub level error when the request is in batch. If given key was not found,
* return an {@link IllegalArgumentException}.

View File

@ -25,6 +25,7 @@ public class OffsetSpec {
public static class EarliestSpec extends OffsetSpec { }
public static class LatestSpec extends OffsetSpec { }
public static class MaxTimestampSpec extends OffsetSpec { }
public static class TimestampSpec extends OffsetSpec {
private final long timestamp;
@ -60,4 +61,13 @@ public class OffsetSpec {
return new TimestampSpec(timestamp);
}
/**
* Used to retrieve the offset with the largest timestamp of a partition
* as message timestamps can be specified client side this may not match
* the log end offset returned by LatestSpec
*/
public static OffsetSpec maxTimestamp() {
return new MaxTimestampSpec();
}
}

View File

@ -978,7 +978,7 @@ public class Fetcher<K, V> implements Closeable {
final Map<TopicPartition, ListOffsetsPartition> timestampsToSearch,
boolean requireTimestamp) {
ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
.forConsumer(requireTimestamp, isolationLevel)
.forConsumer(requireTimestamp, isolationLevel, false)
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch));
log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);

View File

@ -40,6 +40,7 @@ import org.apache.kafka.common.protocol.Errors;
public class ListOffsetsRequest extends AbstractRequest {
public static final long EARLIEST_TIMESTAMP = -2L;
public static final long LATEST_TIMESTAMP = -1L;
public static final long MAX_TIMESTAMP = -3L;
public static final int CONSUMER_REPLICA_ID = -1;
public static final int DEBUGGING_REPLICA_ID = -2;
@ -54,9 +55,11 @@ public class ListOffsetsRequest extends AbstractRequest {
return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
}
public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) {
public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel, boolean requireMaxTimestamp) {
short minVersion = 0;
if (isolationLevel == IsolationLevel.READ_COMMITTED)
if (requireMaxTimestamp)
minVersion = 7;
else if (isolationLevel == IsolationLevel.READ_COMMITTED)
minVersion = 2;
else if (requireTimestamp)
minVersion = 1;

View File

@ -30,7 +30,9 @@
// Version 5 is the same as version 4.
//
// Version 6 enables flexible versions.
"validVersions": "0-6",
//
// Version 7 enables listing offsets by max timestamp (KIP-734).
"validVersions": "0-7",
"flexibleVersions": "6+",
"fields": [
{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",

View File

@ -29,7 +29,9 @@
// Version 5 adds a new error code, OFFSET_NOT_AVAILABLE.
//
// Version 6 enables flexible versions.
"validVersions": "0-6",
//
// Version 7 is the same as version 6 (KIP-734).
"validVersions": "0-7",
"flexibleVersions": "6+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,

View File

@ -172,6 +172,7 @@ import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.ListTransactionsRequest;
@ -4032,6 +4033,7 @@ public class KafkaAdminClientTest {
pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0}));
pInfos.add(new PartitionInfo("bar", 0, node0, new Node[]{node0}, new Node[]{node0}));
pInfos.add(new PartitionInfo("baz", 0, node0, new Node[]{node0}, new Node[]{node0}));
pInfos.add(new PartitionInfo("qux", 0, node0, new Node[]{node0}, new Node[]{node0}));
final Cluster cluster =
new Cluster(
"mockClusterId",
@ -4044,6 +4046,7 @@ public class KafkaAdminClientTest {
final TopicPartition tp0 = new TopicPartition("foo", 0);
final TopicPartition tp1 = new TopicPartition("bar", 0);
final TopicPartition tp2 = new TopicPartition("baz", 0);
final TopicPartition tp3 = new TopicPartition("qux", 0);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
@ -4053,15 +4056,17 @@ public class KafkaAdminClientTest {
ListOffsetsTopicResponse t0 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, -1L, 123L, 321);
ListOffsetsTopicResponse t1 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 234L, 432);
ListOffsetsTopicResponse t2 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp2, Errors.NONE, 123456789L, 345L, 543);
ListOffsetsTopicResponse t3 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp3, Errors.NONE, 234567890L, 456L, 654);
ListOffsetsResponseData responseData = new ListOffsetsResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(t0, t1, t2));
.setTopics(Arrays.asList(t0, t1, t2, t3));
env.kafkaClient().prepareResponse(new ListOffsetsResponse(responseData));
Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
partitions.put(tp0, OffsetSpec.latest());
partitions.put(tp1, OffsetSpec.earliest());
partitions.put(tp2, OffsetSpec.forTimestamp(System.currentTimeMillis()));
partitions.put(tp3, OffsetSpec.maxTimestamp());
ListOffsetsResult result = env.adminClient().listOffsets(partitions);
Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get();
@ -4075,9 +4080,13 @@ public class KafkaAdminClientTest {
assertEquals(345L, offsets.get(tp2).offset());
assertEquals(543, offsets.get(tp2).leaderEpoch().get().intValue());
assertEquals(123456789L, offsets.get(tp2).timestamp());
assertEquals(456L, offsets.get(tp3).offset());
assertEquals(654, offsets.get(tp3).leaderEpoch().get().intValue());
assertEquals(234567890L, offsets.get(tp3).timestamp());
assertEquals(offsets.get(tp0), result.partitionResult(tp0).get());
assertEquals(offsets.get(tp1), result.partitionResult(tp1).get());
assertEquals(offsets.get(tp2), result.partitionResult(tp2).get());
assertEquals(offsets.get(tp3), result.partitionResult(tp3).get());
try {
result.partitionResult(new TopicPartition("unknown", 0)).get();
fail("should have thrown IllegalArgumentException");
@ -4226,6 +4235,117 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
Node node = new Node(0, "localhost", 8120);
List<Node> nodes = Collections.singletonList(node);
final Cluster cluster = new Cluster(
"mockClusterId",
nodes,
Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
Collections.emptySet(),
Collections.emptySet(),
node);
final TopicPartition tp0 = new TopicPartition("foo", 0);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
// listoffsets response from broker 0
env.kafkaClient().prepareUnsupportedVersionResponse(
request -> request instanceof ListOffsetsRequest);
ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
}
}
@Test
public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
Node node = new Node(0, "localhost", 8120);
List<Node> nodes = Collections.singletonList(node);
List<PartitionInfo> pInfos = new ArrayList<>();
pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
final Cluster cluster = new Cluster(
"mockClusterId",
nodes,
pInfos,
Collections.emptySet(),
Collections.emptySet(),
node);
final TopicPartition tp0 = new TopicPartition("foo", 0);
final TopicPartition tp1 = new TopicPartition("foo", 1);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
AdminClientConfig.RETRIES_CONFIG, "2")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
// listoffsets response from broker 0
env.kafkaClient().prepareUnsupportedVersionResponse(
request -> request instanceof ListOffsetsRequest);
ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
ListOffsetsResponseData responseData = new ListOffsetsResponseData()
.setThrottleTimeMs(0)
.setTopics(Arrays.asList(topicResponse));
env.kafkaClient().prepareResponseFrom(
// ensure that no max timestamp requests are retried
request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream()
.flatMap(t -> t.partitions().stream())
.noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP),
new ListOffsetsResponse(responseData), node);
ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
put(tp0, OffsetSpec.maxTimestamp());
put(tp1, OffsetSpec.latest());
}});
TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class);
ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get();
assertEquals(345L, tp1Offset.offset());
assertEquals(543, tp1Offset.leaderEpoch().get().intValue());
assertEquals(-1L, tp1Offset.timestamp());
}
}
@Test
public void testListOffsetsUnsupportedNonMaxTimestamp() {
Node node = new Node(0, "localhost", 8120);
List<Node> nodes = Collections.singletonList(node);
List<PartitionInfo> pInfos = new ArrayList<>();
pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
final Cluster cluster = new Cluster(
"mockClusterId",
nodes,
pInfos,
Collections.emptySet(),
Collections.emptySet(),
node);
final TopicPartition tp0 = new TopicPartition("foo", 0);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
AdminClientConfig.RETRIES_CONFIG, "2")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
// listoffsets response from broker 0
env.kafkaClient().prepareUnsupportedVersionResponse(
request -> request instanceof ListOffsetsRequest);
ListOffsetsResult result = env.adminClient().listOffsets(
Collections.singletonMap(tp0, OffsetSpec.latest()));
TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class);
}
}
private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
return Utils.mkMap(
Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, false)),

View File

@ -67,7 +67,7 @@ public class ListOffsetsRequestTest {
new ListOffsetsPartition()
.setPartitionIndex(0))));
ListOffsetsRequest request = ListOffsetsRequest.Builder
.forConsumer(true, IsolationLevel.READ_COMMITTED)
.forConsumer(true, IsolationLevel.READ_COMMITTED, false)
.setTargetTimes(topics)
.build(version);
ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
@ -100,7 +100,7 @@ public class ListOffsetsRequestTest {
new ListOffsetsPartition()
.setPartitionIndex(0))));
ListOffsetsRequest request = ListOffsetsRequest.Builder
.forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
.forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(topics)
.build((short) 0);
ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());

View File

@ -1454,7 +1454,7 @@ public class RequestResponseTest {
.setMaxNumOffsets(10)
.setCurrentLeaderEpoch(5)));
return ListOffsetsRequest.Builder
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(Collections.singletonList(topic))
.build((short) version);
} else if (version == 1) {
@ -1465,7 +1465,7 @@ public class RequestResponseTest {
.setTimestamp(1000000L)
.setCurrentLeaderEpoch(5)));
return ListOffsetsRequest.Builder
.forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
.forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(Collections.singletonList(topic))
.build((short) version);
} else if (version >= 2 && version <= LIST_OFFSETS.latestVersion()) {
@ -1478,7 +1478,7 @@ public class RequestResponseTest {
.setName("test")
.setPartitions(Arrays.asList(partition));
return ListOffsetsRequest.Builder
.forConsumer(true, IsolationLevel.READ_COMMITTED)
.forConsumer(true, IsolationLevel.READ_COMMITTED, false)
.setTargetTimes(Collections.singletonList(topic))
.build((short) version);
} else {

View File

@ -114,7 +114,9 @@ object ApiVersion {
// Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516)
KAFKA_2_8_IV1,
// Introduce AllocateProducerIds (KIP-730)
KAFKA_3_0_IV0
KAFKA_3_0_IV0,
// Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734)
KAFKA_3_0_IV1
)
// Map keys are the union of the short and full versions
@ -458,6 +460,13 @@ case object KAFKA_3_0_IV0 extends DefaultApiVersion {
val id: Int = 33
}
case object KAFKA_3_0_IV1 extends DefaultApiVersion {
val shortVersion: String = "3.0"
val subVersion = "IV1"
val recordVersion = RecordVersion.V2
val id: Int = 34
}
object ApiVersionValidator extends Validator {
override def ensureValid(name: String, value: Any): Unit = {

View File

@ -1338,6 +1338,16 @@ class Log(@volatile private var _dir: File,
val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional))
} else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.
val segmentsCopy = logSegments.toBuffer
val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar,
latestTimestampSegment.offsetOfMaxTimestampSoFar,
epochOptional))
} else {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.

View File

@ -94,7 +94,8 @@ class ReplicaFetcherThread(name: String,
// Visible for testing
private[server] val listOffsetRequestVersion: Short =
if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_8_IV0) 6
if (brokerConfig.interBrokerProtocolVersion >= KAFKA_3_0_IV1) 7
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_8_IV0) 6
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_2_IV1) 5
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 4
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 3

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package integration.kafka.admin
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
val topicName = "foo"
var adminClient: Admin = null
@BeforeEach
override def setUp(): Unit = {
super.setUp()
createTopic(topicName, 1, 1.toShort)
produceMessages()
adminClient = Admin.create(Map[String, Object](
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList
).asJava)
}
@AfterEach
override def tearDown(): Unit = {
Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
super.tearDown()
}
@Test
def testEarliestOffset(): Unit = {
val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
assertEquals(0, earliestOffset.offset())
}
@Test
def testLatestOffset(): Unit = {
val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
assertEquals(3, latestOffset.offset())
}
@Test
def testMaxTimestampOffset(): Unit = {
val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp())
assertEquals(1, maxTimestampOffset.offset())
}
private def runFetchOffsets(adminClient: Admin,
offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = {
val tp = new TopicPartition(topicName, 0)
adminClient.listOffsets(Map(
tp -> offsetSpec
).asJava, new ListOffsetsOptions()).all().get().get(tp)
}
def produceMessages(): Unit = {
val records = Seq(
new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L,
null, new Array[Byte](10000)),
new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L,
null, new Array[Byte](10000)),
new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L,
null, new Array[Byte](10000)),
)
TestUtils.produceMessages(servers, records, -1)
}
def generateConfigs: Seq[KafkaConfig] =
TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
}

View File

@ -354,7 +354,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def createListOffsetsRequest = {
requests.ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED).setTargetTimes(
requests.ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false).setTargetTimes(
List(new ListOffsetsTopic()
.setName(tp.topic)
.setPartitions(List(new ListOffsetsPartition()

View File

@ -2059,6 +2059,35 @@ class LogTest {
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP))
}
@Test
def testFetchOffsetByTimestampWithMaxTimestampIncludesTimestamp(): Unit = {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
val log = createLog(logDir, logConfig)
assertEquals(None, log.fetchOffsetByTimestamp(0L))
val firstTimestamp = mockTime.milliseconds
val leaderEpoch = 0
log.appendAsLeader(TestUtils.singletonRecords(
value = TestUtils.randomBytes(10),
timestamp = firstTimestamp),
leaderEpoch = leaderEpoch)
val secondTimestamp = firstTimestamp + 1
log.appendAsLeader(TestUtils.singletonRecords(
value = TestUtils.randomBytes(10),
timestamp = secondTimestamp),
leaderEpoch = leaderEpoch)
log.appendAsLeader(TestUtils.singletonRecords(
value = TestUtils.randomBytes(10),
timestamp = firstTimestamp),
leaderEpoch = leaderEpoch)
assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(leaderEpoch))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP))
}
/**
* Test the Log truncate operations
*/

View File

@ -2080,7 +2080,7 @@ class KafkaApisTest {
.setPartitionIndex(tp.partition)
.setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)
.setCurrentLeaderEpoch(currentLeaderEpoch.get)).asJava)).asJava
val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel)
val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false)
.setTargetTimes(targetTimes).build()
val request = buildRequest(listOffsetRequest)
val capturedResponse = expectNoThrottling(request)
@ -3192,7 +3192,7 @@ class KafkaApisTest {
.setPartitions(List(new ListOffsetsPartition()
.setPartitionIndex(tp.partition)
.setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava
val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel)
val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false)
.setTargetTimes(targetTimes).build()
val request = buildRequest(listOffsetRequest)
val capturedResponse = expectNoThrottling(request)

View File

@ -43,7 +43,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
.setCurrentLeaderEpoch(0)).asJava)).asJava
val consumerRequest = ListOffsetsRequest.Builder
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(targetTimes)
.build()
@ -80,6 +80,18 @@ class ListOffsetsRequestTest extends BaseRequestTest {
assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, debugReplicaRequest)
}
@Test
def testListOffsetsMaxTimeStampOldestVersion(): Unit = {
val consumerRequestBuilder = ListOffsetsRequest.Builder
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
val maxTimestampRequestBuilder = ListOffsetsRequest.Builder
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true)
assertEquals(0.toShort, consumerRequestBuilder.oldestAllowedVersion())
assertEquals(7.toShort, maxTimestampRequestBuilder.oldestAllowedVersion())
}
def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = {
val listOffsetPartition = new ListOffsetsPartition()
.setPartitionIndex(partition.partition)
@ -90,7 +102,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
.setName(topic)
.setPartitions(List(listOffsetPartition).asJava)).asJava
val request = ListOffsetsRequest.Builder
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(targetTimes)
.build()
assertResponseError(error, brokerId, request)
@ -133,7 +145,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
.setTimestamp(timestamp)).asJava)).asJava
val builder = ListOffsetsRequest.Builder
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(targetTimes)
val request = if (version == -1) builder.build() else builder.build(version)
@ -162,11 +174,13 @@ class ListOffsetsRequestTest extends BaseRequestTest {
val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
val firstLeaderId = partitionToLeader(partition.partition)
TestUtils.generateAndProduceMessages(servers, topic, 10)
TestUtils.generateAndProduceMessages(servers, topic, 9)
TestUtils.produceMessage(servers, topic, "test-10", System.currentTimeMillis() + 10L)
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, -1))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1))
assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1))
assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1))
// Kill the first leader so that we can verify the epoch change when fetching the latest offset
killBroker(firstLeaderId)
@ -185,6 +199,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
// The latest offset reflects the updated epoch
assertEquals((10L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1))
assertEquals((9L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1))
}
@Test
@ -192,7 +207,8 @@ class ListOffsetsRequestTest extends BaseRequestTest {
val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
val firstLeaderId = partitionToLeader(partition.partition)
TestUtils.generateAndProduceMessages(servers, topic, 10)
TestUtils.generateAndProduceMessages(servers, topic, 9)
TestUtils.produceMessage(servers, topic, "test-10", System.currentTimeMillis() + 10L)
for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to ApiKeys.LIST_OFFSETS.latestVersion) {
if (version == 0) {
@ -203,10 +219,15 @@ class ListOffsetsRequestTest extends BaseRequestTest {
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
} else if (version >= 4) {
} else if (version >= 4 && version <= 6) {
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
} else if (version >= 7) {
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, version.toShort))
}
}
}

View File

@ -54,7 +54,7 @@ class LogOffsetTest extends BaseRequestTest {
@Test
def testGetOffsetsForUnknownTopic(): Unit = {
val topicPartition = new TopicPartition("foo", 0)
val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava).build(0)
val response = sendListOffsetsRequest(request)
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, findPartition(response.topics.asScala, topicPartition).errorCode)
@ -65,13 +65,7 @@ class LogOffsetTest extends BaseRequestTest {
def testGetOffsetsAfterDeleteRecords(): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
createTopic(topic, 1, 1)
val logManager = server.getLogManager
TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
"Log for partition [topic,0] should be created")
val log = logManager.getLog(topicPartition).get
val log = createTopicAndGetLog(topic, topicPartition)
for (_ <- 0 until 20)
log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
@ -92,17 +86,52 @@ class LogOffsetTest extends BaseRequestTest {
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
}
@Test
def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
val log = createTopicAndGetLog(topic, topicPartition)
for (timestamp <- 0 until 20)
log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp.toLong), leaderEpoch = 0)
log.flush()
log.updateHighWatermark(log.logEndOffset)
val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
assertEquals(19L, firstOffset.get.offset)
assertEquals(19L, firstOffset.get.timestamp)
log.truncateTo(0)
val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
assertEquals(0L, secondOffset.get.offset)
assertEquals(-1L, secondOffset.get.timestamp)
}
@Test
def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
val log = createTopicAndGetLog(topic, topicPartition)
for (timestamp <- List(0L, 1L, 2L, 3L, 4L, 6L, 5L))
log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp), leaderEpoch = 0)
log.flush()
log.updateHighWatermark(log.logEndOffset)
val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
assertEquals(7L, log.logEndOffset)
assertEquals(5L, maxTimestampOffset.get.offset)
assertEquals(6L, maxTimestampOffset.get.timestamp)
}
@Test
def testGetOffsetsBeforeLatestTime(): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
createTopic(topic, 1, 1)
val logManager = server.getLogManager
TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
s"Log for partition $topicPartition should be created")
val log = logManager.getLog(topicPartition).get
val log = createTopicAndGetLog(topic, topicPartition)
for (_ <- 0 until 20)
log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
@ -111,7 +140,7 @@ class LogOffsetTest extends BaseRequestTest {
val offsets = log.legacyFetchOffsetsBefore(ListOffsetsRequest.LATEST_TIMESTAMP, 15)
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets)
TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, 0, server),
"Leader should be elected")
val request = ListOffsetsRequest.Builder.forReplica(0, 0)
.setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 15).asJava).build()
@ -149,6 +178,20 @@ class LogOffsetTest extends BaseRequestTest {
assertFalse(offsetChanged)
}
@Test
def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
val log = createTopicAndGetLog(topic, topicPartition)
log.updateHighWatermark(log.logEndOffset)
val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
assertEquals(0L, log.logEndOffset)
assertEquals(0L, maxTimestampOffset.get.offset)
assertEquals(-1L, maxTimestampOffset.get.timestamp)
}
@deprecated("legacyFetchOffsetsBefore", since = "")
@Test
def testGetOffsetsBeforeNow(): Unit = {
@ -266,4 +309,13 @@ class LogOffsetTest extends BaseRequestTest {
.partitions.asScala.find(_.partitionIndex == tp.partition).get
}
private def createTopicAndGetLog(topic: String, topicPartition: TopicPartition): Log = {
createTopic(topic, 1, 1)
val logManager = server.getLogManager
TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
"Log for partition [topic,0] should be created")
logManager.getLog(topicPartition).get
}
}

View File

@ -240,7 +240,7 @@ class RequestQuotaTest extends BaseRequestTest {
.setPartitionIndex(tp.partition)
.setTimestamp(0L)
.setCurrentLeaderEpoch(15)).asJava)
ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(List(topic).asJava)
case ApiKeys.LEADER_AND_ISR =>

View File

@ -1207,17 +1207,18 @@ object TestUtils extends Logging {
values
}
def produceMessage(servers: Seq[KafkaServer], topic: String, message: String,
def produceMessage(servers: Seq[KafkaServer], topic: String, message: String, timestamp: java.lang.Long = null,
deliveryTimeoutMs: Int = 30 * 1000, requestTimeoutMs: Int = 20 * 1000): Unit = {
val producer = createProducer(TestUtils.getBrokerListStrFromServers(servers),
deliveryTimeoutMs = deliveryTimeoutMs, requestTimeoutMs = requestTimeoutMs)
try {
producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get
producer.send(new ProducerRecord(topic, null, timestamp, topic.getBytes, message.getBytes)).get
} finally {
producer.close()
}
}
def verifyTopicDeletion(zkClient: KafkaZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]): Unit = {
val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))
// wait until admin path for delete topic is deleted, signaling completion of topic deletion

View File

@ -69,7 +69,7 @@ public class ListOffsetRequestBenchmark {
}
}
this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
.build(ApiKeys.LIST_OFFSETS.latestVersion());
}