diff --git a/build.gradle b/build.gradle
index 51f9659e587..becac56bf88 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2141,6 +2141,7 @@ project(':tools') {
testImplementation project(':connect:runtime')
testImplementation project(':connect:runtime').sourceSets.test.output
testImplementation project(':storage:storage-api').sourceSets.main.output
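+ // the ':storage' module's test output provides LocalTieredStorage, used by GetOffsetShellTest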
+ testImplementation project(':storage').sourceSets.test.output
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index a5784ef935c..3f8212f9976 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -284,6 +284,8 @@
+    <allow pkg="org.apache.kafka.server.log.remote.metadata.storage" />
+    <allow pkg="org.apache.kafka.server.log.remote.storage" />
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 8eb7fb4e8c0..2f195489add 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4862,7 +4862,7 @@ public class KafkaAdminClient extends AdminClient {
return ListOffsetsRequest.MAX_TIMESTAMP;
} else if (offsetSpec instanceof OffsetSpec.EarliestLocalSpec) {
return ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP;
- } else if (offsetSpec instanceof OffsetSpec.LatestTierSpec) {
+ } else if (offsetSpec instanceof OffsetSpec.LatestTieredSpec) {
return ListOffsetsRequest.LATEST_TIERED_TIMESTAMP;
}
return ListOffsetsRequest.LATEST_TIMESTAMP;
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
index 5b2fbb3e2e9..68f94cc493e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
@@ -27,7 +27,7 @@ public class OffsetSpec {
public static class LatestSpec extends OffsetSpec { }
public static class MaxTimestampSpec extends OffsetSpec { }
public static class EarliestLocalSpec extends OffsetSpec { }
- public static class LatestTierSpec extends OffsetSpec { }
+ public static class LatestTieredSpec extends OffsetSpec { }
public static class TimestampSpec extends OffsetSpec {
private final long timestamp;
@@ -73,20 +73,22 @@ public class OffsetSpec {
}
/**
- * Used to retrieve the offset with the local log start offset,
- * log start offset is the offset of a log above which reads are guaranteed to be served
- * from the disk of the leader broker, when Tiered Storage is not enabled, it behaves the same
- * as the earliest timestamp
+ * Used to retrieve the local log start offset.
+ * Local log start offset is the offset of a log above which reads
+ * are guaranteed to be served from the disk of the leader broker.
+ *
+ * Note: When tiered storage is not enabled, it behaves the same as retrieving the earliest timestamp offset.
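+ *
+ * <p>Example (illustrative): {@code admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.earliestLocal()))}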
*/
- public static OffsetSpec earliestLocalSpec() {
+ public static OffsetSpec earliestLocal() {
return new EarliestLocalSpec();
}
/**
- * Used to retrieve the offset with the highest offset of data stored in remote storage,
- * and when Tiered Storage is not enabled, we won't return any offset (i.e. Unknown offset)
+ * Used to retrieve the highest offset of data stored in remote storage.
+ *
+ * Note: When tiered storage is not enabled, an unknown offset is returned.
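+ *
+ * <p>Example (illustrative): {@code admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.latestTiered()))};
+ * callers should be prepared to handle an unknown offset in the result when tiered storage is disabled.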
*/
- public static OffsetSpec latestTierSpec() {
- return new LatestTierSpec();
+ public static OffsetSpec latestTiered() {
+ return new LatestTieredSpec();
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 8d70e60fc05..dc74a178442 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -5864,7 +5864,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
- env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.earliestLocalSpec()));
+ env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.earliestLocal()));
TestUtils.waitForCondition(() -> env.kafkaClient().requests().stream().anyMatch(request ->
request.requestBuilder().apiKey().messageType == ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion() == 9
@@ -5892,7 +5892,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
- env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latestTierSpec()));
+ env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latestTiered()));
TestUtils.waitForCondition(() -> env.kafkaClient().requests().stream().anyMatch(request ->
request.requestBuilder().apiKey().messageType == ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion() == 9
diff --git a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
index 4ba0f6c3e3c..60b78acd22b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
+++ b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
@@ -132,7 +132,7 @@ public class GetOffsetShell {
.ofType(String.class);
timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
.withRequiredArg()
- .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
+ .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered")
.ofType(String.class)
.defaultsTo("latest");
commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
@@ -275,7 +275,8 @@ public class GetOffsetShell {
}
}
- private OffsetSpec parseOffsetSpec(String listOffsetsTimestamp) throws TerseException {
+ // visible for testing
+ static OffsetSpec parseOffsetSpec(String listOffsetsTimestamp) throws TerseException {
switch (listOffsetsTimestamp) {
case "earliest":
return OffsetSpec.earliest();
@@ -283,6 +284,10 @@ public class GetOffsetShell {
return OffsetSpec.latest();
case "max-timestamp":
return OffsetSpec.maxTimestamp();
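+ // named aliases for the sentinel timestamps -4 and -5; the numeric forms are handled below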
+ case "earliest-local":
+ return OffsetSpec.earliestLocal();
+ case "latest-tiered":
+ return OffsetSpec.latestTiered();
default:
long timestamp;
@@ -290,7 +295,7 @@ public class GetOffsetShell {
timestamp = Long.parseLong(listOffsetsTimestamp);
} catch (NumberFormatException e) {
throw new TerseException("Malformed time argument " + listOffsetsTimestamp + ". " +
- "Please use -1 or latest / -2 or earliest / -3 or max-timestamp, or a specified long format timestamp");
+ "Please use -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered, or a specified long format timestamp");
}
if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
@@ -299,6 +304,10 @@ public class GetOffsetShell {
return OffsetSpec.latest();
} else if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
return OffsetSpec.maxTimestamp();
+ } else if (timestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
+ return OffsetSpec.earliestLocal();
+ } else if (timestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
+ return OffsetSpec.latestTiered();
} else {
return OffsetSpec.forTimestamp(timestamp);
}
diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java
index 3c4ef0894f7..9e81c23f309 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.server.util.TopicPartitionFilter;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -243,6 +244,13 @@ public class GetOffsetShellParsingTest {
assertThrows(TerseException.class, () -> GetOffsetShell.execute("--bootstrap-server", "localhost:9092", "--time", "invalid"));
}
+ @Test
+ public void testInvalidOffset() {
+ assertEquals("Malformed time argument foo. " +
+ "Please use -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered, or a specified long format timestamp",
+ assertThrows(TerseException.class, () -> GetOffsetShell.parseOffsetSpec("foo")).getMessage());
+ }
+
private TopicPartition getTopicPartition(String topic, Integer partition) {
return new TopicPartition(topic, partition);
}
diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
index 2d588c60257..95007d7bf85 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
@@ -17,8 +17,10 @@
package org.apache.kafka.tools;
+import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTemplate;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.junit.ClusterTestExtensions;
@@ -31,23 +33,36 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.extension.ExtendWith;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Properties;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static kafka.test.annotation.Type.CO_KRAFT;
+import static kafka.test.annotation.Type.KRAFT;
+import static kafka.test.annotation.Type.ZK;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -70,15 +85,41 @@ public class GetOffsetShellTest {
return "topic" + i;
}
+ private String getRemoteLogStorageEnabledTopicName(int i) {
+ return "topicRLS" + i;
+ }
+
private void setUp() {
+ setupTopics(this::getTopicName, Collections.emptyMap());
+ sendProducerRecords(this::getTopicName);
+ }
+
+ private void setUpRemoteLogTopics() {
+ // In this method, we create 4 topics and produce records so that their logs look like this:
+ // topicRLS1 -> 1 segment
+ // topicRLS2 -> 2 segments (1 local log segment + 1 segment in the remote storage)
+ // topicRLS3 -> 3 segments (1 local log segment + 2 segments in the remote storage)
+ // topicRLS4 -> 4 segments (1 local log segment + 3 segments in the remote storage)
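+ // A tiny segment size plus 1-byte local retention force every rolled segment to be copied to
+ // remote storage and then deleted locally, so only the active segment remains on the broker's disk.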
+ Map<String, String> rlsConfigs = new HashMap<>();
+ rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
+ rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+ rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100");
+ setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
+ sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
+ }
+
+ private void setupTopics(Function<Integer, String> topicName, Map<String, String> configs) {
try (Admin admin = cluster.createAdminClient()) {
List<NewTopic> topics = new ArrayList<>();
- IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new NewTopic(getTopicName(i), i, (short) 1)));
+ IntStream.range(0, topicCount + 1).forEach(i ->
+ topics.add(new NewTopic(topicName.apply(i), i, (short) 1).configs(configs)));
admin.createTopics(topics);
}
+ }
+ private void sendProducerRecords(Function<Integer, String> topicName) {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -86,15 +127,34 @@ public class GetOffsetShellTest {
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
IntStream.range(0, topicCount + 1)
- .forEach(i -> IntStream.range(0, i * i)
- .forEach(msgCount -> {
- assertDoesNotThrow(() -> producer.send(
- new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)).get());
- })
- );
+ .forEach(i -> IntStream.range(0, i * i)
+ .forEach(msgCount -> assertDoesNotThrow(() -> producer.send(
+ new ProducerRecord<>(topicName.apply(i), msgCount % i, null, "val" + msgCount)).get())));
}
}
+ private static List<ClusterConfig> withRemoteStorage() {
+ Map serverProperties = new HashMap<>();
+ serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, "1");
+ serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, "1");
+ serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
+ serverProperties.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, LocalTieredStorage.class.getName());
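+ // short task, cleanup, and initial-delay intervals so segment upload and local deletion happen quickly in tests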
+ serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "1000");
+ serverProperties.put(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG, "1000");
+ serverProperties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
+ serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "EXTERNAL");
+
+ return Collections.singletonList(
+ // we set REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP to EXTERNAL, so we need to
+ // align the listener name here: in KafkaClusterTestKit (KRAFT/CO_KRAFT) the default
+ // broker listener name is EXTERNAL, while in ZK it is PLAINTEXT
+ ClusterConfig.defaultBuilder()
+ .setTypes(Stream.of(ZK, KRAFT, CO_KRAFT).collect(Collectors.toSet()))
+ .setServerProperties(serverProperties)
+ .setListenerName("EXTERNAL")
+ .build());
+ }
+
private void createConsumerAndPoll() {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
@@ -274,6 +334,59 @@ public class GetOffsetShellTest {
}
}
+ @ClusterTemplate("withRemoteStorage")
+ public void testGetOffsetsByEarliestLocalSpec() throws InterruptedException {
+ setUp();
+ setUpRemoteLogTopics();
+
+ for (String time : new String[] {"-4", "earliest-local"}) {
+ // test topics with remote log storage disabled
+ // since remote log storage is disabled, the broker returns the same result as for the earliest offset
+ TestUtils.waitForCondition(() ->
+ Arrays.asList(
+ new Row("topic1", 0, 0L),
+ new Row("topic2", 0, 0L),
+ new Row("topic3", 0, 0L),
+ new Row("topic4", 0, 0L))
+ .equals(executeAndParse("--topic-partitions", "topic\\d+.*:0", "--time", time)),
+ "testGetOffsetsByEarliestLocalSpec get topics with remote log disabled result not match");
+
+ // test topics with remote log storage enabled
+ TestUtils.waitForCondition(() ->
+ Arrays.asList(
+ new Row("topicRLS1", 0, 0L),
+ new Row("topicRLS2", 0, 1L),
+ new Row("topicRLS3", 0, 2L),
+ new Row("topicRLS4", 0, 3L))
+ .equals(executeAndParse("--topic-partitions", "topicRLS.*:0", "--time", time)),
+ "testGetOffsetsByEarliestLocalSpec get topics with remote log enabled result not match");
+ }
+ }
+
+ @ClusterTemplate("withRemoteStorage")
+ public void testGetOffsetsByLatestTieredSpec() throws InterruptedException {
+ setUp();
+ setUpRemoteLogTopics();
+
+ for (String time : new String[] {"-5", "latest-tiered"}) {
+ // test topics with remote log storage disabled
+ // since remote log storage is not enabled, the broker returns an unknown offset for each topic
+ // partition; these unknown offsets are ignored by GetOffsetShell, hence the empty result here
+ assertEquals(Collections.emptyList(),
+ executeAndParse("--topic-partitions", "topic\\d+:0", "--time", time));
+
+ // test topics with remote log storage enabled
+ // topicRLS1 yields no row because none of its log segments have been uploaded to remote storage
+ TestUtils.waitForCondition(() ->
+ Arrays.asList(
+ new Row("topicRLS2", 0, 0L),
+ new Row("topicRLS3", 0, 1L),
+ new Row("topicRLS4", 0, 2L))
+ .equals(executeAndParse("--topic-partitions", "topicRLS.*:0", "--time", time)),
+ "testGetOffsetsByLatestTieredSpec result not match");
+ }
+ }
+
@ClusterTest
public void testGetOffsetsByTimestamp() {
setUp();