mirror of https://github.com/apache/kafka.git
MINOR: Various cleanups in storage (#15711)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
49e7c795dc
commit
3617dda9a5
|
@ -72,15 +72,11 @@ public class ProducerManager implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Callback callback = new Callback() {
|
Callback callback = (metadata, exception) -> {
|
||||||
@Override
|
if (exception != null) {
|
||||||
public void onCompletion(RecordMetadata metadata,
|
future.completeExceptionally(exception);
|
||||||
Exception exception) {
|
} else {
|
||||||
if (exception != null) {
|
future.complete(metadata);
|
||||||
future.completeExceptionally(exception);
|
|
||||||
} else {
|
|
||||||
future.complete(metadata);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
producer.send(new ProducerRecord<>(rlmmConfig.remoteLogMetadataTopicName(), metadataPartitionNum, null,
|
producer.send(new ProducerRecord<>(rlmmConfig.remoteLogMetadataTopicName(), metadataPartitionNum, null,
|
||||||
|
|
|
@ -54,7 +54,7 @@ public class CleanShutdownFileHandler {
|
||||||
public int version;
|
public int version;
|
||||||
public Long brokerEpoch;
|
public Long brokerEpoch;
|
||||||
|
|
||||||
public Content() {};
|
public Content() {}
|
||||||
|
|
||||||
public Content(int version, Long brokerEpoch) {
|
public Content(int version, Long brokerEpoch) {
|
||||||
this.version = version;
|
this.version = version;
|
||||||
|
@ -86,7 +86,6 @@ public class CleanShutdownFileHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public OptionalLong read() {
|
public OptionalLong read() {
|
||||||
try {
|
try {
|
||||||
String text = Utils.readFileAsString(cleanShutdownFile.toPath().toString());
|
String text = Utils.readFileAsString(cleanShutdownFile.toPath().toString());
|
||||||
|
|
|
@ -52,7 +52,7 @@ public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {
|
||||||
|
|
||||||
public ByteBuffer readAsByteBuffer() throws IOException {
|
public ByteBuffer readAsByteBuffer() throws IOException {
|
||||||
ByteArrayOutputStream stream = new ByteArrayOutputStream();
|
ByteArrayOutputStream stream = new ByteArrayOutputStream();
|
||||||
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stream, StandardCharsets.UTF_8));) {
|
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stream, StandardCharsets.UTF_8))) {
|
||||||
CheckpointFile.CheckpointWriteBuffer<EpochEntry> writeBuffer = new CheckpointFile.CheckpointWriteBuffer<>(writer, 0, LeaderEpochCheckpointFile.FORMATTER);
|
CheckpointFile.CheckpointWriteBuffer<EpochEntry> writeBuffer = new CheckpointFile.CheckpointWriteBuffer<>(writer, 0, LeaderEpochCheckpointFile.FORMATTER);
|
||||||
writeBuffer.write(epochs);
|
writeBuffer.write(epochs);
|
||||||
writer.flush();
|
writer.flush();
|
||||||
|
|
|
@ -27,7 +27,4 @@ public class IndexOffsetOverflowException extends KafkaException {
|
||||||
super(message);
|
super(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
public IndexOffsetOverflowException(String message, Throwable cause) {
|
|
||||||
super(message, cause);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,5 +19,5 @@ package org.apache.kafka.storage.internals.log;
|
||||||
public enum LeaderHwChange {
|
public enum LeaderHwChange {
|
||||||
INCREASED,
|
INCREASED,
|
||||||
SAME,
|
SAME,
|
||||||
NONE;
|
NONE
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,7 +77,7 @@ public class LogAppendInfo {
|
||||||
int validBytes,
|
int validBytes,
|
||||||
long lastOffsetOfFirstBatch) {
|
long lastOffsetOfFirstBatch) {
|
||||||
this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset,
|
this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset,
|
||||||
recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.<RecordError>emptyList(),
|
recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.emptyList(),
|
||||||
LeaderHwChange.NONE);
|
LeaderHwChange.NONE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -226,7 +226,6 @@ public class LogConfig extends AbstractConfig {
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
private static final String MESSAGE_FORMAT_VERSION_DOC = TopicConfig.MESSAGE_FORMAT_VERSION_DOC;
|
private static final String MESSAGE_FORMAT_VERSION_DOC = TopicConfig.MESSAGE_FORMAT_VERSION_DOC;
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
private static final LogConfigDef CONFIG = new LogConfigDef();
|
private static final LogConfigDef CONFIG = new LogConfigDef();
|
||||||
static {
|
static {
|
||||||
CONFIG.
|
CONFIG.
|
||||||
|
@ -390,7 +389,6 @@ public class LogConfig extends AbstractConfig {
|
||||||
|
|
||||||
//In the transition period before messageTimestampDifferenceMaxMs is removed, to maintain backward compatibility,
|
//In the transition period before messageTimestampDifferenceMaxMs is removed, to maintain backward compatibility,
|
||||||
// we are using its value if messageTimestampBeforeMaxMs default value hasn't changed.
|
// we are using its value if messageTimestampBeforeMaxMs default value hasn't changed.
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
private long getMessageTimestampBeforeMaxMs() {
|
private long getMessageTimestampBeforeMaxMs() {
|
||||||
final Long messageTimestampBeforeMaxMs = getLong(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG);
|
final Long messageTimestampBeforeMaxMs = getLong(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG);
|
||||||
if (!messageTimestampBeforeMaxMs.equals(Long.MAX_VALUE)) {
|
if (!messageTimestampBeforeMaxMs.equals(Long.MAX_VALUE)) {
|
||||||
|
@ -402,7 +400,6 @@ public class LogConfig extends AbstractConfig {
|
||||||
|
|
||||||
//In the transition period before messageTimestampDifferenceMaxMs is removed, to maintain backward compatibility,
|
//In the transition period before messageTimestampDifferenceMaxMs is removed, to maintain backward compatibility,
|
||||||
// we are using its value if messageTimestampAfterMaxMs default value hasn't changed.
|
// we are using its value if messageTimestampAfterMaxMs default value hasn't changed.
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
private long getMessageTimestampAfterMaxMs() {
|
private long getMessageTimestampAfterMaxMs() {
|
||||||
final Long messageTimestampAfterMaxMs = getLong(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
|
final Long messageTimestampAfterMaxMs = getLong(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
|
||||||
if (!messageTimestampAfterMaxMs.equals(Long.MAX_VALUE)) {
|
if (!messageTimestampAfterMaxMs.equals(Long.MAX_VALUE)) {
|
||||||
|
@ -412,7 +409,6 @@ public class LogConfig extends AbstractConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public RecordVersion recordVersion() {
|
public RecordVersion recordVersion() {
|
||||||
return messageFormatVersion.highestSupportedRecordVersion();
|
return messageFormatVersion.highestSupportedRecordVersion();
|
||||||
}
|
}
|
||||||
|
|
|
@ -449,7 +449,7 @@ public class RemoteIndexCache implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public int lookupTimestamp(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long timestamp, long startingOffset) throws IOException {
|
public int lookupTimestamp(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long timestamp, long startingOffset) {
|
||||||
lock.readLock().lock();
|
lock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
return getIndexEntry(remoteLogSegmentMetadata).lookupTimestamp(timestamp, startingOffset).position;
|
return getIndexEntry(remoteLogSegmentMetadata).lookupTimestamp(timestamp, startingOffset).position;
|
||||||
|
@ -561,7 +561,7 @@ public class RemoteIndexCache implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public OffsetPosition lookupTimestamp(long timestamp, long startingOffset) throws IOException {
|
public OffsetPosition lookupTimestamp(long timestamp, long startingOffset) {
|
||||||
entryLock.readLock().lock();
|
entryLock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup");
|
if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup");
|
||||||
|
|
|
@ -32,8 +32,6 @@ import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.Assertions;
|
import org.junit.jupiter.api.Assertions;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import scala.collection.JavaConverters;
|
import scala.collection.JavaConverters;
|
||||||
import scala.collection.Seq;
|
import scala.collection.Seq;
|
||||||
|
|
||||||
|
@ -46,7 +44,6 @@ import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
|
@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
|
||||||
public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
|
public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
|
||||||
private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.class);
|
|
||||||
|
|
||||||
private static final int SEG_SIZE = 1024 * 1024;
|
private static final int SEG_SIZE = 1024 * 1024;
|
||||||
|
|
||||||
|
|
|
@ -75,7 +75,7 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void stopTopicBasedRemoteLogMetadataManagerHarness() throws IOException {
|
private void stopTopicBasedRemoteLogMetadataManagerHarness() {
|
||||||
remoteLogMetadataManagerHarness.closeRemoteLogMetadataManager();
|
remoteLogMetadataManagerHarness.closeRemoteLogMetadataManager();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -123,7 +123,7 @@ public final class LocalTieredStorageCondition {
|
||||||
* @param conjuct Another condition which truth is required for the resulting new condition to be true.
|
* @param conjuct Another condition which truth is required for the resulting new condition to be true.
|
||||||
* @return A new condition which is true iff this condition and {@code conjuct} are both true.
|
* @return A new condition which is true iff this condition and {@code conjuct} are both true.
|
||||||
*/
|
*/
|
||||||
public final LocalTieredStorageCondition and(final LocalTieredStorageCondition conjuct) {
|
public LocalTieredStorageCondition and(final LocalTieredStorageCondition conjuct) {
|
||||||
//
|
//
|
||||||
// To keep things simple, only authorize to append to the condition chain of elementary (not composed)
|
// To keep things simple, only authorize to append to the condition chain of elementary (not composed)
|
||||||
// conditions. It also allows to protect from cycles.
|
// conditions. It also allows to protect from cycles.
|
||||||
|
|
|
@ -51,10 +51,7 @@ public final class LocalTieredStorageEvent implements Comparable<LocalTieredStor
|
||||||
private final EventType type;
|
private final EventType type;
|
||||||
private final RemoteLogSegmentId segmentId;
|
private final RemoteLogSegmentId segmentId;
|
||||||
private final int timestamp;
|
private final int timestamp;
|
||||||
private final Optional<RemoteLogSegmentFileset> fileset;
|
|
||||||
private final Optional<RemoteLogSegmentMetadata> metadata;
|
private final Optional<RemoteLogSegmentMetadata> metadata;
|
||||||
private final int startPosition;
|
|
||||||
private final int endPosition;
|
|
||||||
private final Optional<Exception> exception;
|
private final Optional<Exception> exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -124,10 +121,7 @@ public final class LocalTieredStorageEvent implements Comparable<LocalTieredStor
|
||||||
this.type = builder.eventType;
|
this.type = builder.eventType;
|
||||||
this.segmentId = builder.segmentId;
|
this.segmentId = builder.segmentId;
|
||||||
this.timestamp = builder.timestamp;
|
this.timestamp = builder.timestamp;
|
||||||
this.fileset = ofNullable(builder.fileset);
|
|
||||||
this.metadata = ofNullable(builder.metadata);
|
this.metadata = ofNullable(builder.metadata);
|
||||||
this.startPosition = builder.startPosition;
|
|
||||||
this.endPosition = builder.endPosition;
|
|
||||||
this.exception = ofNullable(builder.exception);
|
this.exception = ofNullable(builder.exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,10 +131,10 @@ public final class LocalTieredStorageEvent implements Comparable<LocalTieredStor
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Builder {
|
public static class Builder {
|
||||||
private int brokerId;
|
private final int brokerId;
|
||||||
private EventType eventType;
|
private final EventType eventType;
|
||||||
private RemoteLogSegmentId segmentId;
|
private final RemoteLogSegmentId segmentId;
|
||||||
private int timestamp;
|
private final int timestamp;
|
||||||
private RemoteLogSegmentFileset fileset;
|
private RemoteLogSegmentFileset fileset;
|
||||||
private RemoteLogSegmentMetadata metadata;
|
private RemoteLogSegmentMetadata metadata;
|
||||||
private int startPosition;
|
private int startPosition;
|
||||||
|
|
|
@ -387,7 +387,7 @@ public final class LocalTieredStorageTest {
|
||||||
getClass().getSimpleName(), testName, DATE_TIME_FORMATTER.format(LocalDateTime.now()));
|
getClass().getSimpleName(), testName, DATE_TIME_FORMATTER.format(LocalDateTime.now()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public final class Verifier {
|
public static final class Verifier {
|
||||||
private final LocalTieredStorage remoteStorage;
|
private final LocalTieredStorage remoteStorage;
|
||||||
private final TopicIdPartition topicIdPartition;
|
private final TopicIdPartition topicIdPartition;
|
||||||
|
|
||||||
|
|
|
@ -70,12 +70,12 @@ public final class TieredStorageTestBuilder {
|
||||||
private final int defaultProducedBatchSize = 1;
|
private final int defaultProducedBatchSize = 1;
|
||||||
private final long defaultEarliestLocalOffsetExpectedInLogDirectory = 0;
|
private final long defaultEarliestLocalOffsetExpectedInLogDirectory = 0;
|
||||||
|
|
||||||
|
private final Map<TopicPartition, List<DeletableSpec>> deletables = new HashMap<>();
|
||||||
|
private final List<TieredStorageTestAction> actions = new ArrayList<>();
|
||||||
private Map<TopicPartition, ProducableSpec> producables = new HashMap<>();
|
private Map<TopicPartition, ProducableSpec> producables = new HashMap<>();
|
||||||
private Map<TopicPartition, List<OffloadableSpec>> offloadables = new HashMap<>();
|
private Map<TopicPartition, List<OffloadableSpec>> offloadables = new HashMap<>();
|
||||||
private Map<TopicPartition, ConsumableSpec> consumables = new HashMap<>();
|
private Map<TopicPartition, ConsumableSpec> consumables = new HashMap<>();
|
||||||
private Map<TopicPartition, FetchableSpec> fetchables = new HashMap<>();
|
private Map<TopicPartition, FetchableSpec> fetchables = new HashMap<>();
|
||||||
private Map<TopicPartition, List<DeletableSpec>> deletables = new HashMap<>();
|
|
||||||
private List<TieredStorageTestAction> actions = new ArrayList<>();
|
|
||||||
|
|
||||||
public TieredStorageTestBuilder() {
|
public TieredStorageTestBuilder() {
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,11 +26,6 @@ public final class ExpandPartitionCountSpec {
|
||||||
private final int partitionCount;
|
private final int partitionCount;
|
||||||
private final Map<Integer, List<Integer>> assignment;
|
private final Map<Integer, List<Integer>> assignment;
|
||||||
|
|
||||||
public ExpandPartitionCountSpec(String topicName,
|
|
||||||
int partitionCount) {
|
|
||||||
this(topicName, partitionCount, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public ExpandPartitionCountSpec(String topicName,
|
public ExpandPartitionCountSpec(String topicName,
|
||||||
int partitionCount,
|
int partitionCount,
|
||||||
Map<Integer, List<Integer>> assignment) {
|
Map<Integer, List<Integer>> assignment) {
|
||||||
|
|
|
@ -50,14 +50,6 @@ public final class LocalTieredStorageOutput<K, V> implements LocalTieredStorageT
|
||||||
return row(file, offset, record, " ");
|
return row(file, offset, record, " ");
|
||||||
}
|
}
|
||||||
|
|
||||||
private String row(String file, Object offset) {
|
|
||||||
return row(file, offset, "");
|
|
||||||
}
|
|
||||||
|
|
||||||
private String row(String file) {
|
|
||||||
return row(file, "", "");
|
|
||||||
}
|
|
||||||
|
|
||||||
private String row() {
|
private String row() {
|
||||||
return row("", "", "");
|
return row("", "", "");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue