Only initialize remote topic metrics when system-wise remote storage is enabled to avoid impacting performance for existing brokers. Also add tests.
Reviewers: Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
* KAFKA-15107: Support custom metadata for remote log segment
This commit does the changes discussed in the KIP-917. Mainly, changes the `RemoteStorageManager` interface in order to return `CustomMetadata` and then ensures these custom metadata are stored, propagated, (de-)serialized correctly along with the standard metadata throughout the whole lifecycle. It introduces the `remote.log.metadata.custom.metadata.max.size` to limit the custom metadata size acceptable by the broker and stop uploading in case a piece of metadata exceeds this limit.
On testing:
1. `RemoteLogManagerTest` checks the case when a piece of custom metadata is larger than the configured limit.
2. `RemoteLogSegmentMetadataTest` checks if `createWithUpdates` works correctly, including custom metadata.
3. `RemoteLogSegmentMetadataTransformTest`, `RemoteLogSegmentMetadataSnapshotTransformTest`, and `RemoteLogSegmentMetadataUpdateTransformTest` were added to test the corresponding class (de-)serialization, including custom metadata.
4. `FileBasedRemoteLogMetadataCacheTest` checks if custom metadata are being correctly saved and loaded to a file (indirectly, via `equals`).
5. `RemoteLogManagerConfigTest` checks if the configuration setting is handled correctly.
Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>, Divij Vaidya <diviv@amazon.com>
Implement some of the metrics from KIP-938: Add more metrics for
measuring KRaft performance.
Add these metrics to QuorumControllerMetrics:
kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount
kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount
kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount
kafka.controller:type=KafkaController,name=NewActiveControllersCount
Create LoaderMetrics with these new metrics:
kafka.server:type=MetadataLoader,name=CurrentMetadataVersion
kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount
Create SnapshotEmitterMetrics with these new metrics:
kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes
kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs
Reviewers: Ron Dagostino <rndgstn@gmail.com>
This patch does a few things:
1) It introduces the `OffsetAndMetadata` class which hold the committed offsets in the group coordinator.
2) It adds methods to deal with OffsetCommit records to `RecordHelpers`.
3) It adds `MetadataVersion#offsetCommitValueVersion` to get the version of the OffsetCommit value record that should be used.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Arthur <mumrah@gmail.com>, Justine Olshan <jolshan@confluent.io>
This PR adds CommandDefaultOptions usage like in the other joptsimple based tools. It also moves the associated unit test class from streams to tools module as discussed in #13127 (comment)
Reviewers: Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>, Sagar Rao <sagarmeansocean@gmail.com>
This patch implements the existing JoinGroup protocol within the new group coordinator.
Some notable differences:
* Methods return a CoordinatorResult to the runtime framework, which includes records to append to the log as well as a future to complete after the append succeeds/fails.
* The coordinator runtime ensures that only a single thread will be processing a group at any given time, therefore there is no more locking on groups.
* Instead of using on purgatories, we rely on the Timer interface to schedule/cancel delayed operations.
Reviewers: David Jacot <djacot@confluent.io>
* KAFKA-14953: Adding RemoteLogManager metrics
In this PR, I have added the following metrics that are related to tiered storage mentioned in[ KIP-405](https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage).
|Metric|Description|
|-----------------------------------------|--------------------------------------------------------------|
| RemoteReadRequestsPerSec | Number of remote storage read requests per second |
| RemoteWriteRequestsPerSec | Number of remote storage write requests per second |
| RemoteBytesInPerSec | Number of bytes read from remote storage per second |
| RemoteReadErrorsPerSec | Number of remote storage read errors per second |
| RemoteBytesOutPerSec | Number of bytes copied to remote storage per second |
| RemoteWriteErrorsPerSec | Number of remote storage write errors per second |
| RemoteLogReaderTaskQueueSize | Number of remote storage read tasks pending for execution. |
| RemoteLogReaderAvgIdlePercent | Average idle percent of the remote storage reader thread pool|
| RemoteLogManagerTasksAvgIdlePercent | Average idle percent of RemoteLogManager thread pool |
Added unit tests for all the rate metrics.
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>, Staniel Yao<yaolixinylx@gmail.com>, hudeqi<1217150961@qq.com>, Satish Duggana <satishd@apache.org>
This patch wires the new group coordinator in BrokerServer (KRaft only). With this, it is now possible to run a cluster with the new group coordinator and to use the ConsumerGroupHeartbeat API by specifying the following two properties:
- group.coordinator.new.enable = true (to enable the new group coordinator)
- unstable.api.versions.enable = true (to enable unreleased APIs)
Note that the new group coordinator does not support all the existing APIs yet.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
KAFKA-14522 Rewrite and Move of RemoteIndexCache to storage module.
Cleanedup index file suffix usages and other minor cleanups
Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>
This patch rewrites MockTimer in Java and moves it from core to server-common. This continues the work started in https://github.com/apache/kafka/pull/13820.
Reviewers: Divij Vaidya <diviv@amazon.com>
This patch adds (1) the logic to propagate a new MetadataImage to the running coordinators; and (2) the logic to ensure that all the consumer groups subscribed to topics with changes will refresh their subscriptions metadata on the next heartbeat. In the mean time, it ensures that freshly loaded consumer groups also refresh their subscriptions metadata on the next heartbeat.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
Poison the transaction manager if we detect an illegal transition in the Sender thread. A ThreadLocal in is stored in TransactionManager so that the Sender can inform TransactionManager which thread it's using.
Reviewers: Daniel Urban <durban@cloudera.com>, Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
This patch introduces the GroupCoordinatorService. This is the new (incomplete) implementation of the group coordinator based on the coordinator runtime introduced in https://github.com/apache/kafka/pull/13795.
Reviewers: Divij Vaidya <diviv@amazon.com>, Justine Olshan <jolshan@confluent.io>
The Java rewrite is kept relatively close to the Scala original
to minimize potential newly introduced bugs and to make reviewing
simpler. The following details might be of note:
- The `Logging` trait moved to InterBrokerSendThread with the
rewrite of ShutdownableThread has been similarly moved to any
subclasses that currently use it. InterBrokerSendThread's own
logging has been made to use ShutdownableThread's logger which
mimics the prefix/log identifier that the trait provided.
- The case RequestAndCompletionHandler class has been made a
separate POJO class and the internal-use UnsentRequests class
has been kept as a static nested class.
- The relatively commonly used but internal (not part of the
public API) clients classes that InterBrokerSendThread relies on
have been allowlisted in the server-common import control.
- The accompanying test class has also been moved and rewritten
with one new test added and most of the pre-existing tests made
stricter.
Reviewers: David Jacot <djacot@confluent.io>
This PR expands the scope of ApiVersionManager a bit to include returning the current
MetadataVersion and features that are in effect. This is useful in general because that information
needs to be returned in an ApiVersionsResponse. It also allows us to fix the ApiVersionManager
interface so that all subclasses implement all methods of the interface. Having subclasses that
don't implement some methods is dangerous because they could cause exceptions at runtime in
unexpected scenarios.
On the KRaft controller, we were previously performing a read operation in the QuorumController
thread to get the current metadata version and features. With this PR, we now read a volatile
variable maintained by a separate MetadataVersionContextPublisher object. This will improve
performance and simplify the code. It should not change the guarantees we are providing; in both
the old and new scenarios, we need to be robust against version skew scenarios during updates.
Add a Features class which just has a 3-tuple of metadata version, features, and feature epoch.
Remove MetadataCache.FinalizedFeaturesAndEpoch, since it just duplicates the Features class.
(There are some additional feature-related classes that can be consolidated in in a follow-on PR.)
Create a java class, EndpointReadyFutures, for managing the futures associated with individual
authorizer endpoints. This avoids code duplication between ControllerServer and BrokerServer and
makes this code unit-testable.
Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>, Luke Chen <showuon@gmail.com>
This patch rewrite `MockTime` in Java and moves it to `server-common` module. This is a prerequisite to move `MockTimer` later on to `server-common` as well.
Reviewers: David Arthur <mumrah@gmail.com>
The OffsetFetcher is internally used by the KafkaConsumer to fetch offsets, validate and reset positions. For the new KafkaConsumer with a refactored threading model, similar functionality will be needed.
This is an initial refactoring for extracting logic from the OffsetFetcher, that will be reused by the new consumer implementation. No changes to the existing logic, just extracting classes, functions or pieces of logic.
All the functionality moved out of the OffsetFetcher is already covered by tests in OffsetFetcherTest and FetcherTest. There were no individual tests for the extracted functions, so no tests were migrated.
Reviewers: Jun Rao <junrao@gmail.com>
Adds CoordinatorEvent, CoordinatorEventProcessor, and MultiThreadedEventProcessor.
Reviewers: Kirk True <ktrue@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch adds the GroupMetadataManager to the group-coordinator module. This manager is responsible for handling the groups management, the members management and the entire reconciliation process. At this point, only the new consumer group type/protocol is implemented.
The new manager is based on an architecture inspired from the quorum controller. A request can access/read the state but can't mutate it directly. Instead, a list of records is generated together with the response and those records are applied to the state by the runtime framework. We use timeline data structures. Note that the runtime framework is not part of this patch. It will come in a following one.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
Previously, if a user tried to perform an overly large batch operation on the KRaft controller
(such as creating a million topics), we would create a very large number of records in memory. Our
attempt to write these records to the Raft layer would fail, because there were too many to fit in
an atomic batch. This failure, in turn, would trigger a controller failover.
(Note: I am assuming here that no topic creation policy was in place that would prevent the
creation of a million topics. I am also assuming that the user operation must be done atomically,
which is true for all current user operations, since we have not implemented KIP-868 yet.)
With this PR, we fail immediately when the number of records we have generated exceeds the
threshold that we can apply. This failure does not generate a controller failover. We also now
fail with a PolicyViolationException rather than an UnknownServerException.
In order to implement this in a simple way, this PR adds the BoundedList class, which wraps any
list and adds a maximum length. Attempts to grow the list beyond this length cause an exception to
be thrown.
Reviewers: David Arthur <mumrah@gmail.com>, Ismael Juma <ijuma@apache.org>, Divij Vaidya <diviv@amazon.com>
Metadata image classes such as MetadataImage, ClusterImage, FeaturesImage, and so forth contain
numerous sub-images. This PR adds a structured way of traversing those sub-images. This is useful
for the metadata shell, and also for implementing toString functions.
In both cases, the previous solution was suboptimal. The metadata shell was previously implemented
in an ad-hoc way by mutating text-based tree nodes when records were replayed. This was difficult
to keep in sync with changes to the record types (for example, we forgot to do this for SCRAM). It
was also pretty low-level, being done at a level below that of the image classes. For toString, it
was difficult to keep the implementations consistent previously, and also support both redacted and
non-redacted output.
The metadata shell directory was getting crowded since we never had submodules for it. This PR
creates glob/, command/, node/, and state/ directories to keep things better organized.
Reviewers: David Arthur <mumrah@gmail.com>, Ron Dagostino <rdagostino@confluent.io>
This change includes
- Recognize the fetch requests with out of range local log offsets
- Add fetch implementation for the data lying in the range of [logStartOffset, localLogStartOffset]
- Add a new purgatory for async remote read requests which are served through a specific thread pool
We have an extended version of remote fetch that can fetch from multiple remote partitions in parallel, which we will raise as a followup PR.
A few tests for the newly introduced changes are added in this PR. There are some tests available for these scenarios in 2.8.x, refactoring with the trunk changes, will add them in followup PRs.
Other contributors:
Kamal Chandraprakash <kchandraprakash@uber.com> - Further improvements and adding a few tests
Luke Chen <showuon@gmail.com> - Added a few test cases for these changes.
PS: This functionality is pulled out from internal branches with other functionalities related to the feature in 2.8.x. The reason for not pulling all the changes as it makes the PR huge, and burdensome to review and it also needs other metrics, minor enhancements(including perf), and minor changes done for tests. So, we will try to have followup PRs to cover all those.
Reviewers: Jun Rao <junrao@gmail.com>, Alexandre Dupriez <alexandre.dupriez@gmail.com>, Divij Vaidya <diviv@amazon.com>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>
This patch introduces `GenericGroup` which rewrite the `GroupMetadata` in Java. The `GenericGroup` is basically a group using the current rebalance protocol in the new group coordinator.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Christo Lolov <lolovc@amazon.com>, David Jacot <djacot@confluent.io>
This patch adds support for handling metadata snapshots while in dual-write mode. Prior to this change, if the active
controller loaded a snapshot, it would get out of sync with the ZK state.
In order to reconcile the snapshot state with ZK, several methods were added to scan through the metadata in ZK to
compute differences with the MetadataImage. Since this introduced a lot of code, I opted to split out a lot of methods
from ZkMigrationClient into their own client interfaces, such as TopicMigrationClient, ConfigMigrationClient, and
AclMigrationClient. Each of these has some iterator method that lets the caller examine the ZK state in a single pass
and without using too much memory.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Luke Chen <showuon@gmail.com>
Handle migrating SCRAM records in ZK when migrating from ZK to KRaft.
This includes handling writing back SCRAM records to ZK while in dual write mode where metadata updates are written to both the KRaft metadata log and to ZK. This allows for rollback of migration to include SCRAM metadata changes.
Reviewers: David Arthur <mumrah@gmail.com>
1. add ZkMigrationReady in apiVersionsResponse
2. check all nodes if ZkMigrationReady are ready before moving to next migration state
Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
This patch adds the concept of pre-migration mode to the KRaft controller. While in this mode,
the controller will only allow certain write operations. The purpose of this is to disallow metadata
changes when the controller is waiting for the ZK migration records to be committed.
The following ControllerWriteEvent operations are permitted in pre-migration mode
* completeActivation
* maybeFenceReplicas
* writeNoOpRecord
* processBrokerHeartbeat
* registerBroker (only for migrating ZK brokers)
* unregisterBroker
Raft events and other controller events do not follow the same code path as ControllerWriteEvent,
so they are not affected by this new behavior.
This patch also add a new metric as defined in KIP-868: kafka.controller:type=KafkaController,name=ZkMigrationState
In order to support upgrades from 3.4.0, this patch also redefines the enum value of value 1 to mean
MIGRATION rather than PRE_MIGRATION.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
This patch adds ConsumerGroupMember.
Reviewers: Christo Lolov <lolovc@amazon.com>, Jeff Kim <jeff.kim@confluent.io>, Jason Gustafson <jason@confluent.io>
This patch renames from `ControllerPurgatory` to `DeferredEventQueue` and moves it from the `metadata` module to `server-common` module.
Reviewers: Alexandre Dupriez <alexandre.dupriez@gmail.com>, Ziming Deng <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@apache.org>
Rework UserScramCredentialRecord to store serverKey and StoredKey rather than saltedPassword. This
is necessary to support migration from ZK, since those are the fields we stored in ZK. Update
latest MetadataVersion to IBP_3_5_IV2 and make SCRAM support conditional on this version. Moved
ScramCredentialData.java from org.apache.kafka.image to org.apache.kafka.metadata, which seems more
appropriate.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
topic counts.
Introduces the use of persistent data structures in the KRaft metadata image to avoid copying the entire TopicsImage upon every change. Performance that was O(<number of topics in the cluster>) is now O(<number of topics changing>), which has dramatic time and GC improvements for the most common topic-related metadata events. We abstract away the chosen underlying persistent collection library via ImmutableMap<> and ImmutableSet<> interfaces and static factory methods.
Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>, Purshotam Chauhan <pchauhan@confluent.io>
This PR updates foreign-key table-table join processors to ignore out-of-order records from versioned tables, as specified in KIP-914.
Reviewers: Matthias J. Sax <matthias@confluent.io>
This PR updates primary-key table-table join processors to ignore out-of-order records from versioned tables, as specified in KIP-914.
Reviewers: Matthias J. Sax <matthias@confluent.io>