mirror of https://github.com/apache/kafka.git
KAFKA-17331 Set correct version for EarliestLocalSpec and LatestTieredSpec (#16876)
Add a version check on the client side when building a ListOffsetsRequest for the special timestamps: 1) the version must be >= 8 if timestamp = -4L (EARLIEST_LOCAL_TIMESTAMP); 2) the version must be >= 9 if timestamp = -5L (LATEST_TIERED_TIMESTAMP).

Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
parent 3d436f52bf
commit d67c18b4ae
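As a quick illustration of the rule above, the sketch below mirrors the version floor that ListOffsetsRequest.Builder.forConsumer applies after this change. The class and method names are hypothetical, and the floors for MAX_TIMESTAMP (assumed -3L) and READ_COMMITTED are pre-existing behaviour, not part of this patch; only the 8 and 9 floors are new.

    // Hypothetical standalone sketch; the real logic lives in
    // ListOffsetsRequest.Builder.forConsumer (see the diff below).
    public class ListOffsetsVersionFloorSketch {
        static final long MAX_TIMESTAMP = -3L;            // assumed pre-existing value, floor 7
        static final long EARLIEST_LOCAL_TIMESTAMP = -4L; // floor 8 (added by this change)
        static final long LATEST_TIERED_TIMESTAMP = -5L;  // floor 9 (added by this change)

        // Oldest allowed ListOffsets request version for a batch of requested timestamps.
        static short oldestAllowedVersion(long[] timestamps, boolean readCommitted) {
            boolean requireTiered = false;
            boolean requireEarliestLocal = false;
            boolean requireMax = false;
            for (long ts : timestamps) {
                requireTiered |= ts == LATEST_TIERED_TIMESTAMP;
                requireEarliestLocal |= ts == EARLIEST_LOCAL_TIMESTAMP;
                requireMax |= ts == MAX_TIMESTAMP;
            }
            short minVersion = 0;
            if (requireTiered)
                minVersion = 9;
            else if (requireEarliestLocal)
                minVersion = 8;
            else if (requireMax)
                minVersion = 7;
            else if (readCommitted)
                minVersion = 2; // pre-existing floor, unchanged here
            return minVersion;
        }

        public static void main(String[] args) {
            System.out.println(oldestAllowedVersion(new long[]{EARLIEST_LOCAL_TIMESTAMP}, false)); // 8
            System.out.println(oldestAllowedVersion(new long[]{LATEST_TIERED_TIMESTAMP}, false));  // 9
        }
    }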
@@ -49,6 +49,7 @@
 
     <subpackage name="common">
       <allow class="org.apache.kafka.clients.consumer.ConsumerRecord" exact-match="true" />
+      <allow class="org.apache.kafka.clients.NodeApiVersions" exact-match="true" />
       <allow class="org.apache.kafka.common.message.ApiMessageType" exact-match="true" />
       <disallow pkg="org.apache.kafka.clients" />
       <allow pkg="org.apache.kafka.common" exact-match="true" />

@@ -93,13 +93,20 @@ public final class ListOffsetsHandler extends Batched<TopicPartition, ListOffset
             .stream()
             .anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.MAX_TIMESTAMP);
 
+        boolean requireEarliestLocalTimestamp = keys
+            .stream()
+            .anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
+
         boolean requireTieredStorageTimestamp = keys
             .stream()
-            .anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP || offsetTimestampsByPartition.get(key) == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
+            .anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
 
-        return ListOffsetsRequest.Builder
-            .forConsumer(true, options.isolationLevel(), supportsMaxTimestamp, requireTieredStorageTimestamp)
+        return ListOffsetsRequest.Builder.forConsumer(true,
+                options.isolationLevel(),
+                supportsMaxTimestamp,
+                requireEarliestLocalTimestamp,
+                requireTieredStorageTimestamp)
             .setTargetTimes(new ArrayList<>(topicsByName.values()));
     }
 
     @Override

@@ -391,7 +391,7 @@ public class OffsetFetcher {
             final Map<TopicPartition, ListOffsetsPartition> timestampsToSearch,
             boolean requireTimestamp) {
         ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
-                .forConsumer(requireTimestamp, isolationLevel, false, false)
+                .forConsumer(requireTimestamp, isolationLevel)
                 .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch));
 
         log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);

@@ -337,7 +337,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
             boolean requireTimestamps,
             List<NetworkClientDelegate.UnsentRequest> unsentRequests) {
         ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
-                .forConsumer(requireTimestamps, isolationLevel, false, false)
+                .forConsumer(requireTimestamps, isolationLevel)
                 .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));
 
         log.debug("Creating ListOffset request {} for broker {} to reset positions", builder,

@@ -58,17 +58,21 @@ public class ListOffsetsRequest extends AbstractRequest {
     public static class Builder extends AbstractRequest.Builder<ListOffsetsRequest> {
         private final ListOffsetsRequestData data;
 
-        public static Builder forReplica(short allowedVersion, int replicaId) {
-            return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
+        public static Builder forConsumer(boolean requireTimestamp,
+                                          IsolationLevel isolationLevel) {
+            return forConsumer(requireTimestamp, isolationLevel, false, false, false);
         }
 
         public static Builder forConsumer(boolean requireTimestamp,
                                           IsolationLevel isolationLevel,
                                           boolean requireMaxTimestamp,
+                                          boolean requireEarliestLocalTimestamp,
                                           boolean requireTieredStorageTimestamp) {
             short minVersion = 0;
             if (requireTieredStorageTimestamp)
                 minVersion = 9;
+            else if (requireEarliestLocalTimestamp)
+                minVersion = 8;
             else if (requireMaxTimestamp)
                 minVersion = 7;
             else if (isolationLevel == IsolationLevel.READ_COMMITTED)

@@ -78,6 +82,10 @@ public class ListOffsetsRequest extends AbstractRequest {
             return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
         }
 
+        public static Builder forReplica(short allowedVersion, int replicaId) {
+            return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
+        }
+
         private Builder(short oldestAllowedVersion,
                         short latestAllowedVersion,
                         int replicaId,

@@ -6284,7 +6284,7 @@ public class KafkaAdminClientTest {
             env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.earliestLocal()));
 
             TestUtils.waitForCondition(() -> env.kafkaClient().requests().stream().anyMatch(request ->
-                request.requestBuilder().apiKey().messageType == ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion() == 9
+                request.requestBuilder().apiKey().messageType == ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion() == 8
             ), "no listOffsets request has the expected oldestAllowedVersion");
         }
     }

@@ -60,6 +60,8 @@ public final class ListOffsetsHandlerTest {
     private final TopicPartition t0p1 = new TopicPartition("t0", 1);
     private final TopicPartition t1p0 = new TopicPartition("t1", 0);
     private final TopicPartition t1p1 = new TopicPartition("t1", 1);
+    private final TopicPartition t2p0 = new TopicPartition("t2", 0);
+    private final TopicPartition t2p1 = new TopicPartition("t2", 1);
 
     private final Node node = new Node(1, "host", 1234);
 

@@ -69,6 +71,8 @@ public final class ListOffsetsHandlerTest {
             put(t0p1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
             put(t1p0, 123L);
             put(t1p1, ListOffsetsRequest.MAX_TIMESTAMP);
+            put(t2p0, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
+            put(t2p1, ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
         }
     };
 

@@ -96,14 +100,14 @@ public final class ListOffsetsHandlerTest {
         ListOffsetsRequest request =
             handler.buildBatchedRequest(node.id(), offsetTimestampsByPartition.keySet()).build();
         List<ListOffsetsTopic> topics = request.topics();
-        assertEquals(2, topics.size());
+        assertEquals(3, topics.size());
         Map<TopicPartition, ListOffsetsPartition> partitions = new HashMap<>();
         for (ListOffsetsTopic topic : topics) {
             for (ListOffsetsPartition partition : topic.partitions()) {
                 partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partition);
             }
         }
-        assertEquals(4, partitions.size());
+        assertEquals(6, partitions.size());
         for (Map.Entry<TopicPartition, ListOffsetsPartition> entry : partitions.entrySet()) {
             assertExpectedTimestamp(entry.getKey(), entry.getValue().timestamp());
         }

@@ -126,6 +130,12 @@ public final class ListOffsetsHandlerTest {
 
         builder = readCommittedHandler.buildBatchedRequest(node.id(), mkSet(t0p0, t0p1, t1p0, t1p1));
         assertEquals(7, builder.oldestAllowedVersion());
+
+        builder = readCommittedHandler.buildBatchedRequest(node.id(), mkSet(t0p0, t0p1, t1p0, t1p1, t2p0));
+        assertEquals(8, builder.oldestAllowedVersion());
+
+        builder = readCommittedHandler.buildBatchedRequest(node.id(), mkSet(t0p0, t0p1, t1p0, t1p1, t2p0, t2p1));
+        assertEquals(9, builder.oldestAllowedVersion());
     }
 
     @Test

@@ -16,8 +16,11 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.ListOffsetsRequestData;
 import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
 import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;

@@ -36,11 +39,16 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.requests.ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP;
+import static org.apache.kafka.common.requests.ListOffsetsRequest.LATEST_TIERED_TIMESTAMP;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ListOffsetsRequestTest {
 
+    private final NodeApiVersions versionInfo = new NodeApiVersions(new ApiVersionsResponseData.ApiVersionCollection(), Collections.emptyList(), false);
+
     @Test
     public void testDuplicatePartitions() {
         List<ListOffsetsTopic> topics = Collections.singletonList(

@@ -68,7 +76,7 @@ public class ListOffsetsRequestTest {
                         new ListOffsetsPartition()
                             .setPartitionIndex(0))));
         ListOffsetsRequest request = ListOffsetsRequest.Builder
-            .forConsumer(true, IsolationLevel.READ_COMMITTED, false, false)
+            .forConsumer(true, IsolationLevel.READ_COMMITTED)
             .setTargetTimes(topics)
             .build(version);
         ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());

@@ -101,7 +109,7 @@ public class ListOffsetsRequestTest {
                         new ListOffsetsPartition()
                             .setPartitionIndex(0))));
         ListOffsetsRequest request = ListOffsetsRequest.Builder
-            .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false, false)
+            .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
             .setTargetTimes(topics)
             .build((short) 0);
         ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());

@@ -146,4 +154,34 @@ public class ListOffsetsRequestTest {
         assertTrue(topic.partitions().contains(lop1));
     }
 
+    @Test
+    public void testCheckEarliestLocalTimestampVersion() {
+        int maxVersion = ApiKeys.LIST_OFFSETS.latestVersion();
+        for (int i = 0; i <= maxVersion; i++) {
+            testUnsupportedVersion(i, EARLIEST_LOCAL_TIMESTAMP);
+        }
+    }
+
+    @Test
+    public void testCheckLatestTieredTimestampVersion() {
+        int maxVersion = ApiKeys.LIST_OFFSETS.latestVersion();
+        for (int i = 0; i <= maxVersion; i++) {
+            testUnsupportedVersion(i, LATEST_TIERED_TIMESTAMP);
+        }
+    }
+
+    private void testUnsupportedVersion(int version, long timestamp) {
+        if (timestamp == EARLIEST_LOCAL_TIMESTAMP && version < 8) {
+            assertUnsupportedVersion(version);
+        } else if (timestamp == LATEST_TIERED_TIMESTAMP && version < 9) {
+            assertUnsupportedVersion(version);
+        }
+    }
+
+    private void assertUnsupportedVersion(int version) {
+        ApiKeys apiKey = ApiKeys.LIST_OFFSETS;
+        UnsupportedVersionException exception = assertThrows(UnsupportedVersionException.class,
+            () -> versionInfo.latestUsableVersion(apiKey, (short) version, apiKey.latestVersion()));
+        assertEquals("The node does not support " + apiKey, exception.getMessage());
+    }
 }

@@ -2355,7 +2355,7 @@ public class RequestResponseTest {
                             .setMaxNumOffsets(10)
                             .setCurrentLeaderEpoch(5)));
             return ListOffsetsRequest.Builder
-                    .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
+                    .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
                     .setTargetTimes(singletonList(topic))
                     .build(version);
         } else if (version == 1) {

@@ -2366,7 +2366,7 @@ public class RequestResponseTest {
                             .setTimestamp(1000000L)
                             .setCurrentLeaderEpoch(5)));
             return ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false, false)
+                    .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
                     .setTargetTimes(singletonList(topic))
                     .build(version);
         } else if (version >= 2 && version <= LIST_OFFSETS.latestVersion()) {

@@ -2379,7 +2379,7 @@ public class RequestResponseTest {
                     .setName("test")
                     .setPartitions(singletonList(partition));
             return ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_COMMITTED, false, false)
+                    .forConsumer(true, IsolationLevel.READ_COMMITTED)
                     .setTargetTimes(singletonList(topic))
                     .build(version);
         } else {

@@ -300,8 +300,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
   }
 
   private def createListOffsetsRequest = {
-    requests.ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false).setTargetTimes(
-      List(new ListOffsetsTopic()
+    requests.ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+      .setTargetTimes(List(new ListOffsetsTopic()
         .setName(tp.topic)
         .setPartitions(List(new ListOffsetsPartition()
           .setPartitionIndex(tp.partition)

@@ -4101,7 +4101,7 @@ class KafkaApisTest extends Logging {
         .setPartitionIndex(tp.partition)
         .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)
         .setCurrentLeaderEpoch(currentLeaderEpoch.get)).asJava)).asJava
-    val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false, false)
+    val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel)
       .setTargetTimes(targetTimes).build()
     val request = buildRequest(listOffsetRequest)
     when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),

@@ -10112,7 +10112,7 @@ class KafkaApisTest extends Logging {
       .setPartitions(List(new ListOffsetsPartition()
         .setPartitionIndex(tp.partition)
         .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava
-    val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false, false)
+    val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel)
      .setTargetTimes(targetTimes).build()
     val request = buildRequest(listOffsetRequest)
     kafkaApis = createKafkaApis()

@@ -54,7 +54,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
         .setCurrentLeaderEpoch(0)).asJava)).asJava
 
     val consumerRequest = ListOffsetsRequest.Builder
-      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
       .setTargetTimes(targetTimes)
       .build()
 

@@ -96,24 +96,28 @@ class ListOffsetsRequestTest extends BaseRequestTest {
   @ValueSource(strings = Array("zk", "kraft"))
   def testListOffsetsRequestOldestVersion(): Unit = {
     val consumerRequestBuilder = ListOffsetsRequest.Builder
-      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
 
     val requireTimestampRequestBuilder = ListOffsetsRequest.Builder
-      .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false, false)
+      .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
 
     val requestCommittedRequestBuilder = ListOffsetsRequest.Builder
-      .forConsumer(false, IsolationLevel.READ_COMMITTED, false, false)
+      .forConsumer(false, IsolationLevel.READ_COMMITTED)
 
     val maxTimestampRequestBuilder = ListOffsetsRequest.Builder
-      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true, false)
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true, false, false)
 
+    val requireEarliestLocalTimestampRequestBuilder = ListOffsetsRequest.Builder
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, true, false)
+
     val requireTieredStorageTimestampRequestBuilder = ListOffsetsRequest.Builder
-      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, true)
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false, true)
 
     assertEquals(0.toShort, consumerRequestBuilder.oldestAllowedVersion())
     assertEquals(1.toShort, requireTimestampRequestBuilder.oldestAllowedVersion())
     assertEquals(2.toShort, requestCommittedRequestBuilder.oldestAllowedVersion())
     assertEquals(7.toShort, maxTimestampRequestBuilder.oldestAllowedVersion())
+    assertEquals(8.toShort, requireEarliestLocalTimestampRequestBuilder.oldestAllowedVersion())
     assertEquals(9.toShort, requireTieredStorageTimestampRequestBuilder.oldestAllowedVersion())
   }
 

@@ -127,7 +131,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
       .setName(topic)
       .setPartitions(List(listOffsetPartition).asJava)).asJava
     val request = ListOffsetsRequest.Builder
-      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
       .setTargetTimes(targetTimes)
       .build()
     assertResponseError(error, brokerId, request)

@@ -171,7 +175,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
         .setTimestamp(timestamp)).asJava)).asJava
 
     val builder = ListOffsetsRequest.Builder
-      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
       .setTargetTimes(targetTimes)
 
     val request = if (version == -1) builder.build() else builder.build(version)

@@ -60,7 +60,7 @@ class LogOffsetTest extends BaseRequestTest {
   @ValueSource(strings = Array("zk", "kraft"))
   def testGetOffsetsForUnknownTopic(quorum: String): Unit = {
     val topicPartition = new TopicPartition("foo", 0)
-    val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
+    val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
       .setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava).build(0)
     val response = sendListOffsetsRequest(request)
     assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, findPartition(response.topics.asScala, topicPartition).errorCode)

@@ -288,7 +288,7 @@ class RequestQuotaTest extends BaseRequestTest {
             .setPartitionIndex(tp.partition)
             .setTimestamp(0L)
             .setCurrentLeaderEpoch(15)).asJava)
-        ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
+        ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
           .setTargetTimes(List(topic).asJava)
 
       case ApiKeys.LEADER_AND_ISR =>

@@ -71,7 +71,7 @@ public class ListOffsetRequestBenchmark {
             }
         }
 
-        this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
+        this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
                 .build(ApiKeys.LIST_OFFSETS.latestVersion());
     }
 