mirror of https://github.com/apache/kafka.git
KAFKA-16480: ListOffsets change should have an associated API/IBP version update (#16781)
1. Use oldestAllowedVersion as 9 if using ListOffsetsRequest#EARLIEST_LOCAL_TIMESTAMP or ListOffsetsRequest#LATEST_TIERED_TIMESTAMP. 2. Add test cases to ListOffsetsRequestTest#testListOffsetsRequestOldestVersion to make sure requireTieredStorageTimestamp return 9 as minVersion. 3. Add EarliestLocalSpec and LatestTierSpec to OffsetSpec. 4. Add more cases to KafkaAdminClient#getOffsetFromSpec. 5. Add testListOffsetsEarliestLocalSpecMinVersion and testListOffsetsLatestTierSpecSpecMinVersion to KafkaAdminClientTest to make sure request builder has oldestAllowedVersion as 9. Signed-off-by: PoAn Yang <payang@apache.org> Reviewers: Luke Chen <showuon@gmail.com>
This commit is contained in:
parent
4e75c57bbb
commit
66485b04c6
|
|
@ -4860,6 +4860,10 @@ public class KafkaAdminClient extends AdminClient {
|
|||
return ListOffsetsRequest.EARLIEST_TIMESTAMP;
|
||||
} else if (offsetSpec instanceof OffsetSpec.MaxTimestampSpec) {
|
||||
return ListOffsetsRequest.MAX_TIMESTAMP;
|
||||
} else if (offsetSpec instanceof OffsetSpec.EarliestLocalSpec) {
|
||||
return ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP;
|
||||
} else if (offsetSpec instanceof OffsetSpec.LatestTierSpec) {
|
||||
return ListOffsetsRequest.LATEST_TIERED_TIMESTAMP;
|
||||
}
|
||||
return ListOffsetsRequest.LATEST_TIMESTAMP;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,6 +26,8 @@ public class OffsetSpec {
|
|||
public static class EarliestSpec extends OffsetSpec { }
|
||||
public static class LatestSpec extends OffsetSpec { }
|
||||
public static class MaxTimestampSpec extends OffsetSpec { }
|
||||
public static class EarliestLocalSpec extends OffsetSpec { }
|
||||
public static class LatestTierSpec extends OffsetSpec { }
|
||||
public static class TimestampSpec extends OffsetSpec {
|
||||
private final long timestamp;
|
||||
|
||||
|
|
@ -70,4 +72,21 @@ public class OffsetSpec {
|
|||
return new MaxTimestampSpec();
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to retrieve the offset with the local log start offset,
|
||||
* log start offset is the offset of a log above which reads are guaranteed to be served
|
||||
* from the disk of the leader broker, when Tiered Storage is not enabled, it behaves the same
|
||||
* as the earliest timestamp
|
||||
*/
|
||||
public static OffsetSpec earliestLocalSpec() {
|
||||
return new EarliestLocalSpec();
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to retrieve the offset with the highest offset of data stored in remote storage,
|
||||
* and when Tiered Storage is not enabled, we won't return any offset (i.e. Unknown offset)
|
||||
*/
|
||||
public static OffsetSpec latestTierSpec() {
|
||||
return new LatestTierSpec();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -93,8 +93,12 @@ public final class ListOffsetsHandler extends Batched<TopicPartition, ListOffset
|
|||
.stream()
|
||||
.anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.MAX_TIMESTAMP);
|
||||
|
||||
boolean requireTieredStorageTimestamp = keys
|
||||
.stream()
|
||||
.anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP || offsetTimestampsByPartition.get(key) == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
|
||||
|
||||
return ListOffsetsRequest.Builder
|
||||
.forConsumer(true, options.isolationLevel(), supportsMaxTimestamp)
|
||||
.forConsumer(true, options.isolationLevel(), supportsMaxTimestamp, requireTieredStorageTimestamp)
|
||||
.setTargetTimes(new ArrayList<>(topicsByName.values()));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -391,7 +391,7 @@ public class OffsetFetcher {
|
|||
final Map<TopicPartition, ListOffsetsPartition> timestampsToSearch,
|
||||
boolean requireTimestamp) {
|
||||
ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
|
||||
.forConsumer(requireTimestamp, isolationLevel, false)
|
||||
.forConsumer(requireTimestamp, isolationLevel, false, false)
|
||||
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch));
|
||||
|
||||
log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);
|
||||
|
|
|
|||
|
|
@ -337,7 +337,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
|
|||
boolean requireTimestamps,
|
||||
List<NetworkClientDelegate.UnsentRequest> unsentRequests) {
|
||||
ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
|
||||
.forConsumer(requireTimestamps, isolationLevel, false)
|
||||
.forConsumer(requireTimestamps, isolationLevel, false, false)
|
||||
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));
|
||||
|
||||
log.debug("Creating ListOffset request {} for broker {} to reset positions", builder,
|
||||
|
|
|
|||
|
|
@ -62,9 +62,14 @@ public class ListOffsetsRequest extends AbstractRequest {
|
|||
return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
|
||||
}
|
||||
|
||||
public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel, boolean requireMaxTimestamp) {
|
||||
public static Builder forConsumer(boolean requireTimestamp,
|
||||
IsolationLevel isolationLevel,
|
||||
boolean requireMaxTimestamp,
|
||||
boolean requireTieredStorageTimestamp) {
|
||||
short minVersion = 0;
|
||||
if (requireMaxTimestamp)
|
||||
if (requireTieredStorageTimestamp)
|
||||
minVersion = 9;
|
||||
else if (requireMaxTimestamp)
|
||||
minVersion = 7;
|
||||
else if (isolationLevel == IsolationLevel.READ_COMMITTED)
|
||||
minVersion = 2;
|
||||
|
|
|
|||
|
|
@ -5844,6 +5844,62 @@ public class KafkaAdminClientTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListOffsetsEarliestLocalSpecMinVersion() throws Exception {
|
||||
Node node = new Node(0, "localhost", 8120);
|
||||
List<Node> nodes = Collections.singletonList(node);
|
||||
List<PartitionInfo> pInfos = new ArrayList<>();
|
||||
pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
|
||||
final Cluster cluster = new Cluster(
|
||||
"mockClusterId",
|
||||
nodes,
|
||||
pInfos,
|
||||
Collections.emptySet(),
|
||||
Collections.emptySet(),
|
||||
node);
|
||||
final TopicPartition tp0 = new TopicPartition("foo", 0);
|
||||
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
|
||||
AdminClientConfig.RETRIES_CONFIG, "2")) {
|
||||
|
||||
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
|
||||
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
|
||||
|
||||
env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.earliestLocalSpec()));
|
||||
|
||||
TestUtils.waitForCondition(() -> env.kafkaClient().requests().stream().anyMatch(request ->
|
||||
request.requestBuilder().apiKey().messageType == ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion() == 9
|
||||
), "no listOffsets request has the expected oldestAllowedVersion");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListOffsetsLatestTierSpecSpecMinVersion() throws Exception {
|
||||
Node node = new Node(0, "localhost", 8120);
|
||||
List<Node> nodes = Collections.singletonList(node);
|
||||
List<PartitionInfo> pInfos = new ArrayList<>();
|
||||
pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
|
||||
final Cluster cluster = new Cluster(
|
||||
"mockClusterId",
|
||||
nodes,
|
||||
pInfos,
|
||||
Collections.emptySet(),
|
||||
Collections.emptySet(),
|
||||
node);
|
||||
final TopicPartition tp0 = new TopicPartition("foo", 0);
|
||||
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
|
||||
AdminClientConfig.RETRIES_CONFIG, "2")) {
|
||||
|
||||
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
|
||||
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
|
||||
|
||||
env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latestTierSpec()));
|
||||
|
||||
TestUtils.waitForCondition(() -> env.kafkaClient().requests().stream().anyMatch(request ->
|
||||
request.requestBuilder().apiKey().messageType == ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion() == 9
|
||||
), "no listOffsets request has the expected oldestAllowedVersion");
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
|
||||
return Utils.mkMap(
|
||||
Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE)),
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ public class ListOffsetsRequestTest {
|
|||
new ListOffsetsPartition()
|
||||
.setPartitionIndex(0))));
|
||||
ListOffsetsRequest request = ListOffsetsRequest.Builder
|
||||
.forConsumer(true, IsolationLevel.READ_COMMITTED, false)
|
||||
.forConsumer(true, IsolationLevel.READ_COMMITTED, false, false)
|
||||
.setTargetTimes(topics)
|
||||
.build(version);
|
||||
ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
|
||||
|
|
@ -101,7 +101,7 @@ public class ListOffsetsRequestTest {
|
|||
new ListOffsetsPartition()
|
||||
.setPartitionIndex(0))));
|
||||
ListOffsetsRequest request = ListOffsetsRequest.Builder
|
||||
.forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false)
|
||||
.forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false, false)
|
||||
.setTargetTimes(topics)
|
||||
.build((short) 0);
|
||||
ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
|
||||
|
|
|
|||
|
|
@ -2347,7 +2347,7 @@ public class RequestResponseTest {
|
|||
.setMaxNumOffsets(10)
|
||||
.setCurrentLeaderEpoch(5)));
|
||||
return ListOffsetsRequest.Builder
|
||||
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
|
||||
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
|
||||
.setTargetTimes(singletonList(topic))
|
||||
.build(version);
|
||||
} else if (version == 1) {
|
||||
|
|
@ -2358,7 +2358,7 @@ public class RequestResponseTest {
|
|||
.setTimestamp(1000000L)
|
||||
.setCurrentLeaderEpoch(5)));
|
||||
return ListOffsetsRequest.Builder
|
||||
.forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false)
|
||||
.forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false, false)
|
||||
.setTargetTimes(singletonList(topic))
|
||||
.build(version);
|
||||
} else if (version >= 2 && version <= LIST_OFFSETS.latestVersion()) {
|
||||
|
|
@ -2371,7 +2371,7 @@ public class RequestResponseTest {
|
|||
.setName("test")
|
||||
.setPartitions(singletonList(partition));
|
||||
return ListOffsetsRequest.Builder
|
||||
.forConsumer(true, IsolationLevel.READ_COMMITTED, false)
|
||||
.forConsumer(true, IsolationLevel.READ_COMMITTED, false, false)
|
||||
.setTargetTimes(singletonList(topic))
|
||||
.build(version);
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -287,7 +287,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
|
|||
}
|
||||
|
||||
private def createListOffsetsRequest = {
|
||||
requests.ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false).setTargetTimes(
|
||||
requests.ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false).setTargetTimes(
|
||||
List(new ListOffsetsTopic()
|
||||
.setName(tp.topic)
|
||||
.setPartitions(List(new ListOffsetsPartition()
|
||||
|
|
|
|||
|
|
@ -4000,7 +4000,7 @@ class KafkaApisTest extends Logging {
|
|||
.setPartitionIndex(tp.partition)
|
||||
.setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)
|
||||
.setCurrentLeaderEpoch(currentLeaderEpoch.get)).asJava)).asJava
|
||||
val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false)
|
||||
val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false, false)
|
||||
.setTargetTimes(targetTimes).build()
|
||||
val request = buildRequest(listOffsetRequest)
|
||||
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
|
||||
|
|
@ -6151,7 +6151,7 @@ class KafkaApisTest extends Logging {
|
|||
.setPartitions(List(new ListOffsetsPartition()
|
||||
.setPartitionIndex(tp.partition)
|
||||
.setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava
|
||||
val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false)
|
||||
val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false, false)
|
||||
.setTargetTimes(targetTimes).build()
|
||||
val request = buildRequest(listOffsetRequest)
|
||||
kafkaApis = createKafkaApis()
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
|
|||
.setCurrentLeaderEpoch(0)).asJava)).asJava
|
||||
|
||||
val consumerRequest = ListOffsetsRequest.Builder
|
||||
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
|
||||
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
|
||||
.setTargetTimes(targetTimes)
|
||||
.build()
|
||||
|
||||
|
|
@ -94,15 +94,27 @@ class ListOffsetsRequestTest extends BaseRequestTest {
|
|||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = Array("zk", "kraft"))
|
||||
def testListOffsetsMaxTimeStampOldestVersion(quorum: String): Unit = {
|
||||
def testListOffsetsRequestOldestVersion(): Unit = {
|
||||
val consumerRequestBuilder = ListOffsetsRequest.Builder
|
||||
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
|
||||
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
|
||||
|
||||
val requireTimestampRequestBuilder = ListOffsetsRequest.Builder
|
||||
.forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false, false)
|
||||
|
||||
val requestCommittedRequestBuilder = ListOffsetsRequest.Builder
|
||||
.forConsumer(false, IsolationLevel.READ_COMMITTED, false, false)
|
||||
|
||||
val maxTimestampRequestBuilder = ListOffsetsRequest.Builder
|
||||
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true)
|
||||
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true, false)
|
||||
|
||||
val requireTieredStorageTimestampRequestBuilder = ListOffsetsRequest.Builder
|
||||
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, true)
|
||||
|
||||
assertEquals(0.toShort, consumerRequestBuilder.oldestAllowedVersion())
|
||||
assertEquals(1.toShort, requireTimestampRequestBuilder.oldestAllowedVersion())
|
||||
assertEquals(2.toShort, requestCommittedRequestBuilder.oldestAllowedVersion())
|
||||
assertEquals(7.toShort, maxTimestampRequestBuilder.oldestAllowedVersion())
|
||||
assertEquals(9.toShort, requireTieredStorageTimestampRequestBuilder.oldestAllowedVersion())
|
||||
}
|
||||
|
||||
def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = {
|
||||
|
|
@ -115,7 +127,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
|
|||
.setName(topic)
|
||||
.setPartitions(List(listOffsetPartition).asJava)).asJava
|
||||
val request = ListOffsetsRequest.Builder
|
||||
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
|
||||
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
|
||||
.setTargetTimes(targetTimes)
|
||||
.build()
|
||||
assertResponseError(error, brokerId, request)
|
||||
|
|
@ -159,7 +171,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
|
|||
.setTimestamp(timestamp)).asJava)).asJava
|
||||
|
||||
val builder = ListOffsetsRequest.Builder
|
||||
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
|
||||
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
|
||||
.setTargetTimes(targetTimes)
|
||||
|
||||
val request = if (version == -1) builder.build() else builder.build(version)
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ class LogOffsetTest extends BaseRequestTest {
|
|||
@ValueSource(strings = Array("zk", "kraft"))
|
||||
def testGetOffsetsForUnknownTopic(quorum: String): Unit = {
|
||||
val topicPartition = new TopicPartition("foo", 0)
|
||||
val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
|
||||
val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
|
||||
.setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava).build(0)
|
||||
val response = sendListOffsetsRequest(request)
|
||||
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, findPartition(response.topics.asScala, topicPartition).errorCode)
|
||||
|
|
|
|||
|
|
@ -288,7 +288,7 @@ class RequestQuotaTest extends BaseRequestTest {
|
|||
.setPartitionIndex(tp.partition)
|
||||
.setTimestamp(0L)
|
||||
.setCurrentLeaderEpoch(15)).asJava)
|
||||
ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
|
||||
ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
|
||||
.setTargetTimes(List(topic).asJava)
|
||||
|
||||
case ApiKeys.LEADER_AND_ISR =>
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ public class ListOffsetRequestBenchmark {
|
|||
}
|
||||
}
|
||||
|
||||
this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
|
||||
this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
|
||||
.build(ApiKeys.LIST_OFFSETS.latestVersion());
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue