This PR includes changes to KafkaRaftClient and KafkaMetadataLog to support periodic
cleaning of old log segments and snapshots.
Four new public config keys are introduced: metadata.log.segment.bytes,
metadata.log.segment.ms, metadata.max.retention.bytes, and
metadata.max.retention.ms.
These are used to configure the log layer as well as the snapshot cleaning logic. Snapshot
and log cleaning is performed based on two criteria: total metadata log + snapshot size
(metadata.max.retention.bytes), and max age of a snapshot (metadata.max.retention.ms).
Since we have a requirement that the log start offset must always align with a snapshot,
we perform the cleaning on snapshots first and then clean what logs we can.
The cleaning algorithm follows:
1. Delete the oldest snapshot.
2. Advance the log start offset to the new oldest snapshot.
3. Request that the log layer clean any segments prior to the new log start offset
4. Repeat this until the retention size or time is no longer violated, or only a single
snapshot remains.
The cleaning process is triggered every 60 seconds from the KafkaRaftClient polling
thread.
Reviewers: José Armando García Sancio <jsancio@gmail.com>, dengziming <dengziming1993@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Updated documentation for connector restart REST API to include the tasks restart behavior, including calling out that by default the same behavior of previous versions is preserved.
Author: Kalpesh Patel <kpatel@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
This patch ensures that `maxTimestampSoFar` and `offsetOfMaxTimestampSoFar` are consistent with each others. It does so by storing them together. It relates to KIP-734 which exposes them via the admin client.
Reviewers: Ismael Juma <ismael@juma.me.uk>, David Jacot <djacot@confluent.io>
* Improve the test prior to reimplementing KafkaFutureImpl using CompletableFuture.
* KAFKA-9687: Reimplement KafkaFutureImpl using a CompleteableFuture
* KIP-707: Add KafkaFuture.toCompletionStage
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
KIP-699 added support for batching in FindCoordinatorRequest using a new protocol that changes the wire format for both batched and unbatched requests. Clients were updated to try the new format first and switch irreversibly to the old format if the new format is not supported on one broker. During rolling upgrade (or a downgrade), it is possible that a broker doesn't support new format at some point while other brokers do at a later point. Clients end up in a bad state until restarted since they use new version with old format. This PR changes FindCoordinatorRequest to set data based on actual version when a single group is used. This is always the case for consumer coordinator and transaction manager. For admin API, we still switch to unbatched mode on failure, but the data is set based on actual version, so we never fail even if brokers are upgraded/downgraded.
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
Gradle 7.1 improves Java incremental compilation:
https://docs.gradle.org/7.1.1/release-notes.html
We previously kept the JDK 15 build because some
tests didn't work with JDK 16. Since then, a number
of PRs were submitted to fix this so it's best
to remove the JDK 15 build before we create the
3.0 release branch.
Finally bump `test-retry` gradle plugin version too.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Luke Chen <showuon@gmail.com>
Track handleSnapshot calls and make sure it is never triggered on the leader node.
Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>, Boyang Chen <bchen11@outlook.com>
We changed the default serde in Streams to be null in #10813, but forgot to add some in tests, for example TestTopicsTest and TopologyTestDriverTest.
Reviewers: David Jacot <djacot@confluent.io>, Bruno Cadonna <cadonna@apache.org>
Add the record append time to Batch. Change SnapshotReader to set this time to the
time of the last log in the last batch. Fix the QuorumController to remember the last
committed batch append time and to store it in the generated snapshot.
Reviewers: David Arthur <mumrah@gmail.com>, Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
The fix to avoid spurious left/outer stream-stream join results, showed
very low throughput for RocksDB, due to excessive creation of iterators.
Instead of trying to emit left/outer stream-stream join result for every
input record, this PR adds tracking of the lower timestamp bound of
left/outer join candidates, and only tries to emit them (and create an
iterator) if they are potentially old enough.
Reviewers: Luke Chen <showuon@gmail.com>, Guozhang Wang <guozhang@confluent.io>, Sergio Peña <sergio@confluent.io>
Segment and index files are currently renamed with a .deleted
suffix prior to async deletion. This serves two purposes, to
resume deletion on broker failure and also protect against
deletion of new segments during truncation (due to deletion
being async).
We should do the same for snapshot files. While they are not subject
to issues around resuming deletion due to the stray snapshot
scanning which is performed on log initialization, we can end up
with situations where truncation queues snapshots for deletion, but
prior to deletion new segments with the same snapshot file name are
created. Async deletion can then delete these new snapshots.
This patch offers a two-stage snapshot deletion which first renames
and removes the segments in question from the ProducerStateManager,
allowing the Log to asynchronously delete them.
Credit to Kowshik Prakasam <kowshik@gmail.com> for finding this issue
and creating the test demonstrating the failure.
Co-authored-by: Kowshik Prakasam <kowshik@gmail.com> Address PR feedback
Reviewers: Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
In getListOffsetsCalls, we rebuild the cluster snapshot for every topic partition. instead, we should reuse a snapshot.
For manual testing (used AK 2.8), i've passed in a map of 6K topic partitions to listOffsets
Without snapshot reuse:
duration of building futures from metadata response: **15582** milliseconds
total duration of listOffsets: **15743** milliseconds
With reuse:
duration of building futures from metadata response: **24** milliseconds
total duration of listOffsets: **235** milliseconds
Reviewers: Luke Chen <showuon@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
This implements KIP-699: https://cwiki.apache.org/confluence/display/KAFKA/KIP-699%3A+Update+FindCoordinator+to+resolve+multiple+Coordinators+at+a+time
It updates FindCoordinator request and response to support resolving multiple coordinators at a time. If a broker does not support the new FindCoordinator version, clients can revert to the previous behaviour and use a request for each coordinator.
Reviewers: David Jacot <djacot@confluent.io>, Tom Bentley <tbentley@redhat.com>, Sanjana Kaundinya <skaundinya@gmail.com>
This PR relies on existing tests. A subsequent PR will make additional test adjustments
to ensure coverage of the non-default behavior is still good after this change.
Reviewers: Cheng Tan <31675100+d8tltanc@users.noreply.github.com>, Ismael Juma <ismael@juma.me.uk>
This new policy enables active/passive, one-way replication without renaming topics, similar to MM1. This implementation is described in KIP-382 (adopted), originally as "LegacyReplicationPolicy".
This enables operators to migrate from MM1 to MM2 without re-architecting their replication flows, and enables some additional use-cases for MM2. For example, operators may wish to "upgrade" their Kafka clusters by mirroring everything to a completely new cluster. Such a migration would have been difficult with either MM1 or MM2 previously.
When using IdentityReplicationPolicy, operators should be aware that MM2 will not be able to detect cycles among replicated topics. A misconfigured topology may result in replicating the same records back-and-forth or in an infinite loop. However, we don't prevent this behavior, as some use-cases involve filtering records (via SMTs) to prevent cycles.
Reviewers: Mickael Maison <mickael.maison@gmail.com>
Co-authored-by: Ryanne Dolan <rdolan@twitter.com>
Co-authored-by: Matthew de Detrich <mdedetrich@gmail.com>
Co-authored-by: Ivan Yurchenko <ivanyu@aiven.io>
Create the image/ module for storing, reading, and writing broker metadata images.
Metadata images are immutable. New images are produced from existing images
using delta classes. Delta classes are mutable, and represent changes to a base
image.
MetadataImage objects can be converted to lists of KRaft metadata records. This
is essentially writing a KRaft snapshot. The resulting snapshot can be read
back into a MetadataDelta object. In practice, we will typically read the
snapshot, and then read a few more records to get fully up to date. After that,
the MetadataDelta can be converted to a MetadataImage as usual.
Sometimes, we have to load a snapshot even though we already have an existing
non-empty MetadataImage. We would do this if the broker fell too far behind and
needed to receive a snapshot to catch up. This is handled just like the normal
snapshot loading process. Anything that is not in the snapshot will be marked
as deleted in the MetadataDelta once finishSnapshot() is called.
In addition to being used for reading and writing snapshots, MetadataImage also
serves as a cache for broker information in memory. A follow-up PR will replace
MetadataCache, CachedConfigRepository, and the client quotas cache with the
corresponding Image classes. TopicsDelta also replaces the "deferred
partition" state that the RaftReplicaManager currently implements. (That change
is also in a follow-up PR.)
Reviewers: Jason Gustafson <jason@confluent.io>, David Arthur <mumrah@gmail.com>
This patch fixes a regression introduced https://github.com/apache/kafka/pull/10760. The downgrade logic was not downgrading the version when only non MAX_TIMESTAMP specs were used.
Reviewers: David Jacot <djacot@confluent.io>
This patch adds two new apis to support topic deletion using topic IDs or names. It uses a new class `TopicCollection` to keep a collection of topics defined either by names or IDs. Finally, it modifies `DeleteTopicsResult` to support both names and IDs and deprecates the old methods which have become ambiguous. Eventually we will want to deprecate the old `deleteTopics` apis as well, but this patch does not do so.
Reviewers: Jason Gustafson <jason@confluent.io>
Implements KIP-745 https://cwiki.apache.org/confluence/display/KAFKA/KIP-745%3A+Connect+API+to+restart+connector+and+tasks to change connector REST API to restart a connector and its tasks as a whole.
Testing strategy
- [x] Unit tests added for all possible combinations of onlyFailed and includeTasks
- [x] Integration tests added for all possible combinations of onlyFailed and includeTasks
- [x] System tests for happy path
Reviewers: Randall Hauch <rhauch@gmail.com>, Diego Erdody <erdody@gmail.com>, Konstantine Karantasis <k.karantasis@gmail.com>
Add the ability for KRaft controllers to generate snapshots based on the number of new record bytes that have
been applied since the last snapshot. Add a new configuration key to control this parameter. For now, it
defaults to being off, although we will change that in a follow-on PR. Also, fix LocalLogManager so that
snapshot loading is only triggered when the listener is not the leader.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Implements KIP-633.
Grace-period is an important parameter and its best to make it the user's responsibility to set it expliclity. Thus, we move off to provide a default and make it a mandatory parameter when creating a window.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Luke Chen <showuon@gmail.com>, Matthias J. Sax <matthias@confluent.io>
Fix the JavaDoc for the ClientQuotaManagerConfig#throttle function to
refer to the correct parameter name.
BrokerEndPointTest#testHashAndEquals should test the BrokerEndPoint
class, rather than the MetadataBroker class.
TopicConfigHandler: make the kafkaController argument optional, since we won't
have it when in KRaft mode.
Remove the unecessary ConfigRepository argument for the Partition class.
Remove the unused TestUtils#deleteBrokersInZk function.
Reviewers: Jason Gustafson <jason@confluent.io>
Update the ZooKeeper version to v3.6.3. This requires adding dropwizard
as a new dependency.
Also, add Kafka v2.8.0 to the ducktape system test image.
Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
The integration test TaskMetadataIntegrationTest will increase
the length of the app ID when its test methods are called multiple
times in one execution. This is for example the case if you
repeatedly run the test until failure in IntelliJ IDEA. This might
also lead to exceptions because the state directory depends on the
app ID and directory names have a length limit.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Remove getNonExistingTopics, which was not necessary. MetadataCache
already lets callers check for the existence of topics by calling
MetadataCache#contains.
Add MetadataCache#getAliveBrokerNode and getAliveBrokerNodes. This
simplifies the calling code, which always wants a Node.
Fix a case where we were calling getAliveBrokers and filtering by id,
rather than simply calling getAliveBroker(id) and making use of the hash
map.
Reviewers: Jason Gustafson <jason@confluent.io>, Jose Sancio <jsancio@gmail.com>
This commit implements KIP-716. It introduces a new setting `offset-syncs.topic.location` that allows specifying where the offset-syncs topic is created.
Reviewers: Tom Bentley <tbentley@redhat.com>, Edoardo Comar <ecomar@uk.ibm.com>
Add header and footer records for raft snapshots. This helps identify when the snapshot
starts and ends. The header also contains a time. The time field is currently set to 0.
KAFKA-12997 will add in the necessary wiring to use the correct timestamp.
Reviewers: Jose Sancio <jsancio@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
When we find a .swap file on startup, we typically want to rename and replace it as .log, .index, .timeindex, etc. as a way to complete any ongoing replace operations. These swap files are usually known to have been flushed to disk before the replace operation begins.
One flaw in the current logic is that we recover these swap files on startup and as part of that, end up truncating the producer state and rebuild it from scratch. This is unneeded as the replace operation does not mutate the producer state by itself. It is only meant to replace the .log file along with corresponding indices. Because of this unneeded producer state rebuild operation, we have seen multi-hour startup times for clusters that have large compacted topics.
This patch fixes the issue. With ext4 ordered mode, the metadata are ordered and no matter it is a clean/unclean shutdown. As a result, we rework the recovery workflow as follows.
If there are any .cleaned files, we delete all .swap files with higher/equal offsets due to KAFKA-6264. We also delete the .cleaned files. If no .cleaned file, do nothing for this step.
If there are any .log.swap files left after step 1, they, together with their index files, must be renamed from .cleaned and are complete (renaming from .cleaned to .swap is in reverse offset order). We rename these .log.swap files and their corresponding index files to regular files, while deleting the original files from compaction or segment split if they haven't been deleted.
Do log splitting for legacy log segments with offset overflow (KAFKA-6264)
If there are any other index swap files left, they must come from partial renaming from .swap files to regular files. We can simply rename them to regular files.
credit: some code is copied from @dhruvilshah3 's PR: #10388
Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Jun Rao <junrao@gmail.com>
If fetchOffset < startOffset, we currently throw OffsetOutOfRangeException when attempting to read from the log in the regular case. But for diverging epochs, we return Errors.NONE with the new leader start offset, hwm etc.. ReplicaFetcherThread throws OffsetOutOfRangeException when processing responses with Errors.NONE if the leader's offsets in the response are out of range and this moves the partition to failed state. The PR adds a check for this case when processing fetch requests and throws OffsetOutOfRangeException regardless of epoch.
Reviewers: Luke Chen <showuon@gmail.com>, Nikhil Bhatia <rite2nikhil@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
This patch fixes the `ConsumerGroupCommand` to correctly handle missing offsets, which are returned as `null` by the admin API.
Reviewers: David Jacot <djacot@confluent.io>
This patch adds an implementation of the `resign()` API which allows the controller to proactively resign leadership in case it encounters an unrecoverable situation. There was not a lot to do here because we already supported a `Resigned` state to facilitate graceful shutdown.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, David Arthur <mumrah@gmail.com>
Session windows should not be close directly when "window end" time is reached, but "window close" time should be "window-end + gap + grace-period".
Reviewer: Matthias J. Sax <matthias@confluent.io>
Removed the condition to throw the error. Now we return UNKNOWN_TOPIC_ID which allows clients to retry instead of failing. Updated the test for IBP < 2.8 that tries to delete topics using ID.
Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
When refer to the return "Check whether the last offset of the last batch in this segment overflows the indexes", if the result is not expected, the path of the segment should be printed so that users can find problems.
Reviewers: Luke Chen, Guozhang Wang <wangguoz@gmail.com>