KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP (#16783)

This PR adds support for EarliestLocalSpec and LatestTieredSpec in GetOffsetShell, and adds integration tests.
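With this change the two specs can also be exercised from the standard wrapper script; a sketch, with the broker address and topic pattern as placeholders:

bin/kafka-get-offsets.sh --bootstrap-server localhost:9092 --topic-partitions 'topicRLS.*:0' --time latest-tiered

The --time flag equally accepts the numeric sentinels -4 (earliest-local) and -5 (latest-tiered), mirroring the existing -1/-2/-3 aliases shown in the help text below.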

Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang <payang@apache.org>
Kuan-Po Tseng 2024-08-05 10:41:14 +08:00 committed by GitHub
parent b3fd9a5a95
commit 84add30ea5
8 changed files with 158 additions and 23 deletions

build.gradle

@ -2141,6 +2141,7 @@ project(':tools') {
testImplementation project(':connect:runtime')
testImplementation project(':connect:runtime').sourceSets.test.output
testImplementation project(':storage:storage-api').sourceSets.main.output
testImplementation project(':storage').sourceSets.test.output
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension

checkstyle/import-control.xml

@ -284,6 +284,8 @@
<allow pkg="org.apache.kafka.storage.internals" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.log.remote.metadata.storage" />
<allow pkg="org.apache.kafka.server.log.remote.storage" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.clients.producer" />

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -4862,7 +4862,7 @@ public class KafkaAdminClient extends AdminClient {
return ListOffsetsRequest.MAX_TIMESTAMP;
} else if (offsetSpec instanceof OffsetSpec.EarliestLocalSpec) {
return ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP;
} else if (offsetSpec instanceof OffsetSpec.LatestTierSpec) {
} else if (offsetSpec instanceof OffsetSpec.LatestTieredSpec) {
return ListOffsetsRequest.LATEST_TIERED_TIMESTAMP;
}
return ListOffsetsRequest.LATEST_TIMESTAMP;
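For reference, a sketch of the spec-to-sentinel mapping this method implements; the -4/-5 values are implied by the aliases in the GetOffsetShell help text further below:

// ListOffsetsRequest sentinel "timestamps" (negative, so they can never
// collide with a real record timestamp):
//   LATEST_TIMESTAMP         = -1  <- OffsetSpec.latest()
//   EARLIEST_TIMESTAMP       = -2  <- OffsetSpec.earliest()
//   MAX_TIMESTAMP            = -3  <- OffsetSpec.maxTimestamp()
//   EARLIEST_LOCAL_TIMESTAMP = -4  <- OffsetSpec.earliestLocal()
//   LATEST_TIERED_TIMESTAMP  = -5  <- OffsetSpec.latestTiered()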

clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java

@ -27,7 +27,7 @@ public class OffsetSpec {
public static class LatestSpec extends OffsetSpec { }
public static class MaxTimestampSpec extends OffsetSpec { }
public static class EarliestLocalSpec extends OffsetSpec { }
public static class LatestTierSpec extends OffsetSpec { }
public static class LatestTieredSpec extends OffsetSpec { }
public static class TimestampSpec extends OffsetSpec {
private final long timestamp;
@ -73,20 +73,22 @@ public class OffsetSpec {
}
/**
* Used to retrieve the offset with the local log start offset,
* log start offset is the offset of a log above which reads are guaranteed to be served
* from the disk of the leader broker, when Tiered Storage is not enabled, it behaves the same
* as the earliest timestamp
* Used to retrieve the local log start offset.
* Local log start offset is the offset of a log above which reads
* are guaranteed to be served from the disk of the leader broker.
* <br/>
* Note: When tiered storage is not enabled, it behaves the same as retrieving the earliest timestamp offset.
*/
public static OffsetSpec earliestLocalSpec() {
public static OffsetSpec earliestLocal() {
return new EarliestLocalSpec();
}
/**
* Used to retrieve the offset with the highest offset of data stored in remote storage,
* and when Tiered Storage is not enabled, we won't return any offset (i.e. Unknown offset)
* Used to retrieve the highest offset of data stored in remote storage.
* <br/>
* Note: When tiered storage is not enabled, we will return unknown offset.
*/
public static OffsetSpec latestTierSpec() {
return new LatestTierSpec();
public static OffsetSpec latestTiered() {
return new LatestTieredSpec();
}
}
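Taken together, a minimal Admin API sketch using the two renamed factory methods (a hypothetical, self-contained example; the broker address and topic are placeholders):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class TieredOffsetsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        TopicPartition tp = new TopicPartition("topicRLS1", 0);                  // placeholder
        try (Admin admin = Admin.create(props)) {
            // first offset still served from the leader broker's local disk
            ListOffsetsResultInfo earliestLocal = admin
                    .listOffsets(Collections.singletonMap(tp, OffsetSpec.earliestLocal()))
                    .partitionResult(tp)
                    .get();
            // highest offset of data uploaded to remote (tiered) storage;
            // unknown when tiered storage is disabled for the topic
            ListOffsetsResultInfo latestTiered = admin
                    .listOffsets(Collections.singletonMap(tp, OffsetSpec.latestTiered()))
                    .partitionResult(tp)
                    .get();
            System.out.println("earliest-local=" + earliestLocal.offset()
                    + ", latest-tiered=" + latestTiered.offset());
        }
    }
}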

clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@ -5864,7 +5864,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.earliestLocalSpec()));
env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.earliestLocal()));
TestUtils.waitForCondition(() -> env.kafkaClient().requests().stream().anyMatch(request ->
request.requestBuilder().apiKey().messageType == ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion() == 9
@ -5892,7 +5892,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latestTierSpec()));
env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latestTiered()));
TestUtils.waitForCondition(() -> env.kafkaClient().requests().stream().anyMatch(request ->
request.requestBuilder().apiKey().messageType == ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion() == 9

tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java

@ -132,7 +132,7 @@ public class GetOffsetShell {
.ofType(String.class);
timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
.withRequiredArg()
.describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
.describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered")
.ofType(String.class)
.defaultsTo("latest");
commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
@ -275,7 +275,8 @@ public class GetOffsetShell {
}
}
private OffsetSpec parseOffsetSpec(String listOffsetsTimestamp) throws TerseException {
// visible for testing
static OffsetSpec parseOffsetSpec(String listOffsetsTimestamp) throws TerseException {
switch (listOffsetsTimestamp) {
case "earliest":
return OffsetSpec.earliest();
@ -283,6 +284,10 @@ public class GetOffsetShell {
return OffsetSpec.latest();
case "max-timestamp":
return OffsetSpec.maxTimestamp();
case "earliest-local":
return OffsetSpec.earliestLocal();
case "latest-tiered":
return OffsetSpec.latestTiered();
default:
long timestamp;
@ -290,7 +295,7 @@ public class GetOffsetShell {
timestamp = Long.parseLong(listOffsetsTimestamp);
} catch (NumberFormatException e) {
throw new TerseException("Malformed time argument " + listOffsetsTimestamp + ". " +
"Please use -1 or latest / -2 or earliest / -3 or max-timestamp, or a specified long format timestamp");
"Please use -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered, or a specified long format timestamp");
}
if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
@ -299,6 +304,10 @@ public class GetOffsetShell {
return OffsetSpec.latest();
} else if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
return OffsetSpec.maxTimestamp();
} else if (timestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
return OffsetSpec.earliestLocal();
} else if (timestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
return OffsetSpec.latestTiered();
} else {
return OffsetSpec.forTimestamp(timestamp);
}
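Both the named alias and its numeric sentinel resolve to the same spec type; a minimal sketch (assumes the caller lives in the org.apache.kafka.tools package, since parseOffsetSpec is package-private, and lets the checked TerseException propagate):

package org.apache.kafka.tools;

import org.apache.kafka.clients.admin.OffsetSpec;

public class ParseOffsetSpecSketch {
    public static void main(String[] args) throws Exception {
        OffsetSpec byName  = GetOffsetShell.parseOffsetSpec("earliest-local"); // EarliestLocalSpec
        OffsetSpec byValue = GetOffsetShell.parseOffsetSpec("-4");             // also EarliestLocalSpec
        OffsetSpec tiered  = GetOffsetShell.parseOffsetSpec("latest-tiered");  // LatestTieredSpec
        System.out.println(byName.getClass().getSimpleName()
                + " " + byValue.getClass().getSimpleName()
                + " " + tiered.getClass().getSimpleName());
    }
}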

tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java

@ -22,6 +22,7 @@ import org.apache.kafka.server.util.TopicPartitionFilter;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -243,6 +244,13 @@ public class GetOffsetShellParsingTest {
assertThrows(TerseException.class, () -> GetOffsetShell.execute("--bootstrap-server", "localhost:9092", "--time", "invalid"));
}
@Test
public void testInvalidOffset() {
assertEquals("Malformed time argument foo. " +
"Please use -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered, or a specified long format timestamp",
assertThrows(TerseException.class, () -> GetOffsetShell.parseOffsetSpec("foo")).getMessage());
}
private TopicPartition getTopicPartition(String topic, Integer partition) {
return new TopicPartition(topic, partition);
}

tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java

@ -17,8 +17,10 @@
package org.apache.kafka.tools;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTemplate;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.junit.ClusterTestExtensions;
@ -31,23 +33,36 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.extension.ExtendWith;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static kafka.test.annotation.Type.CO_KRAFT;
import static kafka.test.annotation.Type.KRAFT;
import static kafka.test.annotation.Type.ZK;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -70,15 +85,41 @@ public class GetOffsetShellTest {
return "topic" + i;
}
private String getRemoteLogStorageEnabledTopicName(int i) {
return "topicRLS" + i;
}
private void setUp() {
setupTopics(this::getTopicName, Collections.emptyMap());
sendProducerRecords(this::getTopicName);
}
private void setUpRemoteLogTopics() {
// In this method, we create 4 topics and produce records so that the logs look like this:
// topicRLS1 -> 1 segment
// topicRLS2 -> 2 segments (1 local log segment + 1 segment in the remote storage)
// topicRLS3 -> 3 segments (1 local log segment + 2 segments in the remote storage)
// topicRLS4 -> 4 segments (1 local log segment + 3 segments in the remote storage)
Map<String, String> rlsConfigs = new HashMap<>();
rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100");
setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
}
private void setupTopics(Function<Integer, String> topicName, Map<String, String> configs) {
try (Admin admin = cluster.createAdminClient()) {
List<NewTopic> topics = new ArrayList<>();
IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new NewTopic(getTopicName(i), i, (short) 1)));
IntStream.range(0, topicCount + 1).forEach(i ->
topics.add(new NewTopic(topicName.apply(i), i, (short) 1).configs(configs)));
admin.createTopics(topics);
}
}
private void sendProducerRecords(Function<Integer, String> topicName) {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@ -87,14 +128,33 @@ public class GetOffsetShellTest {
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
IntStream.range(0, topicCount + 1)
.forEach(i -> IntStream.range(0, i * i)
.forEach(msgCount -> {
assertDoesNotThrow(() -> producer.send(
new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)).get());
})
);
.forEach(msgCount -> assertDoesNotThrow(() -> producer.send(
new ProducerRecord<>(topicName.apply(i), msgCount % i, null, "val" + msgCount)).get())));
}
}
private static List<ClusterConfig> withRemoteStorage() {
Map<String, String> serverProperties = new HashMap<>();
serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, "1");
serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, "1");
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
serverProperties.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, LocalTieredStorage.class.getName());
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "1000");
serverProperties.put(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG, "1000");
serverProperties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "EXTERNAL");
return Collections.singletonList(
// we set REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP to EXTERNAL, so we need to
// align the listener name here: in KafkaClusterTestKit (KRAFT/CO_KRAFT) the default
// broker listener name is EXTERNAL, while in ZK it is PLAINTEXT
ClusterConfig.defaultBuilder()
.setTypes(Stream.of(ZK, KRAFT, CO_KRAFT).collect(Collectors.toSet()))
.setServerProperties(serverProperties)
.setListenerName("EXTERNAL")
.build());
}
private void createConsumerAndPoll() {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
@ -274,6 +334,59 @@ public class GetOffsetShellTest {
}
}
@ClusterTemplate("withRemoteStorage")
public void testGetOffsetsByEarliestLocalSpec() throws InterruptedException {
setUp();
setUpRemoteLogTopics();
for (String time : new String[] {"-4", "earliest-local"}) {
// test topics with remote log storage disabled
// since remote log storage is disabled, the broker returns the same result as for the earliest offset
TestUtils.waitForCondition(() ->
Arrays.asList(
new Row("topic1", 0, 0L),
new Row("topic2", 0, 0L),
new Row("topic3", 0, 0L),
new Row("topic4", 0, 0L))
.equals(executeAndParse("--topic-partitions", "topic\\d+.*:0", "--time", time)),
"testGetOffsetsByEarliestLocalSpec get topics with remote log disabled result not match");
// test topics with remote log storage enabled
TestUtils.waitForCondition(() ->
Arrays.asList(
new Row("topicRLS1", 0, 0L),
new Row("topicRLS2", 0, 1L),
new Row("topicRLS3", 0, 2L),
new Row("topicRLS4", 0, 3L))
.equals(executeAndParse("--topic-partitions", "topicRLS.*:0", "--time", time)),
"testGetOffsetsByEarliestLocalSpec get topics with remote log enabled result not match");
}
}
@ClusterTemplate("withRemoteStorage")
public void testGetOffsetsByLatestTieredSpec() throws InterruptedException {
setUp();
setUpRemoteLogTopics();
for (String time : new String[] {"-5", "latest-tiered"}) {
// test topics with remote log storage disabled
// since remote log storage is not enabled, the broker returns an unknown offset for each
// topic partition; GetOffsetShell ignores unknown offsets, hence the empty result here
assertEquals(Collections.emptyList(),
executeAndParse("--topic-partitions", "topic\\d+:0", "--time", time));
// test topics with remote log storage enabled
// topicRLS1 yields no row because none of its log segments have been uploaded to remote storage
TestUtils.waitForCondition(() ->
Arrays.asList(
new Row("topicRLS2", 0, 0L),
new Row("topicRLS3", 0, 1L),
new Row("topicRLS4", 0, 2L))
.equals(executeAndParse("--topic-partitions", "topicRLS.*:0", "--time", time)),
"testGetOffsetsByLatestTieredSpec result not match");
}
}
@ClusterTest
public void testGetOffsetsByTimestamp() {
setUp();