Add remote.log.disable.policy at the topic level only, as part of KIP-950
Reviewers: Kamal Chandraprakash <kchandraprakash@uber.com>, Luke Chen <showuon@gmail.com>, Murali Basani <muralidhar.basani@aiven.io>
Create 3 new metadata versions:
- 3.8-IV0, for the upcoming 3.8 release.
- 3.9-IV0, to add support for KIP-1005.
- 3.9-IV1, as the new release vehicle for KIP-966.
Create ListOffsetRequest v9, which will be used in 3.9-IV0 to support KIP-1005. v9 is currently an unstable API version.
Reviewers: Jun Rao <junrao@gmail.com>, Justine Olshan <jolshan@confluent.io>
Add field tieredEpoch to RemoteLogSegmentMetadata
Update relevant tests
Add two fields tieredEpoch and tieredState to TopicRecord.json
Reviewers: Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>
The following remote log configs can now be configured dynamically:
1. remote.log.manager.copy.max.bytes.per.second
2. remote.log.manager.fetch.max.bytes.per.second
3. remote.log.index.file.cache.total.size.bytes
If these values are configured dynamically, the broker loads the dynamic values on restart instead of the static values from the configuration file.
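As a hedged illustration (broker address, value, and the default-broker scoping are assumptions, not part of this change), one of these configs could be set dynamically via the Admin client:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DynamicRemoteLogConfigExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Empty entity name = cluster-wide default broker config.
            ConfigResource brokers = new ConfigResource(ConfigResource.Type.BROKER, "");
            AlterConfigOp set = new AlterConfigOp(
                new ConfigEntry("remote.log.index.file.cache.total.size.bytes", "1073741824"),
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(brokers, List.of(set))).all().get();
        }
    }
}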
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>
Improve consistency and correctness for user-provided timeouts at the Consumer network request layer, per the Java client Consumer timeouts design (https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts). While the changes introduced in KAFKA-15974 enforce timeouts at the Consumer's event layer, this change enforces timeouts at the network request layer.
The changes mostly fit into the following areas:
1. Create shared code and idioms so timeout handling logic is consistent across current and future RequestManager implementations
2. Use deadlineMs instead of expirationMs, expirationTimeoutMs, retryExpirationTimeMs, timeoutMs, etc. (a sketch of the idiom follows this list)
3. Update "preemptive pruning" to remove expired requests that have had at least one attempt
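A minimal sketch of the deadlineMs idiom (names are illustrative, not the actual RequestManager code): compute one absolute deadline up front, then derive the remaining time at each network attempt, instead of juggling several relative timeouts.

import org.apache.kafka.common.utils.Time;

public class DeadlineSketch {
    public static void main(String[] args) throws InterruptedException {
        Time time = Time.SYSTEM;
        long timeoutMs = 500;
        // One fixed deadline, not a sliding timeout.
        long deadlineMs = time.milliseconds() + timeoutMs;
        Thread.sleep(100); // stand-in for one network attempt
        // Each retry derives the remaining time from the same deadline.
        long remainingMs = Math.max(0, deadlineMs - time.milliseconds());
        System.out.println("remaining = " + remainingMs + " ms");
    }
}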
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Bruno Cadonna <cadonna@apache.org>
- Use SystemTime instead of MockTime when time is not mocked
- Use statically imported assertions to reduce line length
- Fold lines that exceed the length limit
- Rename tp0 to tpId0 when it refers to a TopicIdPartition
Reviewers: Kuan-Po (Cooper) Tseng <brandboat@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
A patch for KAFKA-15046 removed fsync from LeaderEpochFileCache#truncateFromStart/End for performance reasons, but it turned out this could corrupt the leader-epoch checkpoint file on an ungraceful OS shutdown, i.e. when the OS shuts down while the kernel is still writing dirty pages back to the device.
To address this problem, this PR makes the changes below:
(1) Revert LeaderEpochCheckpoint#write to always fsync.
(2) truncateFromStart/End now call LeaderEpochCheckpoint#write asynchronously on the scheduler thread.
(3) UnifiedLog#maybeCreateLeaderEpochCache now loads epoch entries from the checkpoint file only when the current cache is absent.
Reviewers: Jun Rao <junrao@gmail.com>
Add more unit tests to LogSegments and do some small refactoring in LogSegments.java.
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Added the implementation of the quota manager that will be used to throttle copy and fetch requests to/from remote storage. Reference: KIP-956.
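As a hedged sketch of how such throttling can be built on Kafka's metrics library (the actual RLMQuotaManager internals may differ), a Sensor with an upper-bound Quota throws QuotaViolationException once the recorded rate exceeds the bound:

import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Quota;
import org.apache.kafka.common.metrics.QuotaViolationException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.SimpleRate;

public class RemoteCopyQuotaSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        // Sensor with an upper-bound quota on the recorded byte rate (1 MB/s, illustrative).
        Sensor sensor = metrics.sensor("remote-copy-throttle",
            new MetricConfig().quota(Quota.upperBound(1024.0 * 1024.0)));
        sensor.add(metrics.metricName("byte-rate", "remote-storage"), new SimpleRate());
        try {
            sensor.record(512.0 * 1024.0); // bytes copied in this step
        } catch (QuotaViolationException e) {
            // Quota exceeded: delay/throttle the copy (or fetch) task.
        }
    }
}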
Reviewers: Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kchandraprakash@uber.com>, Jun Rao <junrao@gmail.com>
Skip using a stream when expiring producer IDs. This can improve performance significantly when the count is high.
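A minimal sketch of the non-stream approach (map shape and names are illustrative, not the actual ProducerStateManager code): remove expired entries with a plain iterator instead of a stream pipeline that first collects the expired keys into an intermediate collection.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class ExpireWithoutStreams {
    static void removeExpired(Map<Long, Long> lastTimestampByPid, long currentMs, long expirationMs) {
        Iterator<Map.Entry<Long, Long>> it = lastTimestampByPid.entrySet().iterator();
        while (it.hasNext()) {
            if (currentMs - it.next().getValue() >= expirationMs)
                it.remove(); // O(1) removal, no intermediate allocation
        }
    }

    public static void main(String[] args) {
        Map<Long, Long> producers = new HashMap<>(Map.of(1L, 0L, 2L, 950L));
        removeExpired(producers, 1000L, 100L);
        System.out.println(producers); // {2=950}
    }
}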
Before
Benchmark                                        (numProducerIds)  Mode  Cnt      Score       Error  Units
ProducerStateManagerBench.testDeleteExpiringIds             10000  avgt    3    101.253 ±    28.031  us/op
ProducerStateManagerBench.testDeleteExpiringIds            100000  avgt    3   2297.219 ±  1690.486  us/op
ProducerStateManagerBench.testDeleteExpiringIds           1000000  avgt    3  30688.865 ± 16348.768  us/op
After
Benchmark                                        (numProducerIds)  Mode  Cnt      Score       Error  Units
ProducerStateManagerBench.testDeleteExpiringIds             10000  avgt    3     39.122 ±     1.151  us/op
ProducerStateManagerBench.testDeleteExpiringIds            100000  avgt    3    464.363 ±    98.857  us/op
ProducerStateManagerBench.testDeleteExpiringIds           1000000  avgt    3   5731.169 ±   674.380  us/op
Also, the JMH benchmark was changed to exclude producer ID population from the measured section.
Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan <jolshan@confluent.io>
Now that JBOD is supported in KRaft, we should also enable JBOD support in tiered storage. Unit tests and integration tests are also added.
Reviewers: Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Igor Soarez <soarez@apple.com>, Mickael Maison <mickael.maison@gmail.com>
Don't throw an OFFSET_OUT_OF_RANGE error when converting the offset to metadata; instead, the leader should increment the high watermark by itself after receiving fetch requests from followers. This can happen when checkpoint files are missing and the broker has just been elected leader.
Reviewers: Luke Chen <showuon@gmail.com>, Jun Rao <junrao@apache.org>
1. Replace TopicBasedRemoteLogMetadataManagerWrapperWithHarness with RemoteLogMetadataManagerTestUtils#builder in RemoteLogMetadataManagerTest.
2. Use ClusterTestExtensions for RemoteLogMetadataManagerTest.
Signed-off-by: PoAn Yang <payang@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest has a race when it sets RemoteLogMetadataTopicPartitioner using the setter.
This change fixes the race condition by passing the RemoteLogMetadataTopicPartitioner instance in a Function<Integer, RemoteLogMetadataTopicPartitioner>, which is used in configure() in TopicBasedRemoteLogMetadataManager.
It also improves the waiting condition by spying on RemotePartitionMetadataStore and awaiting on Phasers to ensure ConsumerManager makes progress before performing assertions.
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Some tests in TopicBasedRemoteLogMetadataManagerTest are flaky because waitUntilConsumerCatchesUp may return early, before the consumer manager has caught up with all the events.
This PR adds expected offsets for the leader/follower metadata partitions and ensures we wait until the offset is at least equal to the argument, to avoid flakiness.
Reviewers: Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
If there are logs to be deleted during a log dir movement, we schedule the deletion to run asynchronously later.
However, once the log dir movement has completed, the future log is renamed, so the async log deletion fails with a no-such-file error.
Signed-off-by: PoAn Yang <payang@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, SoontaekLim <soontaek.lim@neya.kr>, Johnny Hsu <johnnyhsu@fb.com>
RemoteLogMetadataSerde references RemoteLogMetadataTransform as a raw type. Given that the class is parameterized, we should make use of the type parameter.
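For illustration, a hedged toy example (Transform is a stand-in, not the real RemoteLogMetadataTransform signature) of why the parameterized form is preferable:

interface Transform<T> { T apply(T input); }

public class RawVsParameterized {
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static void main(String[] args) {
        Transform raw = (Object s) -> s;        // raw type: result is Object, caller must cast
        String fromRaw = (String) raw.apply("ok");
        Transform<String> typed = s -> s;       // parameterized: checked at compile time, no cast
        String fromTyped = typed.apply("ok");
        System.out.println(fromRaw + " " + fromTyped);
    }
}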
Signed-off-by: Josep Prat <josep.prat@aiven.io>
Reviewers: Matthew de Detrich <matthew.dedetrich@aiven.io>, Mickael Maison <mickael.maison@gmail.com>
We now iterate the records to find the offsetOfMaxTimestamp instead of returning the cached one when handling ListOffsetsRequest.MAX_TIMESTAMP, since it is hard to align all paths to produce a correct offsetOfMaxTimestamp. The known paths are listed below, followed by a minimal sketch of the scan.
1. convertAndAssignOffsetsNonCompressed -> we CAN get correct offsetOfMaxTimestamp when validating all records
2. assignOffsetsNonCompressed -> ditto
3. validateMessagesAndAssignOffsetsCompressed -> ditto
4. validateMessagesAndAssignOffsetsCompressed#buildRecordsAndAssignOffsets -> ditto
5. appendAsFollower#append#analyzeAndValidateRecords -> we CAN'T get a correct offsetOfMaxTimestamp, as iterating all records is expensive when fetching records from the leader
6. LogSegment#recover -> ditto
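A minimal sketch of the scan (method shape is illustrative, not the actual code path): walk every record and track the offset of the record with the max timestamp.

import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;

public class MaxTimestampScan {
    static long offsetOfMaxTimestamp(Records records) {
        long maxTimestamp = RecordBatch.NO_TIMESTAMP;
        long offsetOfMax = -1L;
        for (RecordBatch batch : records.batches()) {
            for (Record record : batch) {
                if (record.timestamp() > maxTimestamp) {
                    maxTimestamp = record.timestamp();
                    offsetOfMax = record.offset();
                }
            }
        }
        return offsetOfMax;
    }
}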
Reviewers: Jun Rao <junrao@gmail.com>
- Use `Empty` instead of 'none' when referring to `Optional` values.
- `Headers.lastHeader` returns `null` when no header is found.
- Fix minor spelling mistakes.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
- Fix the verification logic: if the timestamp type is LOG_APPEND_TIME, we choose the offset of the first record; otherwise, we choose the record with the max timestamp.
- Rename shallowOffsetOfMaxTimestamp to offsetOfMaxTimestamp
Reviewers: Jun Rao <junrao@gmail.com>, Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Fix getOffsetByMaxTimestamp for compressed records.
This PR adds:
1) For the inPlaceAssignment case, compute the correct offset for maxTimestamp while traversing the batch records, and set it on the ValidationResult at the end, instead of always setting the last offset.
2) For the non-inPlaceAssignment case, set the offsetOfMaxTimestamp for the log create-time case, as in the non-compressed and inPlaceAssignment cases, instead of always setting the last offset.
3) Add tests to verify the fix.
Reviewers: Jun Rao <junrao@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
In order to move ConfigCommand to tools we must move all its dependencies, which include KafkaConfig and other core classes, to Java. This PR moves the log cleaner configuration to the CleanerConfig class in the storage module.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
`CommittedOffsetsFile` can be introduced when it is required for enhancing TBRLMM to consume from a specific offset when snapshots are implemented.
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Divij Vaidya <diviv@amazon.com>
Expiration of producer IDs is implemented with a slow removal of map keys:
producers.keySet().removeAll(keys);
This unnecessarily goes through the producer IDs (keySet().removeAll can call keys.contains() once per map entry) just to throw out the expired keys.
It leads to quadratic time in the worst case, when most/all keys need to be removed:
Benchmark                                        (numProducerIds)  Mode  Cnt           Score            Error  Units
ProducerStateManagerBench.testDeleteExpiringIds               100  avgt    3        9164.043 ±      10647.877  ns/op
ProducerStateManagerBench.testDeleteExpiringIds              1000  avgt    3      341561.093 ±      20283.211  ns/op
ProducerStateManagerBench.testDeleteExpiringIds             10000  avgt    3    44957983.550 ±    9389011.290  ns/op
ProducerStateManagerBench.testDeleteExpiringIds            100000  avgt    3  5683374164.167 ± 1446242131.466  ns/op
A simple fix is to use map#remove(key) instead, giving closer-to-linear growth (a sketch follows the table):
Benchmark                                        (numProducerIds)  Mode  Cnt        Score         Error  Units
ProducerStateManagerBench.testDeleteExpiringIds               100  avgt    3     5779.056 ±     651.389  ns/op
ProducerStateManagerBench.testDeleteExpiringIds              1000  avgt    3    61430.530 ±   21875.644  ns/op
ProducerStateManagerBench.testDeleteExpiringIds             10000  avgt    3   643887.031 ±  600475.302  ns/op
ProducerStateManagerBench.testDeleteExpiringIds            100000  avgt    3  7741689.539 ± 3218317.079  ns/op
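A minimal sketch of the fix (collection shapes are illustrative): instead of producers.keySet().removeAll(expiredKeys), which can degrade to a contains() check per map entry, remove each expired key directly.

import java.util.List;
import java.util.Map;

public class RemovePerKey {
    static void removeExpired(Map<Long, ?> producers, List<Long> expiredKeys) {
        for (Long producerId : expiredKeys)
            producers.remove(producerId); // O(1) per key on a HashMap
    }
}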
Flamegraph of the CPU usage while dealing with expiration with ~1 million producer IDs (figure in the original PR).
Reviewers: Justine Olshan <jolshan@confluent.io>
This pull request aims to implement RemoteLogSizeBytes from KIP-963.
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>
This pull request aims to implement RemoteCopyLagSegments, RemoteDeleteLagBytes and RemoteDeleteLagSegments from KIP-963.
Reviewers: Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
This PR implements part of KIP-963, specifically for adding new metrics.
The metrics added in this PR are:
RemoteDeleteRequestsPerSec (emitted when expired log segments on remote storage are deleted)
RemoteDeleteErrorsPerSec (emitted when deleting expired log segments on remote storage fails)
BuildRemoteLogAuxStateRequestsPerSec (emitted when building the remote log aux state for replica fetchers)
BuildRemoteLogAuxStateErrorsPerSec (emitted when building the remote log aux state for replica fetchers fails)
Reviewers: Luke Chen <showuon@gmail.com>, Nikhil Ramakrishnan <ramakrishnan.nikhil@gmail.com>, Christo Lolov <lolovc@amazon.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>
This pull request implements the first in the list of metrics in KIP-963: Additional metrics in Tiered Storage.
Since each partition of a topic is serviced by its own RLMTask, we need an aggregator object per topic. The aggregator object in this pull request is BrokerTopicAggregatedMetric. Since RemoteCopyLagBytes is a gauge, I have introduced a new GaugeWrapper. The GaugeWrapper is used by the metrics collection system to interact with the BrokerTopicAggregatedMetric, while the RemoteLogManager interacts with the BrokerTopicAggregatedMetric directly.
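A hedged sketch of the per-topic aggregation idea (class and method names here are illustrative, not the committed API):

import java.util.concurrent.ConcurrentHashMap;

public class TopicLagAggregation {
    // One value per partition; the topic-level gauge reports the sum.
    static class AggregatedMetric {
        private final ConcurrentHashMap<Integer, Long> valueByPartition = new ConcurrentHashMap<>();
        void setValue(int partition, long bytes) { valueByPartition.put(partition, bytes); }
        void removePartition(int partition) { valueByPartition.remove(partition); }
        long value() { return valueByPartition.values().stream().mapToLong(Long::longValue).sum(); }
    }

    public static void main(String[] args) {
        AggregatedMetric remoteCopyLagBytes = new AggregatedMetric();
        remoteCopyLagBytes.setValue(0, 1024L); // each RLMTask reports its own partition
        remoteCopyLagBytes.setValue(1, 2048L);
        System.out.println(remoteCopyLagBytes.value()); // 3072, what the gauge would emit
    }
}

The map lets each RLMTask update its own partition independently, while the gauge reads a single consistent sum for the topic.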
Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
Any blocking operation performed while holding the UnifiedLog.lock can lead to serious performance (even availability) issues, yet there are currently several paths that call fsync(2) inside the lock.
While the lock is held, all subsequent produce requests against the partition may block.
This easily makes all request handlers busy when disk performance is bad.
Even worse, when a disk experiences tens of seconds of glitches (not rare with spinning drives), the broker becomes unable to process any requests while remaining unfenced from the cluster (i.e. a "zombie"-like state).
This PR gets rid of 4 cases of essentially unnecessary fsync(2) calls performed under the lock:
(1) ProducerStateManager.takeSnapshot at UnifiedLog.roll
The fsync(2) call is moved to the scheduler thread as part of the existing "flush-log" job, before incrementing the recovery point (see the sketch after this list).
Since the snapshot is still guaranteed to be flushed before the recovery point is incremented, this change shouldn't cause any problems.
(2) ProducerStateManager.removeAndMarkSnapshotForDeletion as part of log segment deletion
This method internally calls Utils.atomicMoveWithFallback with needFlushParentDir = true, which calls fsync.
It is changed to call Utils.atomicMoveWithFallback with needFlushParentDir = false, which is consistent with index file deletion (index file deletion doesn't flush the parent dir either).
This change shouldn't cause problems either.
(3) LeaderEpochFileCache.truncateFromStart when incrementing the log-start-offset
This path is called from deleteRecords on request-handler threads.
We don't actually need fsync(2) here either.
On unclean shutdown, a few stale leader epochs might remain in the file, but they will be handled by LogLoader on start-up, so this is not a problem.
(4) LeaderEpochFileCache.truncateFromEnd as part of log truncation
Likewise, we don't need fsync(2) here, since any epochs left untruncated by an unclean shutdown will be handled by the log loading procedure.
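A hedged sketch of the pattern used in (1) (all names are hypothetical; Kafka's own scheduler and snapshot code differ): write under the lock, fsync on a background thread.

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncFlushSketch {
    private final Object lock = new Object();
    private final ExecutorService scheduler = Executors.newSingleThreadExecutor();

    void takeSnapshot(FileOutputStream out, byte[] snapshot) throws IOException {
        synchronized (lock) {
            out.write(snapshot); // cheap: data goes to the page cache
        }
        scheduler.execute(() -> {
            try {
                out.getFD().sync(); // the expensive fsync(2), now off the lock path
            } catch (IOException e) {
                // log and handle; the recovery point must not advance past an unflushed snapshot
            }
        });
    }
}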
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Justine Olshan <jolshan@confluent.io>, Jun Rao <junrao@gmail.com>
Internal topic creation is asynchronous, so the test gets flaky. In this test I want to assert that doesTopicExist returns true when a topic exists, so a dummy internal topic is created to fix the flakiness.
Reviewers: Luke Chen <showuon@gmail.com>, Jun Rao <jun@confluent.io>, Satish Duggana <satishd@apache.org>
Roll the active segment and offload it to remote storage once it breaches the retention time policy.
A segment is eligible for deletion only once it has been uploaded to remote storage. We have checks that allow only passive segments to be uploaded, so the active segment never gets removed at all, even if it breaches the retention time. For low-throughput/stale topics, the active segment can therefore hold data beyond the retention time configured by the user.
Reviewers: Satish Duggana <satishd@apache.org>, Christo Lolov <lolovc@amazon.com>
"findHighestRemoteOffset" does not take into account the leader-epoch end offset. This can cause log divergence between the local and remote log segments when there is unclean leader election.
To handle it correctly, the logic to find the highest remote offset can be updated to:
find-highest-remote-offset = min(end-offset-for-epoch-in-the-checkpoint, highest-remote-offset-for-epoch)
Discussion thread: https://github.com/apache/kafka/pull/14004#discussion_r1266864272
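A direct rendering of the formula (parameter names are illustrative):

public class HighestRemoteOffset {
    static long findHighestRemoteOffset(long epochEndOffsetInCheckpoint,
                                        long highestRemoteOffsetForEpoch) {
        // Cap the remote offset by the leader-epoch end offset from the checkpoint,
        // so a leader cannot trust remote segments beyond its own epoch lineage.
        return Math.min(epochEndOffsetInCheckpoint, highestRemoteOffsetForEpoch);
    }
}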
Reviewers: Satish Duggana <satishd@apache.org>, Christo Lolov <lolovc@amazon.com>
This patch contains a few small clean-ups in LogValidator and associated classes:
1. Set shallowOffsetOfMaxTimestamp consistently as the last offset in the
batch for v2 compressed and non-compressed data.
2. Rename `RecordConversionStats` to `RecordValidationStats` since one of its
fields `temporaryMemoryBytes` does not depend on conversion.
3. Rename `batchIndex` to `recordIndex` in loops over the records in each batch
inside `LogValidator`.
Reviewers: Qichao Chu <5326144+ex172000@users.noreply.github.com>, Jun Rao <junrao@gmail.com>
When a node starts (or restarts), we send a CREATE_TOPICS request to the controller to create the internal __remote_log_metadata topic.
Topic creation is a costly event handled by the controller. During a rebalance, the controller can have pending requests in its queue, which can lead to CREATE_TOPICS timeouts. Instead of firing a CREATE_TOPICS request whenever a node restarts, send a METADATA request (topic describe), which is handled by the least-loaded node, and only send the topic-creation request if the topic does not exist.
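A hedged Admin-client sketch of the describe-before-create idea (the broker-internal code path differs; partition count and replication factor are illustrative):

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public class EnsureMetadataTopic {
    static void ensure(Properties props) throws Exception {
        String topic = "__remote_log_metadata";
        try (Admin admin = Admin.create(props)) {
            try {
                // Cheap METADATA request, served by the least-loaded node.
                admin.describeTopics(List.of(topic)).allTopicNames().get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                    // Only now pay the cost of a controller-side CREATE_TOPICS.
                    admin.createTopics(List.of(new NewTopic(topic, 50, (short) 3))).all().get();
                } else {
                    throw e;
                }
            }
        }
    }
}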
Reviewers: Satish Duggana <satishd@apache.org>, Christo Lolov <lolovc@amazon.com>
TestUtils.createTopicWithAdmin calls waitForAllPartitionsMetadata which waits for partition(s) to be present in each brokers' metadata cache. This is a sufficient check in ZK mode because the controller sends an LISR request before sending an UpdateMetadataRequest which means that the partition in the ReplicaManager will be updated before the metadata cache.
In KRaft mode, the metadata cache is updated first, so the check may return before partitions and other metadata listeners are fully initialized.
Testing:
Insert a Thread.sleep(100) in BrokerMetadataPublisher.onMetadataUpdate after
// Publish the new metadata image to the metadata cache.
metadataCache.setImage(newImage)
and run EdgeCaseRequestTest.testProduceRequestWithNullClientId; the test will fail locally nearly deterministically. After the change(s), the test no longer fails.
Reviewers: Justine Olshan <jolshan@confluent.io>
This PR moves PartitionMetadataFile to the storage module.
Existing unit tests in UnifiedLogTest like testLogFlushesPartitionMetadataOnAppend should suffice.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
RemoteIndexCache has a concurrency bug which leads to IOException while fetching data from remote tier.
The bug can be reproduced with the following order of events:
Thread 1 (cache thread): invalidates the entry; the removalListener is invoked asynchronously, so the files have not been renamed to the "deleted" suffix yet.
Thread 2 (fetch thread): tries to find the entry in the cache, doesn't find it because it has been removed by thread 1, fetches the entry from S3, and writes it to the existing file (using replace-existing).
Thread 1: the async removalListener is invoked, acquires a lock on the old entry (which has been removed from the cache), renames the files to the "deleted" suffix, and starts deleting them.
Thread 2: tries to create the in-memory/mmapped index, but doesn't find the file and hence creates a new file of size 2GB in the AbstractIndex constructor. The JVM returns an error, as it won't allow creation of a 2GB random-access file.
This commit fixes the bug by using an EvictionListener instead of a RemovalListener, so the eviction is performed atomically with the file rename. It handles manual removal (not covered by the EvictionListener) by using computeIfAbsent() and enforcing atomic cache removal & file rename.
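A hedged sketch of the Caffeine wiring (entry type simplified; the real RemoteIndexCache differs):

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;

public class EvictionListenerSketch {
    static class Entry {
        void markForCleanup() { /* rename index files to the ".deleted" suffix, schedule deletion */ }
    }

    public static void main(String[] args) {
        // evictionListener runs atomically with the removal of the mapping, so a
        // concurrent lookup cannot observe the entry while its files are being renamed.
        Cache<String, Entry> cache = Caffeine.newBuilder()
            .maximumSize(1024)
            .evictionListener((String key, Entry entry, RemovalCause cause) -> entry.markForCleanup())
            .build();
        cache.put("segment-0", new Entry());
    }
}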
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Arpit Goyal <goyal.arpit.91@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
I've added a new class with an incrementing AtomicLong to represent the verification guard. Upon creation of a verification guard, we increment this value and assign it to the guard.
The expected behavior is the same as with the Object guard, but with better debuggability thanks to the string value, and with type safety (I found a type-safety issue in the current code while implementing this).
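A minimal sketch of the idea (class and field names approximate the description above, not the exact committed code):

import java.util.concurrent.atomic.AtomicLong;

public final class GuardSketch {
    private static final AtomicLong INCREMENTING_ID = new AtomicLong(0);
    private final long value;

    public GuardSketch() {
        // Each guard gets a unique value, so guards can be compared and printed.
        this.value = INCREMENTING_ID.incrementAndGet();
    }

    @Override
    public String toString() {
        return "GuardSketch(value=" + value + ")";
    }
}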
Reviewers: Ismael Juma <ismael@juma.me.uk>, Artem Livshits <alivshits@confluent.io>
RemoteIndexCache has a variable `lock`, and the child Entry class also has a variable `lock` in the same class file. Rename the entry's (child class) lock to avoid confusion.
Reviewers: Luke Chen <showuon@gmail.com>, hudeqi <1217150961@qq.com>
A few notes:
* Delete a few methods from `UnifiedLog` that were simply invoking the related method in `LogFileUtils`
* Fix `CoreUtils.swallow` to use the passed in `logging`
* Fix `LogCleanerParameterizedIntegrationTest` to close `log` before reopening
* Minor tweaks in `LogSegment` for readability
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>
Test Cases Covered
1. Index files already exist on disk but not in the cache, i.e. RemoteIndexCache should not call the RemoteStorageManager to fetch them; instead, it should cache them from the local index files present.
2. RSM returns a corrupted index file, i.e. RemoteIndexCache should throw a CorruptedIndexException instead of executing successfully.
3. Index files with the "deleted" suffix are already present on disk, i.e. if the cleaner thread is slow, there is a chance that deleted index files remain on disk while, in parallel, the same index entry is invalidated. For more detail, see https://issues.apache.org/jira/browse/KAFKA-15169
Reviewers: Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>