Included in this change:
1. Remove deprecated protocol api versions from json files.
3. Remove fields that are no longer used from json files (affects ListOffsets, OffsetCommit, DescribeConfigs).
4. Remove record down-conversion support from KafkaApis.
5. No longer return `Errors.UNSUPPORTED_COMPRESSION_TYPE` on the fetch path[1].
6. Deprecate `TopicConfig. MESSAGE_DOWNCONVERSION_ENABLE_CONFIG` and made the relevant
configs (`message.downconversion.enable` and `log.message.downcoversion.enable`) no-ops since
down-conversion is no longer supported. It was an oversight not to deprecate this via KIP-724.
7. Fix `shouldRetainsBufferReference` to handle null request schemas for a given version.
8. Simplify producer logic since it only supports the v2 record format now.
9. Fix tests so they don't exercise protocol api versions that have been removed.
10. Add upgrade note.
Testing:
1. System tests have a lot of failures, but those tests fail for trunk too and I didn't see any issues specific to this change - it's hard to be sure given the number of failing tests, but let's not block on that given the other testing that has been done (see below).
3. Java producers and consumers with version 0.9-0.10.1 don't have api versions support and hence they fail in an ungraceful manner: the broker disconnects and the clients reconnect until the relevant timeout is triggered.
4. Same thing seems to happen for the console producer 0.10.2 although it's unclear why since api versions should be supported. I will look into this separately, it's unlikely to be related to this PR.
5. Console consumer 0.10.2 fails with the expected error and a reasonable message[2].
6. Console producer and consumer 0.11.0 works fine, newer versions should naturally also work fine.
7. kcat 1.5.0 (based on librdkafka 1.1.0) produce and consume fail with a reasonable message[3][4].
8. kcat 1.6.0-1.7.0 (based on librdkafka 1.5.0 and 1.7.0 respectively) consume fails with a reasonable message[5].
9. kcat 1.6.0-1.7.0 produce works fine.
10. kcat 1.7.1 (based on librdkafka 1.8.2) works fine for consumer and produce.
11. confluent-go-client (librdkafka based) 1.8.2 works fine for consumer and produce.
12. I will test more clients, but I don't think we need to block the PR on that.
Note that this also completes part of KIP-724: produce v2 and lower as well as fetch v3 and lower are no longer supported.
Future PRs will remove conditional code that is no longer needed (some of that has been done in KafkaApis,
but only what was required due to the schema changes). We can probably do that in master only as it does
not change behavior.
Note that I did not touch `ignorable` fields even though some of them could have been
changed. The reasoning is that this could result in incompatible changes for clients
that use new protocol versions without setting such fields _if_ we don't manually
validate their presence. I will file a JIRA ticket to look into this carefully for each
case (i.e. if we do validate their presence for the appropriate versions, we can
set them to ignorable=false in the json file).
[1] We would return this error if a fetch < v10 was used and the compression topic config was set
to zstd, but we would not do the same for the case where zstd was compressed at the producer
level (the most common case). Since there is no efficient way to do the check for the common
case, I made it consistent for both by having no checks.
[2] ```org.apache.kafka.common.errors.UnsupportedVersionException: The broker is too new to support JOIN_GROUP version 1```
[3]```METADATA|rdkafka#producer-1| [thrd:main]: localhost:9092/bootstrap: Metadata request failed: connected: Local: Required feature not supported by broker (0ms): Permanent```
[4]```METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Metadata request failed: connected: Local: Required feature not supported by broker (0ms): Permanent```
[5] `ERROR: Topic test-topic [0] error: Failed to query logical offset END: Local: Required feature not supported by broker`
Reviewers: David Arthur <mumrah@gmail.com>
This patch is the first one in a series to improve how coordinator records are managed. It focuses on making coordinator records first class citizen in the generator.
* Introduce `coordinator-key` and `coordinator-value` in the schema;
* Introduce `apiKey` for those. This is done to avoid relying on the version to determine the type.
* It also allows the generator to enforce some rules: the key cannot use flexible versions, the key must have a single version `0`, there must be a key and a value for a given api key, etc.
* It generates an enum with all the coordinator record types. This is pretty handy in the code.
The patch also updates the group coordinators to use those.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Andrew Schofield <aschofield@confluent.io>
KIP-1071 specifies records that the group coordinator needs to store
into the consumer offset topic to persist the state of a Streams
group. This records are specified in json files from which the actual
classes for the records are generated.
This commit adds the records needed by the group coordinator to store
the state of a Streams group.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
KIP-848 will be release as GA in Apache Kafka 4.0. Hence we need to mark all the related public apis as stable.
Reviewers: Jeff Kim <jeff.kim@confluent.io>
Kafka consumer supports auto.offset.reset config option, which is used when there is no initial offset in Kafka (or) if the current offset does not exist any more on the server. This config currently supports earliest/latest/none options. Currently consumer resets might force applications to reprocess large amounts of data from earlier offsets. With infinite storage, its beneficial to have a duration based offset reset strategy. This will allow applications to consume/initialise from a fixed duration when there is no initial offset in Kafka.
As part of KIP-932, we are adding support for share consumer groups. Share consumer groups supports dynamic group configuration property share.auto.offset.reset. This is used to set the initial Share-Partition Start Offset (SPSO) based on the share.auto.offset.reset configuration. Currently share.auto.offset.reset supports earliest and latest options to automatically reset the offset
Similar to the Kafka Consumer, we will add support for by_duration: config value for share.auto.offset.reset.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
We have observed the below errors in some cluster:
Uncaught exception in scheduled task 'handleTxnCompletion-902667' exception.message:Trying to complete a transactional offset commit for producerId *** and groupId *** even though the offset commit record itself hasn't been appended to the log.
When a transaction is completed, the transaction coordinator sends a WriteTxnMarkers request to all the partitions involved in the transaction to write the markers to them. When the broker receives it, it writes the markers and if markers are written to the __consumer_offsets partitions, it informs the group coordinator that it can materialize the pending transactional offsets in its main cache. The group coordinator does this asynchronously since Apache Kafka 2.0, see this patch.
The above error appends when the asynchronous operation is executed by the scheduler and the operation finds that there are pending transactional offsets that were not written yet. How come?
There is actually an issue is the steps described above. The group coordinator does not wait until the asynchronous operation completes to return to the api layer. Hence the WriteTxnMarkers response may be send back to the transaction coordinator before the async operation is actually completed. Hence it is possible that the next transactional produce to be started also before the operation is completed too. This could explain why the group coordinator has pending transactional offsets that are not written yet.
There is a similar issue when the transaction is aborted. However on this path, we don't have any checks to verify whether all the pending transactional offsets have been written or not so we don't see any errors in our logs. Due to the same race condition, it is possible to actually remove the wrong pending transactional offsets.
PS: The new group coordinator is not impacted by this bug.
Reviewers: Justine Olshan <jolshan@confluent.io>
This pull request replaces Log4j with Log4j2 across the entire project, including dependencies, configurations, and code. The notable changes are listed below:
1. Introduce Log4j2 Instead of Log4j
2. Change Configuration File Format from Properties to YAML
3. Adds warnings to notify users if they are still using Log4j properties, encouraging them to transition to Log4j2 configurations
Co-authored-by: Lee Dongjin <dongjin@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>, Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The LeaveGroup API is used by the admin client to remove static members or remove all members from the group. The latter does not work because the API does not allow removing a member using the CONSUMER protocol by member id. Moreover, the response should only include the member id if the member id was included in the request. This patch fixes both issues.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Christo Lolov <lolovc@amazon.com>, Jeff Kim <jeff.kim@confluent.io>
Disallow upgrades from classic groups to consumer groups when any member's assignment has non-empty userData.
Reviewers: David Jacot <djacot@confluent.io>
Improve descriptive information in Kafka protocol documentation.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
Removes the client side AddPartitionsToTxn/AddOffsetsToTxn calls so that the partition is implicitly added as part of KIP-890 part 2.
This change also requires updating the valid state transitions. The client side can not know for certain if a partition has been added server side when the request times out (partial completion). Thus for TV2, the transition to PrepareAbort is now valid for Empty, CompleteCommit, and CompleteAbort.
For readability, the V1 and V2 endTransaction methods have been separated.
Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan <jolshan@confluent.io>, Ritika Reddy <rreddy@confluent.io>
This is the last patch in the series which introduces regular expressions in the new consumer group protocol. The patch ensures that the subscription type of the group takes into account the regular expressions. Please refer to the code to see how they are included.
Reviewers: Sean Quah <squah@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
This PR introduces the DescribeGroups v6 API as part of KIP-1043. This adds an error message for the described groups so that it is possible to get some context on the error. It also changes the behaviour for when the group ID cannot be found but returning error code GROUP_ID_NOT_FOUND rather than NONE.
Reviewers: David Jacot <djacot@confluent.io>
With the addition of the SubscribedTopicRegex field to the ConsumerGroupHeartbeat request, we need to update the definition of a full request. This patch does so.
Reviewers: Lianet Magrans <lmagrans@confluent.io>
This patch does a few things:
1) It cleans up resolved regular expressions when they are unsubscribed from. It covers the regular leave/fenced paths for the new protocol and it also covers the LeaveGroup API as new members could be removed via the admin API.
2) It ensures that tombstones for resolved regular expressions are generated on the conversion patch from consumer to classic group.
3) It fixes [KAFKA-18116](https://issues.apache.org/jira/browse/KAFKA-18116) because I faced the same issue while working on the LeaveGroup API. It adds an integration test for this case too.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
1) Bump validVersions of ConsumerGroupDescribeRequest.json and ConsumerGroupDescribeResponse.json to "0-1".
2) Add MemberType field to ConsumerGroupDescribeResponse.json. Default value is -1 (unknown). 0 is for classic member and 1 is for consumer member.
3) When ConsumerGroupMember#useClassicProtocol is true, return MemberType field as 0. Otherwise, return 1.
Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
There are two issues in KAFKA-18060:
1) New coordinator can't handle the TxnOffsetCommitRequest with empty member id, and TxnOffsetCommitRequest v0-v2 do definitely has an empty member ID, causing ConsumerGroup#validateOffsetCommit to throw an UnknownMemberIdException. This prevents the old producer from calling sendOffsetsToTransaction. Note that TxnOffsetCommitRequest versions v0-v2 are included in KIP-896, so it seems the new coordinator should support that operations
2) The deprecated API Producer#sendOffsetsToTransaction does not use v0-v2 to send TxnOffsetCommitRequest with an empty member ID. Unfortunately, it has been released for a while. Therefore, the new coordinator needs to handle TxnOffsetCommitRequest with an empty member ID for all versions.
Taken from the two issues above, we need to handle empty member id in all API versions when new coordinator are dealing with TxnOffsetCommitRequest.
Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This patch relaxes requiring non-empty subscribed names and regex in the full heartbeat request. Without this, a consumer using client side regexes may not be able to join the group when the regex does not match any topics yet and this is inconsistent with the old protocol. Relaxing the subscribed regex is not strictly required but it seems better to keep it consistent.
Reviewers: Lianet Magrans <lmagrans@confluent.io>
This patch introduces the asynchronous resolution of regular expressions. Let me unpack a few details about the implementations:
1) I have decided to finally update all the regular expressions within a consumer group together. My assumption is that the number of regular expressions in a group will be generally small but the number of topics in a cluster is large. Hence grouping has two benefits. Firstly, it allows to go through the list of topics once for all the regular expressions. Secondly, it reduces the number of potential rebalances because all the regular expressions are updated at the same time.
2) An update is triggered when the group is subscribed to at least one regular expressions.
3) An update is triggered when there is no ongoing update.
4) An update is triggered only of the previous one is older than 10s.
5) An update is triggered when the group has unresolved regular expressions.
6) An update is triggered when the metadata image has new topics.
Reviewers: Jeff Kim <jeff.kim@confluent.io>
During conversion from classic to consumer group, if a member has empty assignment (e.g., the member just joined and has never synced), the conversion will fail because of the buffer underflow error when deserializing the member assignment. This patch allows empty assignment while deserializing the member assignment.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
This PR introduces the unified GroupState enum for all group types from KIP-1043. This PR also removes ShareGroupState and begins the work to replace Admin.listShareGroups with Admin.listGroups. That will complete in a future PR.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This patch introduces the `CoordinatorExecutor` construct into the `CoordinatorRuntime`. It allows scheduling asynchronous tasks from within a `CoordinatorShard` while respecting the runtime semantic. It will be used to asynchronously resolve regular expressions.
The `GroupCoordinatorService` uses a default `ExecutorService` with a single thread to back it at the moment. It seems that it should be sufficient. In the future, we could consider making the number of threads configurable.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This patch does a few things:
* Refactors the `TargetAssignmentBuilder` to use inheritance to differentiate Consumer and Share groups.
* Introduces `UnionSet` to lazily aggregate the subscriptions for a given member.
* Wires the resolved regular expressions in the `GroupMetadataManager`. At the moment, they are only used when the target assignment is computed.
Reviewers: Sean Quah <squah@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This patch cleans up the `Assertions` class in the `group-coordinator` module.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This patch does a few cleanups in GroupMetadataManagerTest:
* Uses `Map.of` where possible.
* Uses `List.of` instead of `Arrays.asList`.
* Fix inconsistent indentation in a few places.
Reviewers: Lianet Magrans <lmagrans@confluent.io>
This PR adds another dynamic config share.auto.offset.reset fir share groups.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Abhinav Dixit <adixit@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
This patch introduces the ConsumerGroupRegularExpression record (key + value) and updates the `GroupMatadataManager` and the `ConsumerGroup` to bookkeep it appropriately. Note that with this change, regular expressions are counted as subscribers in the `subscribedTopicNames` data structure. This is important because the topic metadata of the group is computed based on it.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This patch adds a data structure to ConsumerGroup to track the number of members subscribed to each regular expressions in the group. This will be useful to know whether a regex is new in the group or whether a regex must be removed from the group.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This patch does two things:
1) Change the validation of the ConsumerGroupHeartbeat request to accept subscribed topic names and/or subscribed topic regex. At least of them must be set in the first request with epoch 0.
2) Validate the provided regular expression by compiling it.
Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
Adds the DefaultStatePersister and other supporting classes for managing share state.
* Added DefaultStatePersister implementation. This is the entry point for callers who wish to invoke the share state RPC API.
* Added PersisterStateManager which is used by DefaultStatePersister to manage and send the RPCs over the network.
* Added code to BrokerServer and BrokerMetadataPublisher to instantiate the appropriate persister based on the config value for group.share.persister.class.name. If this is not specified, the DefaultStatePersister will be used. To force use of NoOpStatePersister, set the config to empty. This is an internal config, not to be exposed to the end user. This will be used to factory plug the appropriate persister.
* Using this persister, the internal __share_group_state topic will come to life and will be used for persistence of share group info.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>
The PR adds capability to restrict the messages in Share Fetch. The max fetch records will be an additional way to limit the number of records sent from broker to client.
In Share Fetch, with min and mx bytes, there exists 3 problems:
1. The max.poll.records client config sends the max number of records defined to application but might have fetched extra becuase of higher max bytes. But the timeout for the sent records has started on the broker.
2. As the application processes records as per max.poll.records, hence those number of records are sent in every acknowledgement. This causes the cache data to be tracked per offset as the batch is broken.
3. The client has to sent the partial acknoledgment batch and cannot piggyback on fetch requests.
To handle the above scenario max fetch records has been added. Once this PR is merged and we define the right methodolgy then KIP will be updated to have max fetch records in share fetch RPC rather as broker config.
Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
When a dynamic member joins a group, it sends two JOIN_GROUP requests.
The first request doesn't have a member id and returns
MEMBER_ID_REQUIRED with a broker-generated member id. The second request
includes the broker-generated member id and joins the group for real.
We emit a log message at info level for the first join request, but not
the second. Log the second join request at info level too.
Reviewers: David Jacot <djacot@confluent.io>
Rework the uniform heterogeneous assignor to improve performance, while
preserving the high level ideas and structure from the existing
implementation:
* The assignor works in 3 stages: importing the previous assignment for
stickiness, assigning unassigned partitions and iteratively
reassigning partitions to improve balance.
* Unassigned partitions are assigned to the subscribers with the least
number of partitions. This maximizes balance within a single topic.
* During the iterative rebalancing phase, partitions are reassigned to
their previous owner if it improves balance (stickiness restoration).
* During the iterative rebalancing phase, partitions are reassigned to
the subscriber with the least number of partitions to improve
balance.
A non-exhaustive list of changes is:
* The assignment of unassigned partitions and iterative reassignment
stages now works through partitions topic by topic. Previously
partitions from topics with the same number of partitions per
subscriber would be interleaved. Since we iterate topic by topic, we
can reuse data about topic subscribers.
* Instead of maintaining TreeSets to find the least loaded subscribers,
we sort an ArrayList of subscribers once per topic and start filling
up subscribers, least loaded first. In testing, this approach was
found to be faster than maintaining PriorityQueues.
* Implement stickiness restoration by creating a mapping of partitions
to previous owner and checking against that mapping, instead of
tracking partition movements during iterative reassignment.
* Track member partition counts using a plain int array, to avoid
overhead from boxing and HashMap lookups. Member partition counts are
accessed very frequently and this needs to be fast. As a consequence,
we have to number members 0 to M - 1.
* Bound the iterative reassignment stage to a fixed number of
iterations. Under some uncommon subscription patterns, the iterative
reassignment stage converges slowly. In these cases, the iterative
reassignment stage terminates without producing an optimally balanced
assignment anyway (see javadoc for balanceTopics).
* Re-use Maps from the previous assignment where possible,
ie. introduce a copy-on-write mechanism while computing the new
assignment.
Reviewers: David Jacot <djacot@confluent.io>
This PR adds the 2 configs share.heartbeat.interval.ms and share.session.timeout.ms in GroupConfig. These can be dynamically set for a share group without restarting the server
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>