1. Replace TopicBasedRemoteLogMetadataManagerWrapperWithHarness with RemoteLogMetadataManagerTestUtils#builder in RemoteLogMetadataManagerTest.
2. Use ClusterTestExtention for RemoteLogMetadataManagerTest.
Signed-off-by: PoAn Yang <payang@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>
TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest has a race when it sets RemoteLogMetadataTopicPartitioner using the setter.
This change fixes the race condition by passing the RemoteLogMetadataTopicPartitioner instance in a Function<Integer, RemoteLogMetaedataTopicPartitioner> which is used in configure() in TopicBasedRemoteLogMetadataManager.
It also improves the waitingFor condition by spying on RemotePartitionMetadataStore and awaiting on Phasers to ensure ConsumerManager makes progress before performing assertions.
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Some tests in TopicBasedRemoteLogMetadataManagerTest flake because waitUntilConsumerCatchesUp may break early before consumer manager has caught up with all the events.
This PR adds an expected offsets for leader/follower metadataOffset partitions and ensures we wait for the offset to be at least equal to the argument to avoid flakyness.
Reviewers: Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
If there are some logs to be deleted during the log dir movement, we'll send for a scheduler to do the deletion later.
However, when the log dir movement completed, the future log is renamed, the async log deletion will fail with no file existed error.
Signed-off-by: PoAn Yang <payang@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, SoontaekLim <soontaek.lim@neya.kr>, Johnny Hsu <johnnyhsu@fb.com>
RemoteLogMetadataSerde references RemoteLogMetadataTransform in a Raw
form. Given that the class is parametrized we should make use of it.
Signed-off-by: Josep Prat <josep.prat@aiven.io>
Reviewers: Matthew de Detrich <matthew.dedetrich@aiven.io>, Mickael Maison <mickael.maison@gmail.com>
We do iterate the records to find the offsetOfMaxTimestamp instead of returning the cached one when handling ListOffsetsRequest.MAX_TIMESTAMP, since it is hard to align all paths to get correct offsetOfMaxTimestamp. The known paths are shown below.
1. convertAndAssignOffsetsNonCompressed -> we CAN get correct offsetOfMaxTimestamp when validating all records
2. assignOffsetsNonCompressed -> ditto
3. validateMessagesAndAssignOffsetsCompressed -> ditto
4. validateMessagesAndAssignOffsetsCompressed#buildRecordsAndAssignOffsets -> ditto
5. appendAsFollow#append#analyzeAndValidateRecords -> we CAN'T get correct offsetOfMaxTimestamp as iterating all records is expensive when fetching records from leader
6. LogSegment#recover -> ditto
Reviewers: Jun Rao <junrao@gmail.com>
- Use `Empty` instead of 'none' when referring to `Optional` values.
- `Headers.lastHeader` returns `null` when no header is found.
- Fix minor spelling mistakes.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
- Fix the verifying logic. If it's LOG_APPEND_TIME, we choose the offset of the first record. Else, we choose the record with the maxTimeStamp.
- rename the shallowOffsetOfMaxTimestamp to offsetOfMaxTimestamp
Reviewers: Jun Rao <junrao@gmail.com>, Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Fix getOffsetByMaxTimestamp for compressed records.
This PR adds:
1) For inPlaceAssignment case, compute the correct offset for maxTimestamp when traversing the batch records, and set to ValidationResult in the end, instead of setting to last offset always.
2) For not inPlaceAssignment, set the offsetOfMaxTimestamp for the log create time, like non-compressed, and inPlaceAssignment cases, instead of setting to last offset always.
3) Add tests to verify the fix.
Reviewers: Jun Rao <junrao@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
In order to move ConfigCommand to tools we must move all it's dependencies which includes KafkaConfig and other core classes to java. This PR moves log cleaner configuration to CleanerConfig class of storage module.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
`CommittedOffsetsFile` can be introduced when it is required for enhancing TBRLMM to consume from a specific offset when snapshots are implemented.
Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Divij Vaidya <diviv@amazon.com>
Expiration of ProducerIds is implemented with a slow removal of map keys:
producers.keySet().removeAll(keys);
Unnecessarily going through all producer ids and then throw all expired keys to be removed.
This leads to exponential time on worst case when most/all keys need to be removed:
Benchmark (numProducerIds) Mode Cnt Score Error Units
ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 9164.043 ± 10647.877 ns/op
ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 341561.093 ± 20283.211 ns/op
ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 44957983.550 ± 9389011.290 ns/op
ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 5683374164.167 ± 1446242131.466 ns/op
A simple fix is to use map#remove(key) instead, leading to a more linear growth:
Benchmark (numProducerIds) Mode Cnt Score Error Units
ProducerStateManagerBench.testDeleteExpiringIds 100 avgt 3 5779.056 ± 651.389 ns/op
ProducerStateManagerBench.testDeleteExpiringIds 1000 avgt 3 61430.530 ± 21875.644 ns/op
ProducerStateManagerBench.testDeleteExpiringIds 10000 avgt 3 643887.031 ± 600475.302 ns/op
ProducerStateManagerBench.testDeleteExpiringIds 100000 avgt 3 7741689.539 ± 3218317.079 ns/op
Flamegraph of the CPU usage at dealing with expiration when producers ids ~1Million:
Reviewers: Justine Olshan <jolshan@confluent.io>
While any blocking operation under holding the UnifiedLog.lock could lead to serious performance (even availability) issues, currently there are several paths that calls fsync(2) inside the lock
In the meantime the lock is held, all subsequent produces against the partition may block
This easily causes all request-handlers to be busy on bad disk performance
Even worse, when a disk experiences tens of seconds of glitch (it's not rare in spinning drives), it makes the broker to unable to process any requests with unfenced from the cluster (i.e. "zombie" like status)
This PR gets rid of 4 cases of essentially-unnecessary fsync(2) calls performed under the lock:
(1) ProducerStateManager.takeSnapshot at UnifiedLog.roll
I moved fsync(2) call to the scheduler thread as part of existing "flush-log" job (before incrementing recovery point)
Since it's still ensured that the snapshot is flushed before incrementing recovery point, this change shouldn't cause any problem
(2) ProducerStateManager.removeAndMarkSnapshotForDeletion as part of log segment deletion
This method calls Utils.atomicMoveWithFallback with needFlushParentDir = true internally, which calls fsync.
I changed it to call Utils.atomicMoveWithFallback with needFlushParentDir = false (which is consistent behavior with index files deletion. index files deletion also doesn't flush parent dir)
This change shouldn't cause problems neither.
(3) LeaderEpochFileCache.truncateFromStart when incrementing log-start-offset
This path is called from deleteRecords on request-handler threads.
Here, we don't need fsync(2) either actually.
On unclean shutdown, few leader epochs might be remained in the file but it will be handled by LogLoader on start-up so not a problem
(4) LeaderEpochFileCache.truncateFromEnd as part of log truncation
Likewise, we don't need fsync(2) here, since any epochs which are untruncated on unclean shutdown will be handled on log loading procedure
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Justine Olshan <jolshan@confluent.io>, Jun Rao <junrao@gmail.com>
The internal topic creation is asynchronous so the test gets flaky. To fix the test flakiness and in this test I want to assert that doesTopicExist should return true when a topic exists, so created a dummy internal topic.
Reviewers: Luke Chen <showuon@gmail.com>, Jun Rao <jun@confluent.io>, Satish Duggana <satishd@apache.org>
Roll the active segment and offload it to remote storage once it breaches the retention time policy.
A segment is eligible for deletion once it gets uploaded to the remote storage. We have checks to allow only the passive segments to be uploaded, so the active segment never gets removed at all even if breaches the retention time. For low-throughput/stale topics, the active segment can hold the data beyond the configured retention time by the user.
Reviewers: Satish Duggana <satishd@apache.org>, Christo Lolov <lolovc@amazon.com>
"findHighestRemoteOffset" does not take into account the leader-epoch end offset. This can cause log divergence between the local and remote log segments when there is unclean leader election.
To handle it correctly, the logic to find the highest remote offset can be updated to:
find-highest-remote-offset = min(end-offset-for-epoch-in-the-checkpoint, highest-remote-offset-for-epoch)
Discussion thread: https://github.com/apache/kafka/pull/14004#discussion_r1266864272
Reviewers: Satish Duggana <satishd@apache.org>, Christo Lolov <lolovc@amazon.com>
This patch contains a few small clean-ups in LogValidator and associated classes:
1. Set shallowOffsetOfMaxTimestamp consistently as the last offset in the
batch for v2 compressed and non-compressed data.
2. Rename `RecordConversionStats` to `RecordValidationStats` since one of its
fields `temporaryMemoryBytes` does not depend on conversion.
3. Rename `batchIndex` in `recordIndex` in loops over the records in each batch
inside `LogValidator`.
Reviewers: Qichao Chu <5326144+ex172000@users.noreply.github.com>, Jun Rao <junrao@gmail.com>
When a node starts (or) restarts, then we send a CREATE_TOPICS request to the controller to create the internal __remote_log_metadata topic.
Topic creation event is costly and handled by the controller. During re-balance, the controller can have pending requests in its queue and can lead to CREATE_TOPICS timeout. Instead of firing the CREATE_TOPICS request when a node restarts, send a METADATA request (topic describe) which is handled by the least loaded node before sending a request to create the topic.
Reviewers: Satish Duggana <satishd@apache.org>, Christo Lolov <lolovc@amazon.com>
TestUtils.createTopicWithAdmin calls waitForAllPartitionsMetadata which waits for partition(s) to be present in each brokers' metadata cache. This is a sufficient check in ZK mode because the controller sends an LISR request before sending an UpdateMetadataRequest which means that the partition in the ReplicaManager will be updated before the metadata cache.
In KRaft mode, the metadata cache is updated first, so the check may return before partitions and other metadata listeners are fully initialized.
Testing:
Insert a Thread.sleep(100) in BrokerMetadataPublisher.onMetadataUpdate after
// Publish the new metadata image to the metadata cache.
metadataCache.setImage(newImage)
and run EdgeCaseRequestTest.testProduceRequestWithNullClientId and the test will fail locally nearly deterministically. After the change(s), the test no longer fails.
Reviewers: Justine Olshan <jolshan@confluent.io>
This PR moves PartitionMetadataFile to the storage module.
Existing unit tests in UnifiedLogTest like testLogFlushesPartitionMetadataOnAppend should suffice.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
RemoteIndexCache has a concurrency bug which leads to IOException while fetching data from remote tier.
The bug could be reproduced as per the following order of events:-
Thread 1 (cache thread): invalidates the entry, removalListener is invoked async, so the files have not been renamed to "deleted" suffix yet.
Thread 2: (fetch thread): tries to find entry in cache, doesn't find it because it has been removed by 1, fetches the entry from S3, writes it to existing file (using replace existing)
Thread 1: async removalListener is invoked, acquires a lock on old entry (which has been removed from cache), it renames the file to "deleted" and starts deleting it
Thread 2: Tries to create in-memory/mmapped index, but doesn't find the file and hence, creates a new file of size 2GB in AbstractIndex constructor. JVM returns an error as it won't allow creation of 2GB random access file.
This commit fixes the bug by using EvictionListener instead of RemovalListener to perform the eviction atomically with the file rename. It handles the manual removal (not handled by EvictionListener) by using computeIfAbsent() and enforcing atomic cache removal & file rename.
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Arpit Goyal
<goyal.arpit.91@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
I've added a new class with an incrementing atomic long to represent the verification guard. Upon creation of verification guard, we will increment this value and assign it to the guard.
The expected behavior is the same as the object guard, but with better debuggability with the string value and type safety (I found a type safety issue in the current code when implementing this)
Reviewers: Ismael Juma <ismael@juma.me.uk>, Artem Livshits <alivshits@confluent.io>
The RemoteIndexCache has a variable lock and the child class also have a variable lock in the same class file. Renaming lock of the entry(child class) to avoid confusion.
Reviewers: Luke Chen <showuon@gmail.com>, hudeqi <1217150961@qq.com>
A few notes:
* Delete a few methods from `UnifiedLog` that were simply invoking the related method in `LogFileUtils`
* Fix `CoreUtils.swallow` to use the passed in `logging`
* Fix `LogCleanerParameterizedIntegrationTest` to close `log` before reopening
* Minor tweaks in `LogSegment` for readability
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>
est Cases Covered
1. Index Files already exist on disk but not in Cache i.e. RemoteIndexCache should not call remoteStorageManager to fetch it instead cache it from the local index file present.
2. RSM returns CorruptedIndex File i.e. RemoteIndexCache should throw CorruptedIndexException instead of successfull execution.
3. Deleted Suffix Indexes file already present on disk i.e. If cleaner thread is slow , then there is a chance of deleted index files present on the disk while in parallel same index Entry is invalidated. To understand more refer https://issues.apache.org/jira/browse/KAFKA-15169
Reviewers: Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>
The LogAppendInfo class is a bit bloated in terms of class fields. That's because it is used as an umbrella class for both leader log appends and follower log appends and needs to carry fields for both. This makes the constructor for the class a bit cludgy to use. It also ends up being a bit confusing when fields are important and when they aren't. I noticed there were a few fields that didn't seem necessary.
Below is a description of changes:
firstOffset is a LogOffsetMetadata but there are no readers of the field that use anything but the messageOffset field - simplified to a long.
LogAppendInfo.errorMessage is only set in one context - when calling LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo. When we use this constructor, we pass up the original exception in LogAppendResult anyway, so the field is redundant with the LogAppendResult.exception field. This allows us to simplify the handling in KAFKA-15459: Convert coordinator retriable errors to a known producer response error #14378 since there are no custom error messages we just return whatever is in the exception message.
We only use targetCompressionType when constructing the LogValidator - just inline the call instead of including it in the LogAppendInfo.
offsetsMonotonic is only used when not assigning offsets to throw an exception - just throw the exception instead of setting a field to throw later.
shallowCount is only there to determine whether there are any messages in the append. Instead, we can just check validBytes which is incremented with a non-zero value every time we increment shallowCount.
Reviewers: Justine Olshan <jolshan@confluent.io>
A bug in the RemoteIndexCache leads to a situation where the cache does not replace the corrupted index with a new index instance fetched from remote storage. This commit fixes the bug by adding correct handling for `CorruptIndexException`.
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Alexandre Dupriez <duprie@amazon.com>
DeleteSegmentsDueToLogStartOffsetBreach configures the segment such that it can hold at-most 2 record-batches. And, it asserts that the local-log-start-offset based on the assumption that each segment will contain exactly two messages.
During leader switch, the segment can get rotated and may not always contain two records. Previously, we were checking whether the expected local-log-start-offset is equal to the base-offset-of-the-first-local-log-segment. With this patch, we will scan the first local-log-segment for the expected offset.
Reviewers: Divij Vaidya <diviv@amazon.com>
This is one of the steps required for kafka to compile with Java 21.
For each case, one of the following fixes were applied:
1. Suppress warning if fixing would potentially result in an incompatible change (for public classes)
2. Add final to one or more methods so that the escape is not possible
3. Replace method calls with direct field access.
In addition, we also fix a couple of compiler warnings related to deprecated references in the `core` module.
See the following for more details regarding the new lint warning:
https://www.oracle.com/java/technologies/javase/21-relnote-issues.html#JDK-8015831
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Chris Egerton <chrise@aiven.io>
In this PR, we noticed failed tests caused by the verification of log start offset and local log start offset. After investigation: #14347 (comment)
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14347/13/#showFailuresLink
After investigation, I found it's because the scala 2.12 cannot recognize the override method of maybeWaitForAtLeastOneSegmentUpload since it's using varargs in scala. I think there must be some bugs/gaps between java/scala that causes these issue. We can fix it by not using varargs, instead, using the Seq.
This PR adds back the log start offset and local log start offset verification, and make sure all tests passed.
Reviewers: Satish Duggana <satishd@apache.org>, Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
This test extends the existing TransactionsTest. It configures the broker and topic with tiered storage and expects at-least one log segment to be uploaded to the remote storage.
Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>, Divij Vaidya <diviv@amazon.com>
Reduce default remote.log.metadata.initialization.retry.interval.ms value to 100ms.
Reviewers: Satish Duggana <satishd@apache.org>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>
- Updated the contract for RSM's fetchIndex to throw a ResourceNotFoundException instead of returning an empty InputStream when it does not have a TransactionIndex.
- Updated the LocalTieredStorage implementation to adhere to the new contract.
- Added Unit Tests for the change.
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>
- RSM and RLMM classpath can be empty since it's optional so removed the non-empty string validator
- Fix getting the `localTieredStorage` by brokerId after stopping a broker.
Reviewers: Christo Lolov <lolovc@amazon.com>, Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>
* Added the integration test for DELETE_RECORDS API for tiered storage enabled topic
* Added validation checks before removing remote log segments for log-start-offset breach
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>
On leadership failover, the new leader's start offset may be older than the start offset of old leader. This works fine for local storage scenario because the new leader still contains data associated with stale start offset. But in case of remote storage, although new leader has a stale offset, the data associated with it has been deleted from remote by the old leader. Hence, we end up in a situation where leader has a start offset but no data associated with it.
This commit fixes the situation by ensuring that on every leadership failover, for topics with remote storage, the leader will update it's start offset from the base of first segment in current leader chain present in the remote storage (if any).
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>, Divij Vaidya <diviv@amazon.com>
- Updated the log-start-offset to the correct value while building the replica state in ReplicaFetcherTierStateMachine#buildRemoteLogAuxState
Integration tests added:
1. ReassignReplicaExpandTest
2. ReassignReplicaMoveTest and
3. ReassignReplicaShrinkTest
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>
This will allow enabling and disabling transaction verification (KIP-890 part 1) without having to roll the cluster.
Tested that restarting the cluster persists the configuration.
If a verification is disabled/enabled while we have an inflight request, depending on the step of the process, the change may or may not be seen in the inflight request (enabling will typically fail unverified requests, but we may still verify and reject when we first disable) Subsequent requests/retries will behave as expected for verification.
Sequence checks will continue to take place after disabling until the first message is written to the partition (thus clearing the verification entry with the tentative sequence) or the broker restarts/partition is reassigned which will clear the memory. On enabling, we will only track sequences that for requests received after the verification is enabled.
Reviewers: Jason Gustafson <jason@confluent.io>, Satish Duggana <satishd@apache.org>
Added the below integration tests with tiered storage
- PartitionsExpandTest
- DeleteSegmentsByRetentionSizeTest
- DeleteSegmentsByRetentionTimeTest and
- EnableRemoteLogOnTopicTest
- Enabled the test for both ZK and Kraft modes.
These are enabled for both ZK and Kraft modes.
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>, Divij Vaidya <diviv@amazon.com>
In SocketServerTest, we create SocketServer and enableRequestProcessing on each test class initialization. That's fine since we shutdown it in @AfterEach. The issue we have is we disabled 2 tests in this test suite. And when running these disabled tests, we will go through class initialization, but without @AfterEach. That causes 2 network thread leaked.
Compared the error message in DynamicBrokerReconfigurationTest#testThreadPoolResize test here:
org.opentest4j.AssertionFailedError: Invalid threads: expected 6, got 8: List(data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0) ==> expected: <true> but was: <false>
The 2 unexpected network threads are leaked from SocketServerTest.
Reviewers: Satish Duggana <satishd@apache.org>, Christo Lolov <lolovc@amazon.com>, Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kchandraprakash@uber.com>, Chris Egerton <chrise@aiven.io>
When tiered storage is enabled on the topic, and the last-standing-replica is restarted, then the log-start-offset should not reset its offset to first-local-log-segment-base-offset.
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>
Disabled the below tests to fix the thread leak:
1. kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize() and
2. org.apache.kafka.tiered.storage.integration.OffloadAndConsumeFromLeaderTest
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Justine Olshan <jolshan@confluent.io>
The purpose of this change is to not allow a broker to start up with Tiered Storage disabled (remote.log.storage.system.enable=false) while there are still topics that have 'remote.storage.enable' set.
Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>
This change does the following:
1. Make RemoteLogManagerConfigs that are implemented public
2. Add tasks to generate html docs for the configs
3. Include config docs in the main site
Reviewers: Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>, Satish Duggana <satishd@apache.org>
This PR adds the following changes to the `TopicBasedRemoteLogMetadataManager`
1. Added a guard in RemoteLogMetadataCache so that the incoming request can be served from the cache iff the corresponding user-topic-partition is initalized
2. Improve error handling in ConsumerTask thread so that is not killed when there are errors in reading the internal topic
3. ConsumerTask initialization should handle the case when there are no records to read
and some other minor changes
Added Unit Tests for the changes
Co-authored-by: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
Reviewers: Luke Chen <showuon@gmail.com>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>, Christo Lolov <lolovc@amazon.com>, Satish Duggana <satishd@apache.org>
This change introduces a remote log segment segment retention cleanup mechanism.
RemoteLogManager runs retention cleanup activity tasks on each leader replica. It assesses factors such as overall size and retention duration, subsequently removing qualified segments from remote storage. This process also involves adjusting the log-start-offset within the UnifiedLog accordingly. It also cleans up the segments which have epochs earlier than the earliest leader epoch in the current leader.
Co-authored-by: Satish Duggana <satishd@apache.org>
Co-authored-by: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
Reviewers: Jun Rao <junrao@gmail.com>, Divij Vaidya <diviv@amazon.com, Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Christo Lolov <lolovc@amazon.com>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>, Alexandre Dupriez <alexandre.dupriez@gmail.com>, Nikhil Ramakrishnan <ramakrishnan.nikhil@gmail.com>
This implementation introduces two new configurations `log.message.timestamp.before.max.ms` and `log.message.timestamp.after.max.ms` and deprecates `log.message.timestamp.difference.max.ms`.
The default value for all these three configs is maintained to be Long.MAX_VALUE for backward compatibility but with the newly added configurations we can have a finer control when validating message timestamps that are in the past and the future compared to the broker's timestamp.
To maintain backward compatibility if the default value of `log.message.timestamp.before.max.ms` is not changed, we are assuming users are still using the deprecated config `log.message.timestamp.difference.max.ms` and validation is done using its value. This ensures that existing customers who have customized the value of `log.message.timestamp.difference.max.ms` will continue to see no change in behavior.
Reviewers: Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>
Noticed that there was a dangling unused class (LongRef, replaced by PrimitiveRef.LongRef), and the LogOffsetMetadata toString was a little oddly formatted.
Reviewers: Justine Olshan <jolshan@confluent.io>
`TieredStorageTestHarness` is a base class for integration tests exercising the tiered storage functionality. This uses `LocalTieredStorage` instance as the second-tier storage system and `TopicBasedRemoteLogMetadataManager` as the remote log metadata manager.
Co-authored-by: Alexandre Dupriez <alexandre.dupriez@gmail.com>
Co-authored-by: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
* Delete remote segments when deleting a topic
Co-authored-by: Kamal Chandraprakash <kchandraprakash@uber.com>
Co-authored-by: d00791190 <dinglan6@huawei.com>
As described in the KIP here the default value of remote.log.metadata.manager.class.name should be TopicBasedRemoteLogMetadataManager
Reviewers: Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kchandraprakash@uber.com>, Divij Vaidya <diviv@amazon.com>
Add config validation which verifies that system level remote storage is enabled when enabling remote storage for a topic. In case verification fails, it throws InvalidConfigurationException.
Reviewers: Christo Lolov <lolovc@amazon.com>, Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>
When configuring RLMM, the configs passed into configure method is the RemoteLogManagerConfig. But in RemoteLogManagerConfig, there's no configs related to remote.log.metadata.*, ex: remote.log.metadata.topic.replication.factor. So, even if users have set the config in broker, it'll never be applied.
This PR fixed the issue to allow users setting RLMM prefix: remote.log.metadata.manager.impl.prefix (default is rlmm.config.), and then, appending the desired remote.log.metadata.* configs, it'll pass into RLMM, including remote.log.metadata.common.client./remote.log.metadata.producer./ remote.log.metadata.consumer. prefixes.
Ex:
# default value
# remote.log.storage.manager.impl.prefix=rsm.config.
# remote.log.metadata.manager.impl.prefix=rlmm.config.
rlmm.config.remote.log.metadata.topic.num.partitions=50
rlmm.config.remote.log.metadata.topic.replication.factor=4
rsm.config.test=value
Reviewers: Christo Lolov <christololov@gmail.com>, Kamal Chandraprakash <kchandraprakash@uber.com>, Divij Vaidya <diviv@amazon.com>
Only initialize remote topic metrics when system-wise remote storage is enabled to avoid impacting performance for existing brokers. Also add tests.
Reviewers: Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
* KAFKA-15107: Support custom metadata for remote log segment
This commit does the changes discussed in the KIP-917. Mainly, changes the `RemoteStorageManager` interface in order to return `CustomMetadata` and then ensures these custom metadata are stored, propagated, (de-)serialized correctly along with the standard metadata throughout the whole lifecycle. It introduces the `remote.log.metadata.custom.metadata.max.size` to limit the custom metadata size acceptable by the broker and stop uploading in case a piece of metadata exceeds this limit.
On testing:
1. `RemoteLogManagerTest` checks the case when a piece of custom metadata is larger than the configured limit.
2. `RemoteLogSegmentMetadataTest` checks if `createWithUpdates` works correctly, including custom metadata.
3. `RemoteLogSegmentMetadataTransformTest`, `RemoteLogSegmentMetadataSnapshotTransformTest`, and `RemoteLogSegmentMetadataUpdateTransformTest` were added to test the corresponding class (de-)serialization, including custom metadata.
4. `FileBasedRemoteLogMetadataCacheTest` checks if custom metadata are being correctly saved and loaded to a file (indirectly, via `equals`).
5. `RemoteLogManagerConfigTest` checks if the configuration setting is handled correctly.
Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>, Divij Vaidya <diviv@amazon.com>
When creating a verification state entry, we also store sequence and epoch. On subsequent requests, we will take the latest epoch seen and the earliest sequence seen. That way, if we try to append a sequence after the earliest seen sequence, we can block that and retry. This addresses potential OutOfOrderSequence loops caused by errors during verification (coordinator loading, timeouts, etc).
Reviewers: David Jacot <david.jacot@gmail.com>, Artem Livshits <alivshits@confluent.io>
* KAFKA-14953: Adding RemoteLogManager metrics
In this PR, I have added the following metrics that are related to tiered storage mentioned in[ KIP-405](https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage).
|Metric|Description|
|-----------------------------------------|--------------------------------------------------------------|
| RemoteReadRequestsPerSec | Number of remote storage read requests per second |
| RemoteWriteRequestsPerSec | Number of remote storage write requests per second |
| RemoteBytesInPerSec | Number of bytes read from remote storage per second |
| RemoteReadErrorsPerSec | Number of remote storage read errors per second |
| RemoteBytesOutPerSec | Number of bytes copied to remote storage per second |
| RemoteWriteErrorsPerSec | Number of remote storage write errors per second |
| RemoteLogReaderTaskQueueSize | Number of remote storage read tasks pending for execution. |
| RemoteLogReaderAvgIdlePercent | Average idle percent of the remote storage reader thread pool|
| RemoteLogManagerTasksAvgIdlePercent | Average idle percent of RemoteLogManager thread pool |
Added unit tests for all the rate metrics.
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>, Staniel Yao<yaolixinylx@gmail.com>, hudeqi<1217150961@qq.com>, Satish Duggana <satishd@apache.org>
Introduced extra mapping to track verification state.
When verifying, there is a race condition that the add partitions verification response returns that the partition is in the ongoing transaction, but an abort marker is written before we get to append. Therefore, we track any given transaction we are verifying with an object unique to that transaction.
We check this unique state upon the first append to the log. After that, we can rely on currentTransactionFirstOffset. We remove the verification state on appending to the log with a transactional data record or marker.
We will also clean up lingering verification state entries via the producer state entry expiration mechanism. We do not update the the timestamp on retrying a verification for a transaction, so each entry must be verified before producer.id.expiration.ms.
There were a few other fixes:
- Moved the transaction manager handling for failed batch into the future completed exceptionally block to avoid processing it twice (this caused issues in unit tests)
- handle interrupted exceptions encountered when callback thread encountered them
- change handling to throw error if we try to set verification state and leaderLogIfLocal is None.
Reviewers: David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>
KAFKA-14522 Rewrite and Move of RemoteIndexCache to storage module.
Cleanedup index file suffix usages and other minor cleanups
Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>
UnifiedLog relied on the remoteStorageSystemEnable to identify if the broker is enabling remote storage, but we never pass this value from the config into UnifiedLog. So it'll always be false.
In this PR, I did:
- pass remoteStorageSystemEnable to UnifiedLog
- remove remoteLogManager from the class member of UnifiedLog since only UnifiedLog#fetchOffsetByTimestamp needs remoteLogManager, and this can be passed when called from ReplicaManager.
Reviewers: Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
I have moved this config into producer state manager so it can be checked easily under the log lock when we are about to append.
Only a few test files currently use the validation and those have been verified to work via running the tests.
Reviews: David Jacot <djacot@confluent.io>
This change includes
- Recognize the fetch requests with out of range local log offsets
- Add fetch implementation for the data lying in the range of [logStartOffset, localLogStartOffset]
- Add a new purgatory for async remote read requests which are served through a specific thread pool
We have an extended version of remote fetch that can fetch from multiple remote partitions in parallel, which we will raise as a followup PR.
A few tests for the newly introduced changes are added in this PR. There are some tests available for these scenarios in 2.8.x, refactoring with the trunk changes, will add them in followup PRs.
Other contributors:
Kamal Chandraprakash <kchandraprakash@uber.com> - Further improvements and adding a few tests
Luke Chen <showuon@gmail.com> - Added a few test cases for these changes.
PS: This functionality is pulled out from internal branches with other functionalities related to the feature in 2.8.x. The reason for not pulling all the changes as it makes the PR huge, and burdensome to review and it also needs other metrics, minor enhancements(including perf), and minor changes done for tests. So, we will try to have followup PRs to cover all those.
Reviewers: Jun Rao <junrao@gmail.com>, Alexandre Dupriez <alexandre.dupriez@gmail.com>, Divij Vaidya <diviv@amazon.com>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>
Added functionality to copy log segments, indexes to the target remote storage for each topic partition enabled with tiered storage. This involves creating scheduled tasks for all leader partition replicas to copy their log segments in sequence to tiered storage.
Reviewers: Jun Rao <junrao@gmail.com>, Luke Chen <showuon@gmail.com>
The motivation for introducing InMemoryLeaderEpochCheckpoint is to allow remote log manager to create the RemoteLogSegmentMetadata(RLSM) with the correct leader epoch info for a specific segment. To do that, we need to rely on the LeaderEpochCheckpointCache to truncate from start and end, to get the epoch info. However, we don't really want to truncate the epochs in cache (and write to checkpoint file in the end). So, we introduce this InMemoryLeaderEpochCheckpoint to feed into LeaderEpochCheckpointCache, and when we truncate the epoch for RLSM, we can do them in memory without affecting the checkpoint file, and without interacting with file system.
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>
This patch is the first part of KIP-903. It updates the FetchRequest to include the new tagged ReplicaState field which replaces the now deprecated ReplicaId field. The FetchRequest version is bumped to version 15 and the MetadataVersion to 3.5-IV1.
Reviewers: David Jacot <djacot@confluent.io>
`LazyIndex.get()` has a race condition that can result in a ClassCastException being thrown in some cases.
This was introduced when `LazyIndex` was rewritten from Scala to Java.
I didn't include a test since it's a bit overkill to add a concurrent test for this.
Reviewers: Jun Rao <junrao@gmail.com>
This is the PR for the implementation of KIP-847: https://cwiki.apache.org/confluence/display/KAFKA/KIP-847%3A+Add+ProducerIdCount+metrics
Add ProducerIdCount metric at the broker level:
kafka.server:type=ReplicaManager,name=ProducerIdCount
Added unit tests below to ensure the metric reported the count correctly.
---------
Co-authored-by: Artem Livshits <84364232+artemlivshits@users.noreply.github.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Divij Vaidya <diviv@amazon.com>, Christo Lolov <christo_lolov@yahoo.com>, Alexandre Dupriez <alexandre.dupriez@gmail.com>, Justine Olshan <jolshan@confluent.io>
In the context of KIP-848, we implements are new group coordinator in Java. This new coordinator follows the architecture of the new quorum controller. It is basically a replicated state machine that writes to the log and commits its internal state when the writes are committed. At the moment, the only way to know when a write is committed is to use a delayed fetch. This is not ideal in our context because a delayed fetch can be completed before the write is actually committed to the log.
This patch proposes to introduce a high watermark listener to the Partition/Log layers. This will allow the new group coordinator to simply listen to changes and commit its state based on this. This mechanism is simpler and lighter as well.
Reviewers: Christo Lolov <lolovc@amazon.com>, Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
* assertEquals called on array
* Method is identical to its super method
* Simplifiable assertions
* Unused imports
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>
The remote index classes use `LazyIndex`, but immediately
force materialization. This results in more verbose code
and it's misleading since the indexes are not lazily
used in practice.
Also simplify `LazyIndex.forOffset/forTime` by removing
`writable` parameter, which is always `true`.
Reviewers: Satish Duggana <satishd@apache.org>
KAFKA-14480 Move/Rewrite of ProducerStateManager to the storage module.
Replaced `File.listFiles` with `Files.list` in ProducerStateManager.listSnapshotFiles
Used `Path` instead of `File` in ProducerStateManager.isSnapshotFile to check whether the given path is a file or not and has a suffix of '.snapshot'.
KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.
For broader context on this change, you may want to look at KAFKA-14470: Move log layer to the storage module
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>, Alexandre Dupriez <alexandre.dupriez@gmail.com>
This patch introduces a preliminary state machine that can be used by KRaft
controller to drive online migration from Zk to KRaft.
MigrationState -- Defines the states we can have while migration from Zk to
KRaft.
KRaftMigrationDriver -- Defines the state transitions, and events to handle
actions like controller change, metadata change, broker change and have
interfaces through which it claims Zk controllership, performs zk writes and
sends RPCs to ZkBrokers.
MigrationClient -- Interface that defines the functions used to claim and
relinquish Zk controllership, read to and write from Zk.
Co-authored-by: David Arthur <mumrah@gmail.com>
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This is a relatively independent change in the context of KAFKA-14482.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Christo Lolov <lolovc@amazon.com>, Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>
Additional notable changes to fix multiple dependency ordering issues:
* Moved `ConfigSynonym` to `server-common`
* Moved synonyms from `LogConfig` to `ServerTopicConfigSynonyms `
* Removed `LogConfigDef` `define` overrides and rely on
`ServerTopicConfigSynonyms` instead.
* Moved `LogConfig.extractLogConfigMap` to `KafkaConfig`
* Consolidated relevant defaults from `KafkaConfig`/`LogConfig` in the latter
* Consolidate relevant config name definitions in `TopicConfig`
* Move `ThrottledReplicaListValidator` to `storage`
Reviewers: Satish Duggana <satishd@apache.org>, Mickael Maison <mickael.maison@gmail.com>
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
Reviewers: dengziming <dengziming1993@gmail.com>, Mickael Maison <mickael.maison@gmail.com>, Satish Duggana <satishd@apache.org>, Ismael Juma <ismael@juma.me.uk>
Also improved `LogValidatorTest` to cover a bug that was originally
only caught by `LogAppendTimeTest`.
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
Reviewers: Jun Rao <junrao@gmail.com>
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
Reviewers: Jun Rao <junrao@gmail.com>, Satish Duggana <satishd@apache.org>
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
Reviewers: Jun Rao <junrao@gmail.com>, Satish Duggana <satishd@apache.org>
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
Reviewers: Jun Rao <junrao@gmail.com>, Satish Duggana <satishd@apache.org>
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
Reviewers: Jun Rao <junrao@gmail.com>, Satish Duggana <satishd@apache.org>
This PR implements the follower fetch protocol as mentioned in KIP-405.
Added a new version for ListOffsets protocol to receive local log start offset on the leader replica. This is used by follower replicas to find the local log star offset on the leader.
Added a new version for FetchRequest protocol to receive OffsetMovedToTieredStorageException error. This is part of the enhanced fetch protocol as described in KIP-405.
We introduced a new field locaLogStartOffset to maintain the log start offset in the local logs. Existing logStartOffset will continue to be the log start offset of the effective log that includes the segments in remote storage.
When a follower receives OffsetMovedToTieredStorage, then it tries to build the required state from the leader and remote storage so that it can be ready to move to fetch state.
Introduced RemoteLogManager which is responsible for
initializing RemoteStorageManager and RemoteLogMetadataManager instances.
receives any leader and follower replica events and partition stop events and act on them
also provides APIs to fetch indexes, metadata about remote log segments.
Followup PRs will add more functionality like copying segments to tiered storage, retention checks to clean local and remote log segments. This will change the local log start offset and make sure the follower fetch protocol works fine for several cases.
You can look at the detailed protocol changes in KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-FollowerReplication
Co-authors: satishd@apache.org, kamal.chandraprakash@gmail.com, yingz@uber.com
Reviewers: Kowshik Prakasam <kprakasam@confluent.io>, Cong Ding <cong@ccding.com>, Tirtha Chatterjee <tirtha.p.chatterjee@gmail.com>, Yaodong Yang <yangyaodong88@gmail.com>, Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Jun Rao <junrao@gmail.com>
There seemed to be a little sloppiness in the integration tests in regard to admin client creation. Not only was there duplicated logic, but it wasn't always clear which listener the admin client was targeting. This made it difficult to tell in the context of authorization tests whether we were indeed testing with the right principal. As an example, we had a method in TestUtils which was using the inter-broker listener implicitly. This meant that the test was using the broker principal which had super user privilege. This was intentional, but I think it would be clearer to make the dependence on this listener explicit. This patch attempts to clean this up a bit by consolidating some of the admin creation logic and making the reliance on the listener clearer.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
Change ZooKeeperTestHarness to QuorumTestHarness so that integration tests which inherit from
this class can test Kafka in both ZK and KRaft mode. Test cases which do this can specify the
modes they support by including a ParameterizedTest annotation before each test case, like the
following:
@ParameterizedTest
@valuesource(strings = Array("zk", "kraft"))
def testValidCreateTopicsRequests(quorum: String): Unit = { ... }
For each value that is specified here (zk, kraft), the test case will be run once in the
appropriate mode. So the test shown above is run twice. This allows integration tests to be
incrementally converted over to support KRaft mode, rather than rewritten to support it. For
now, test cases which do not specify a quorum argument will continue to run only in ZK mode.
JUnit5 makes the quorum annotation visible in the TestInfo object which each @BeforEeach
function in a test can optionally take. Therefore, this PR converts over the setUp function of
the quorum base class, plus every derived class, to take a TestInfo argument. The TestInfo
object gets "passed up the chain" to the base class, where it determines which quorum type we
create (ZK or KRaft). In a few cases, I discovered test cases inheriting from the test harness
that had more than one @BeforeEach function. Because the JUnit5 framework does not define the
order in which @BeforeEach hooks are run, I changed these to overload setUp() instead, to avoid
undefined behavior.
The general approach taken here is to make as much as possible work with KRaft, but to leave some
things as ZK-only when appropriate. For example, a test that explicitly requests an AdminZkClient
object will get an exception if it is running in KRaft mode. Similarly, tests which explicitly
request KafkaServer rather than KafkaBroker will get an exception when running in KRaft mode.
As a proof of concept, this PR converts over kafka.api.MetricsTest to support KRaft.
This PR also renames the quorum controller event handler thread to include the text
"QuorumControllerEventHandler". This allows QuorumTestHarness to check for hanging quorum
controller threads, as it does for hanging ZK-based controller threads.
Finally, ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
caused many failing test runs. Therefore, I disabled it here and filed KAFKA-13421 to fix the
test logic to be more reliable.
Reviewers: Jason Gustafson <jason@confluent.io>, Igor Soarez <soarez@apple.com>
Added snapshots for consumed remote log metadata for each partition to avoid consuming again in case of broker restarts. These snapshots are stored in the respective topic partition log directories.
Reviewers: Kowshik Prakasam <kprakasam@confluent.io>, Cong Ding <cong@ccding.com>, Jun Rao <junrao@gmail.com>
1. It should not require a TopicPartition during construction and normal
usage.
2. Simplify `equals` since `topicId` and `topicPartition` are never
null.
3. Inline `Objects.hash` to avoid array allocation.
4. Make `toString` more concise using a similar approach as
`TopicPartition` since this `TopicIdPartition` will replace
`TopicPartition` in many places in the future.
5. Add unit tests for `TopicIdPartition`, it seems like we had none.
6. Minor clean-up in calling/called classes.
Reviewers: David Jacot <djacot@confluent.io>, Satish Duggana <satishd@apache.org>
Added asynchronous API support for RemoeLogMetadataManager add/update/put methods.
Implemented the changes on default topic based RemoteLogMetadataManager.
Refactored the respective tests to cover the introduced asynchronous APIs.
Reviewers: Cong Ding <cong@ccding.com>, Jun Rao <junrao@gmail.com>