Commit Graph

168 Commits

Author SHA1 Message Date
Apoorv Mittal f2dbc55d24
KAFKA-17047: Refactored group coordinator classes to modern package (KIP-932) (#16474)
Following the discussion and suggestion by @dajac, https://github.com/apache/kafka/pull/16054#discussion_r1613638293, the PR refactors the common classes to build TargetAssignment in `modern` package. `consumer` package has been moved inside `modern` package with classes exclusive to `consumer group`.

This PR completes the refactoring and base to introduce `share` package inside `modern`. The subsequent PRs will define the implementation specific to Share Groups while re-using the common functionality from `modern` package classes. 

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
2024-07-03 00:16:40 -07:00
Apoorv Mittal 60114a46a7
KAFKA-16822: Abstract consumer group to share functionality with share group (KIP-932) (#16054)
Abstracted code for 2 classes `ConsumerGroup` and `ConsumerGroupMember` to `ModernGroup` and `ModernGroupMember` respectively. The new abstract classes are created to share common functionality with `ShareGroup` and `ShareGroupMember` which are being introduced with KIP-932.

The patch is majorly code refactoring from existing classes to abstract classes. Also created a new package called `modern` where `MemberState` class is moved, in upcoming patches, I will move common classes for `Share` and `Consumer` Group in `modern` package itself. 

Reviewers: Lianet Magrans <lianetmr@gmail.com>, Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
2024-06-27 05:42:58 -07:00
Kuan-Po (Cooper) Tseng 888a177603
KAFKA-12708 Rewrite org.apache.kafka.test.Microbenchmarks by JMH (#16231)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-14 16:47:34 +08:00
gongxuanzhang 596b945072
KAFKA-16643 Add ModifierOrder checkstyle rule (#15890)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-13 15:39:32 +08:00
gongxuanzhang 46eb0814f6
KAFKA-10787 Apply spotless to log4j-appender, trogdor, jmh-benchmarks, examples, shell and generator (#16296)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-12 22:23:39 +08:00
David Jacot 049cfeac02
MINOR: Rename uniform assignor's internal builders (#16233)
This patch renames the uniform assignor's builders to match the `SubscriptionType` which is used to determine which one is called. It removes the abstract class `AbstractUniformAssignmentBuilder` which is not necessary anymore. It also applies minor refactoring.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-06-10 05:26:56 -07:00
David Jacot 7d832cf74f
KAFKA-14701; Move `PartitionAssignor` to new `group-coordinator-api` module (#16198)
This patch moves the `PartitionAssignor` interface and all the related classes to a newly created `group-coordinator/api` module, following the pattern used by the storage and tools modules.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-06-06 12:19:20 -07:00
Ritika Reddy 078dd9a311
KAFKA-16821; Member Subscription Spec Interface (#16068)
This patch reworks the `PartitionAssignor` interface to use interfaces instead of POJOs. It mainly introduces the `MemberSubscriptionSpec` interface that represents a member subscription and changes the `GroupSpec` interfaces to expose the subscriptions and the assignments via different methods.

The patch does not change the performance.

before:
```
Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5  3.462 ± 0.687  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  3.626 ± 0.412  ms/op
JMH benchmarks done
```

after:
```
Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5  3.677 ± 0.683  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  3.991 ± 0.065  ms/op
JMH benchmarks done
```

Reviewers: David Jacot <djacot@confluent.io>
2024-06-04 06:44:37 -07:00
David Jacot fb566e48bf
KAFKA-16864; Optimize uniform (homogenous) assignor (#16088)
This patch optimizes uniform (homogenous) assignor by avoiding creating a copy of all the assignments. Instead, the assignor creates a copy only if the assignment is updated. It is a sort of copy-on-write. This change reduces the overhead of the TargetAssignmentBuilder when ran with the uniform (homogenous) assignor.

Trunk:

```
Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt   Score   Error  Units
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5  24.535 ± 1.583  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  24.094 ± 0.223  ms/op
JMH benchmarks done
```

```
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt   Score   Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS           100  avgt    5  14.697 ± 0.133  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS          1000  avgt    5  15.073 ± 0.135  ms/op
JMH benchmarks done
```

Patch:

```
Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5  3.376 ± 0.577  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  3.731 ± 0.359  ms/op
JMH benchmarks done
```

```
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt  Score   Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS           100  avgt    5  1.975 ± 0.086  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS          1000  avgt    5  2.026 ± 0.190  ms/op
JMH benchmarks done
```

Reviewers: Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2024-05-31 13:17:59 -07:00
Mickael Maison 8068a086a3
MINOR: Remove KafkaConfig dependency in KafkaRequestHandler (#16108)
Reviewers: Luke Chen <showuon@gmail.com>, Apoorv Mittal <amittal@confluent.io>
2024-05-30 11:51:24 +02:00
Calvin Liu c8af740bd4
Improve producer ID expiration performance (#16075)
Skip using stream when expiring the producer ID. This can improve the performance significantly when the count is high.
Before

Benchmark                                        (numProducerIds)  Mode  Cnt      Score       Error  Units
ProducerStateManagerBench.testDeleteExpiringIds             10000  avgt    3    101.253 ±    28.031  us/op
ProducerStateManagerBench.testDeleteExpiringIds            100000  avgt    3   2297.219 ±  1690.486  us/op
ProducerStateManagerBench.testDeleteExpiringIds           1000000  avgt    3  30688.865 ± 16348.768  us/op
After

Benchmark                                        (numProducerIds)  Mode  Cnt     Score     Error  Units
ProducerStateManagerBench.testDeleteExpiringIds             10000  avgt    3    39.122 ±   1.151  us/op
ProducerStateManagerBench.testDeleteExpiringIds            100000  avgt    3   464.363 ±  98.857  us/op
ProducerStateManagerBench.testDeleteExpiringIds           1000000  avgt    3  5731.169 ± 674.380  us/op
Also, made a change to the JMH testing which excludes the producer ID populating from the testing.

Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan <jolshan@confluent.io>
2024-05-29 16:49:55 -07:00
Justine Olshan 5e3df22095
KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool (#15685)
As part of KIP-1022, I have created an interface for all the new features to be used when parsing the command line arguments, doing validations, getting default versions, etc.

I've also added the --feature flag to the storage tool to show how it will be used.

Created a TestFeatureVersion to show an implementation of the interface (besides MetadataVersion which is unique) and added tests using this new test feature.

I will add the unstable config and tests in a followup.

Reviewers: David Mao <dmao@confluent.io>, David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jun Rao <junrao@apache.org>
2024-05-29 16:36:06 -07:00
Omnia Ibrahim 64f699aeea
KAFKA-15853: Move general configs out of KafkaConfig (#16040)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-05-28 16:22:54 +02:00
Ritika Reddy a8d166c00e
KAFKA-16625; Reverse lookup map from topic partitions to members (#15974)
This patch speeds up the computation of the unassigned partitions by exposing the inverted target assignment. It allows the assignor to check whether a partition is assigned or not.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
2024-05-25 09:06:15 -07:00
Colin P. McCabe 4f55786a8a KAFKA-16515: Fix the ZK Metadata cache confusion between brokers and controllers
ZkMetadataCache could theoretically return KRaft controller information from a call to
ZkMetadataCache.getAliveBrokerNode, which doesn't make sense. KRaft controllers are not part of the
set of brokers. The only use-case for this functionality was in MetadataCacheControllerNodeProvider
during ZK migration, where it allowed ZK brokers in migration mode to forward requests to
kcontrollers when appropriate. This PR changes MetadataCacheControllerNodeProvider to simply
delegate to quorumControllerNodeProvider in this case.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2024-05-24 10:16:59 -07:00
Jeff Kim 520aa8665c
KAFKA-16626; Lazily convert subscribed topic names to topic ids (#15970)
This patch aims to remove the data structure that stores the conversion from topic names to topic ids which was taking time similar to the actual assignment computation. Instead, we reuse the already existing ConsumerGroupMember.subscribedTopicNames() and do the conversion to topic ids when the iterator is requested.

Reviewers: David Jacot <djacot@confluent.io>
2024-05-24 00:51:50 -07:00
Greg Harris 11ad5e8bca
MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions (#15469)
Signed-off-by: Greg Harris <greg.harris@aiven.io>
Reviewers: Mickael Maison <mickael.maison@gmail.com>
2024-05-23 13:23:18 -07:00
Mickael Maison affe8da54c
KAFKA-7632: Support Compression Levels (KIP-390) (#15516)
Reviewers: Jun Rao <jun@confluent.io>,  Luke Chen <showuon@gmail.com>
Co-authored-by: Lee Dongjin <dongjin@apache.org>
2024-05-21 17:58:49 +02:00
David Jacot 1e427c029e
MINOR: Fix TargetAssignmentBuilderBenchmark (#15950)
Reviewers:  Jeff Kim <kimkb2011@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-05-15 12:53:17 +08:00
Ritika Reddy ee16eee5de
KAFKA-16587: Add subscription model information to group state (#15785)
This patch introduces the SubscriptionType to the group state and passes it along to the partition assignor. A group is "homogeneous" when all the members are subscribed to the same topics; or it is "heterogeneous" otherwise. This mainly helps the uniform assignor because it does not have to re-compute this information to determine which algorithm to use.

trunk:
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionModel)  (topicCount)  Mode  Cnt    Score    Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10          HOMOGENEOUS           100  avgt    5    0.136 ±  0.001  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10          HOMOGENEOUS          1000  avgt    5    0.198 ±  0.002  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10          HOMOGENEOUS           100  avgt    5    1.767 ±  0.138  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10          HOMOGENEOUS          1000  avgt    5    1.540 ±  0.020  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10          HOMOGENEOUS           100  avgt    5   32.419 ±  7.173  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10          HOMOGENEOUS          1000  avgt    5   26.731 ±  1.985  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false            100                         10          HOMOGENEOUS           100  avgt    5    0.242 ±  0.006  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false            100                         10          HOMOGENEOUS          1000  avgt    5    1.002 ±  0.006  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false           1000                         10          HOMOGENEOUS           100  avgt    5    2.544 ±  0.168  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false           1000                         10          HOMOGENEOUS          1000  avgt    5   10.749 ±  0.207  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10          HOMOGENEOUS           100  avgt    5   26.832 ±  0.154  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10          HOMOGENEOUS          1000  avgt    5  106.209 ±  0.301  ms/op
JMH benchmarks done

patch:
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt   Score   Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10         HOMOGENEOUS           100  avgt    5   0.131 ± 0.001  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10         HOMOGENEOUS          1000  avgt    5   0.185 ± 0.004  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10         HOMOGENEOUS           100  avgt    5   1.943 ± 0.091  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10         HOMOGENEOUS          1000  avgt    5   1.450 ± 0.139  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS           100  avgt    5  30.803 ± 2.644  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS          1000  avgt    5  24.251 ± 1.230  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false            100                         10         HOMOGENEOUS           100  avgt    5   0.155 ± 0.004  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false            100                         10         HOMOGENEOUS          1000  avgt    5   0.235 ± 0.010  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false           1000                         10         HOMOGENEOUS           100  avgt    5   1.602 ± 0.046  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false           1000                         10         HOMOGENEOUS          1000  avgt    5   1.901 ± 0.174  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS           100  avgt    5  16.098 ± 1.905  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS          1000  avgt    5  17.681 ± 0.174  ms/op
JMH benchmarks done

Reviewers: David Jacot <djacot@confluent.io>
2024-05-13 02:19:05 -07:00
David Arthur fe8ccbc92c
KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration (#15744)
This patch fixes two issues with IncrementalAlterConfigs and the ZK migration. First, it changes the handling of IncrementalAlterConfigs to check if the controller is ZK vs KRaft and only forward for KRaft. Second, it adds a check in KafkaZkClient#setOrCreateEntityConfigs to ensure a ZK broker is not directly modifying configs in ZK if there is a KRaft controller. This closes the race condition between KRaft taking over as the active controller and the ZK brokers learning about this.

*Forwarding*

During the ZK migration, there is a time when the ZK brokers are running with migrations enabled, but KRaft has yet to take over as the controller. Prior to KRaft taking over as the controller, the ZK brokers in migration mode were unconditionally forwarding IncrementalAlterConfigs (IAC) to the ZK controller. This works for some config types, but breaks when setting BROKER and BROKER_LOGGER configs for a specific broker. The behavior in KafkaApis for IAC was to always forward if the forwarding manager was defined. Since ZK brokers in migration mode have forwarding enabled, the forwarding would happen, and the special logic for BROKER and BROKER_LOGGER would be missed, causing the request to fail.

With this fix, the IAC handler will check if the controller is KRaft or ZK and only forward for KRaft.

*Protected ZK Writes*

As part of KIP-500, we moved most (but not all) ZK mutations to the ZK controller. One of the things we did not move fully to the controller was entity configs. This is because there was some special logic that needed to run on the broker for certain config updates. If a broker-specific config was set, AdminClient would route the request to the proper broker. In KRaft, we have a different mechanism for handling broker-specific config updates.

Leaving this ZK update on the broker side would be okay if we were guarding writes on the controller epoch, but it turns out KafkaZkClient#setOrCreateEntityConfigs does unprotected "last writer wins" updates to ZK. This means a ZK broker could update the contents of ZK after the metadata had been migrated to KRaft. No good! To fix this, this patch adds a check on the controller epoch to KafkaZkClient#setOrCreateEntityConfigs but also adds logic to fail the update if the controller is a KRaft controller.

The new logic in setOrCreateEntityConfigs adds STALE_CONTROLLER_EPOCH as a new exception that can be thrown while updating configs.

Reviewers:  Luke Chen <showuon@gmail.com>, Akhilesh Chaganti <akhileshchg@users.noreply.github.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-05-07 08:29:57 +08:00
Omnia Ibrahim d88c15fc3e
KAFKA-15853 Move KRAFT configs out of KafkaConfig (#15775)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-04-27 07:02:31 +08:00
Ritika Reddy 8013657f5d
KAFKA-16568: JMH Benchmarks for Server Side Rebalances (#15717)
This patch add three benchmarks for the client assignors, the server assignors and the target assignment builder.

Reviewers: David Jacot <djacot@confluent.io>
2024-04-25 07:46:45 -07:00
Kuan-Po (Cooper) Tseng ced79ee12f
KAFKA-16552 Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests (#15719)
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-04-20 20:34:02 +08:00
Omnia Ibrahim e798bed198
KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically (#15335)
This pr fixes the bug created by #15263 which caused topic partition to be recreated whenever the original log dir is offline: Log directory failure re-creates partitions in another logdir automatically

Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Igor Soarez <soarez@apple.com>, Gaurav Narula <gaurav_narula2@apple.com>, Proven Provenzano <pprovenzano@confluent.io>
2024-04-06 14:36:26 +08:00
Nikolay d8673b26bf
KAFKA-15899 [1/2] Move kafka.security package from core to server module (#15572)
1) This PR moves kafka.security classes from core to server module.
2) AclAuthorizer not moved, because it has heavy dependencies on core classes that not rewrited from scala at the moment.
3) AclAuthorizer will be deleted as part of ZK removal

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-03-30 11:54:22 +08:00
Nikolay 355873aa54
MINOR: Use CONFIG suffix in ZkConfigs (#15614)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Omnia Ibrahim <o.g.h.ibrahim@gmail.com>
Co-authored-by: n.izhikov <n.izhikov@vk.team>
2024-03-28 15:52:34 +01:00
Nikolay 6f38fe5e0a
KAFKA-14588 ZK configuration moved to ZkConfig (#15075)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-03-27 22:37:01 +08:00
Jorge Esteban Quilcate Otoya b25c96a915
KAFKA-16229: Fix slow expired producer id deletion (#15324)
Expiration of ProducerIds is implemented with a slow removal of map keys:
        producers.keySet().removeAll(keys);
Unnecessarily going through all producer ids and then throw all expired keys to be removed.
This leads to exponential time on worst case when most/all keys need to be removed:

Benchmark                                        (numProducerIds)  Mode  Cnt           Score            Error  Units
ProducerStateManagerBench.testDeleteExpiringIds               100  avgt    3        9164.043 ±      10647.877  ns/op
ProducerStateManagerBench.testDeleteExpiringIds              1000  avgt    3      341561.093 ±      20283.211  ns/op
ProducerStateManagerBench.testDeleteExpiringIds             10000  avgt    3    44957983.550 ±    9389011.290  ns/op
ProducerStateManagerBench.testDeleteExpiringIds            100000  avgt    3  5683374164.167 ± 1446242131.466  ns/op
A simple fix is to use map#remove(key) instead, leading to a more linear growth:

Benchmark                                        (numProducerIds)  Mode  Cnt        Score         Error  Units
ProducerStateManagerBench.testDeleteExpiringIds               100  avgt    3     5779.056 ±     651.389  ns/op
ProducerStateManagerBench.testDeleteExpiringIds              1000  avgt    3    61430.530 ±   21875.644  ns/op
ProducerStateManagerBench.testDeleteExpiringIds             10000  avgt    3   643887.031 ±  600475.302  ns/op
ProducerStateManagerBench.testDeleteExpiringIds            100000  avgt    3  7741689.539 ± 3218317.079  ns/op
Flamegraph of the CPU usage at dealing with expiration when producers ids ~1Million:

Reviewers: Justine Olshan <jolshan@confluent.io>
2024-02-09 17:17:17 -08:00
David Arthur 7bf7fd99a5
KAFKA-16078: Be more consistent about getting the latest MetadataVersion
This PR creates MetadataVersion.latestTesting to represent the highest metadata version (which may be unstable) and MetadataVersion.latestProduction to represent the latest version that should be used in production. It fixes a few cases where the broker was advertising that it supported the testing versions even when unstable metadata versions had not been configured.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
2024-01-17 14:59:22 -08:00
Divij Vaidya 65424ab484
MINOR: New year code cleanup - include final keyword (#15072)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Sagar Rao <sagarmeansocean@gmail.com>
2024-01-11 17:53:35 +01:00
Fiore Mario Vitale 314de9f23c
KAFKA-15996: Improve JsonConverter performance (#14992)
Improve JsonConverter performance by using afterBurnModule of Jackson library.

Reviewers: Divij Vaidya <diviv@amazon.com>, Mickael Maison <mickael.maison@gmail.com>
2023-12-24 21:47:12 +01:00
Omnia Ibrahim 07490b929b
KAFKA-15365: Broker-side replica management changes (#14881)
Reviewers: Igor Soarez <soarez@apple.com>, Ron Dagostino <rndgstn@gmail.com>, Proven Provenzano <pprovenzano@confluent.io>
2023-12-11 09:34:22 -05:00
David Arthur a8622faf47
KAFKA-15799 Handle full metadata updates on ZK brokers (#14719)
This patch adds the concept of a "Full" UpdateMetadataRequest, similar to what is used in
LeaderAndIsr. A new tagged field is added to UpdateMetadataRequest at version 8 which allows the
KRaft controller to indicate if a UMR contains all the metadata or not. Since UMR is implicitly
treated as incremental by the ZK broker, we needed a way to detect topic deletions when the KRaft
broker sends a metadata snapshot to the ZK broker. By sending a "Full" flag, the broker can now
compare existing topic IDs to incoming topic IDs and calculate which topics should be removed from
the MetadataCache.

This patch only removes deleted topics from the MetadataCache. Partition/log management was
implemented in KAFKA-15605.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-11-16 14:38:44 -08:00
hudeqi 9911fab1a1
KAFKA-15432: RLM Stop partitions should not be invoked for non-tiered storage topics (#14667)
Reviewers: Christo Lolov <lolovc@amazon.com>, Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
2023-11-02 10:00:15 +01:00
Calvin Liu 14029e2ddd
KAFKA-15582: Identify clean shutdown broker (#14465)
The PR includes:

* Added a new class of CleanShutdownFile which helps write and read from a clean shutdown file.
* Updated the BrokerRegistration API.
* Client side handling for the broker epoch.
* Minimum work on the controller side.

Reviewers: Jun Rao <junrao@gmail.com>
2023-10-19 10:25:23 -07:00
Matthias J. Sax 9b468fb278
MINOR: Do not end Javadoc comments with `**/` (#14540)
Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bill@confluent.io>, Hao Li <hli@confluent.io>, Josep Prat <josep.prat@aiven.io>
2023-10-17 21:11:04 -07:00
Mehari Beyene 25b128de81
KAFKA-14991: KIP-937-Improve message timestamp validation (#14135)
This implementation introduces two new configurations `log.message.timestamp.before.max.ms` and `log.message.timestamp.after.max.ms` and deprecates `log.message.timestamp.difference.max.ms`.

The default value for all these three configs is maintained to be Long.MAX_VALUE for backward compatibility but with the newly added configurations we can have a finer control when validating message timestamps that are in the past and the future compared to the broker's timestamp.

To maintain backward compatibility if the default value of `log.message.timestamp.before.max.ms` is not changed, we are assuming users are still using the deprecated config `log.message.timestamp.difference.max.ms` and validation is done using its value. This ensures that existing customers who have customized the value of `log.message.timestamp.difference.max.ms` will continue to see no change in behavior.

Reviewers: Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>
2023-08-24 12:04:55 +02:00
Luke Chen 748175ce62
KAFKA-15189: only init remote topic metrics when enabled (#14133)
Only initialize remote topic metrics when system-wise remote storage is enabled to avoid impacting performance for existing brokers. Also add tests.

Reviewers: Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
2023-08-05 13:00:16 +08:00
Luke Chen 27ea025e33
KAFKA-15176: add tests for tiered storage metrics (#13999)
Added tests for metrics:
1. RemoteLogReaderTaskQueueSize
2. RemoteLogReaderAvgIdlePercent
3. RemoteLogManagerTasksAvgIdlePercent

Also, added tests for OffsetOutOfRangeException will be thrown while reading logs

Reviewers: Christo Lolov <christololov@gmail.com>, Satish Duggana <satishd@apache.org>
2023-07-21 10:30:33 +08:00
Justine Olshan ea0bb00126
KAFKA-14884: Include check transaction is still ongoing right before append (take 2) (#13787)
Introduced extra mapping to track verification state.

When verifying, there is a race condition that the add partitions verification response returns that the partition is in the ongoing transaction, but an abort marker is written before we get to append. Therefore, we track any given transaction we are verifying with an object unique to that transaction.

We check this unique state upon the first append to the log. After that, we can rely on currentTransactionFirstOffset. We remove the verification state on appending to the log with a transactional data record or marker.

We will also clean up lingering verification state entries via the producer state entry expiration mechanism. We do not update the the timestamp on retrying a verification for a transaction, so each entry must be verified before producer.id.expiration.ms.

There were a few other fixes:
- Moved the transaction manager handling for failed batch into the future completed exceptionally block to avoid processing it twice (this caused issues in unit tests)
- handle interrupted exceptions encountered when callback thread encountered them
- change handling to throw error if we try to set verification state and leaderLogIfLocal is None.

Reviewers: David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>
2023-07-14 15:18:11 -07:00
Colin P. McCabe cd3c0ab1a3 KAFKA-15060: fix the ApiVersionManager interface
This PR expands the scope of ApiVersionManager a bit to include returning the current
MetadataVersion and features that are in effect. This is useful in general because that information
needs to be returned in an ApiVersionsResponse. It also allows us to fix the ApiVersionManager
interface so that all subclasses implement all methods of the interface. Having subclasses that
don't implement some methods is dangerous because they could cause exceptions at runtime in
unexpected scenarios.

On the KRaft controller, we were previously performing a read operation in the QuorumController
thread to get the current metadata version and features. With this PR, we now read a volatile
variable maintained by a separate MetadataVersionContextPublisher object. This will improve
performance and simplify the code. It should not change the guarantees we are providing; in both
the old and new scenarios, we need to be robust against version skew scenarios during updates.

Add a Features class which just has a 3-tuple of metadata version, features, and feature epoch.
Remove MetadataCache.FinalizedFeaturesAndEpoch, since it just duplicates the Features class.
(There are some additional feature-related classes that can be consolidated in in a follow-on PR.)

Create a java class, EndpointReadyFutures, for managing the futures associated with individual
authorizer endpoints. This avoids code duplication between ControllerServer and BrokerServer and
makes this code unit-testable.

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>, Luke Chen <showuon@gmail.com>
2023-06-19 16:46:44 -07:00
David Jacot 7eea2a3908
MINOR: Move MockTime to server-common (#13823)
This patch rewrite `MockTime` in Java and moves it to `server-common` module. This is a prerequisite to move `MockTimer` later on to `server-common` as well. 

Reviewers: David Arthur <mumrah@gmail.com>
2023-06-09 08:54:25 +02:00
Divij Vaidya fe6a827e20
KAFKA-14633: Reduce data copy & buffer allocation during decompression (#13135)
After this change,

    For broker side decompression: JMH benchmark RecordBatchIterationBenchmark demonstrates 20-70% improvement in throughput (see results for RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize).
    For consumer side decompression: JMH benchmark RecordBatchIterationBenchmark a mix bag of single digit regression for some compression type to 10-50% improvement for Zstd (see results for RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize).

Reviewers: Luke Chen <showuon@gmail.com>, Manyanda Chitimbo <manyanda.chitimbo@gmail.com>, Ismael Juma <mail@ismaeljuma.com>
2023-06-05 15:04:49 +08:00
Yash Mayya 9bb2f78d53
KAFKA-15034: Improve performance of the ReplaceField SMT; add JMH benchmark (#13776)
Reviewers: Chris Egerton <chrise@aiven.io>
2023-06-01 15:14:31 -04:00
Justine Olshan 9edf2ec5cc
MINOR: Add transaction verification config to producerStateManager config (#13770)
I have moved this config into producer state manager so it can be checked easily under the log lock when we are about to append.

Only a few test files currently use the validation and those have been verified to work via running the tests.

Reviews:  David Jacot <djacot@confluent.io>
2023-05-30 13:46:17 -07:00
Divij Vaidya 6bcc497c36
KAFKA-14766: Improve performance of VarInt encoding and decoding (#13312)
Motivation

Reading/writing the protocol buffer varInt32 and varInt64 (also called varLong in our code base) is in the hot path of data plane code in Apache Kafka. We read multiple varInt in a record and in long. Hence, even a minor change in performance could extrapolate to larger performance benefit.

In this PR, we only update varInt32 encoding/decoding.
Changes

This change uses loop unrolling and reduces the amount of repetition of calculations. Based on the empirical results from the benchmark, the code has been modified to pick up the best implementation.
Results

Performance has been evaluated using JMH benchmarks on JDK 17.0.6. Various implementations have been added in the benchmark and benchmarking has been done for different sizes of varints and varlongs. The benchmark for various implementations have been added at ByteUtilsBenchmark.java

Reviewers: Ismael Juma <mlists@juma.me.uk>, Luke Chen <showuon@gmail.com>, Alexandre Dupriez <alexandre.dupriez@gmail.com>
2023-05-05 20:05:20 +08:00
Luke Chen d796480fe8
KAFKA-14909: check zkMigrationReady tag before migration (#13631)
1. add ZkMigrationReady in apiVersionsResponse
2. check all nodes if ZkMigrationReady are ready before moving to next migration state

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
2023-04-28 14:35:12 +08:00
Purshotam Chauhan df13775254
KAFKA-14828: Remove R/W locks using persistent data structures (#13437)
Currently, StandardAuthorizer uses a R/W lock for maintaining the consistency of data. For the clusters with very high traffic, we will typically see an increase in latencies whenever a write operation comes. The intent of this PR is to get rid of the R/W lock with the help of immutable or persistent collections. Basically, new object references are used to hold the intermediate state of the write operation. After the completion of the operation, the main reference to the cache is changed to point to the new object. Also, for the read operation, the code is changed such that all accesses to the cache for a single read operation are done to a particular cache object only.

In the PR description, you can find the performance of various libraries at the time of both read and write. Read performance is checked with the existing AuthorizerBenchmark. For write performance, a new AuthorizerUpdateBenchmark has been added which evaluates the performance of the addAcl operation.


Reviewers:  Ron Dagostino <rndgstn@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>,  Divij Vaidya <diviv@amazon.com>
2023-04-21 14:08:23 +05:30
Dimitar Dimitrov e14dd8024a
KAFKA-14821 Implement the listOffsets API with AdminApiDriver (#13432)
We are handling complex workflows ListOffsets by chaining together MetadataCall instances and ListOffsetsCall instances, there are many complex and error-prone logic. In this PR we rewrote it with the `AdminApiDriver` infra, notable changes better than old logic:
1. Retry lookup stage on receiving `NOT_LEADER_OR_FOLLOWER` and `LEADER_NOT_AVAILABLE`, whereas in the past we failed the partition directly without retry.
2. Removing class field `supportsMaxTimestamp` and calculating it on the fly to avoid the mutable state, this won't change any behavior of  the client.
3. Retry fulfillment stage on `RetriableException`, whereas in the past we just retry fulfillment stage on `InvalidMetadataException`, this means we will retry on `TimeoutException` and other `RetriableException`.

We also `handleUnsupportedVersionException` to `AdminApiHandler` and `AdminApiLookupStrategy`, they are used to keep consistency with old logic, and we can continue improvise them. 

Reviewers: Ziming Deng <dengziming1993@gmail.com>, David Jacot <djacot@confluent.io>
2023-04-20 11:29:27 +08:00