Adding support for the `urn:ietf:params:oauth:grant-type:jwt-bearer`
grant type (AKA `jwt-bearer`). Includes further refactoring of the
existing OAuth layer and the addition of a generic JWT assertion layer
that can be leveraged in the future.
This constitutes the main piece of the JWT Bearer grant type support.
Forthcoming commits/PRs will include improvements for both the
`client_credentials` and `jwt-bearer` grant types in the following
areas:
* Integration test coverage (KAFKA-19153)
* Unit test coverage (KAFKA-19308)
* Top-level documentation (KAFKA-19152)
* Improvements to and documentation for `OAuthCompatibilityTool`
(KAFKA-19307)
Reviewers: Manikumar Reddy <manikumar@confluent.io>, Lianet Magrans
<lmagrans@confluent.io>
---------
Co-authored-by: Zachary Hamilton <77027819+zacharydhamilton@users.noreply.github.com>
Co-authored-by: Lianet Magrans <98415067+lianetm@users.noreply.github.com>
The `String.split` method never returns an array containing null
elements.
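A minimal illustration (hypothetical snippet, not part of this change): `split` can produce empty strings, but never `null` elements, so per-element null checks are redundant.
```java
public class SplitNeverNull {
    public static void main(String[] args) {
        String[] parts = "a,,b,".split(",");
        // split() can produce empty strings, but never null elements,
        // so per-element null checks are unnecessary.
        for (String part : parts) {
            System.out.println(part != null); // always true
        }
    }
}
```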
Reviewers: TengYao Chi <frankvicky@apache.org>, Ken Huang
<s7133700@gmail.com>, Lan Ding <isDing_L@163.com>
## Problem
When a `txnProducer.abortTransaction()` operation encounters a
`TRANSACTION_ABORTABLE` error, it currently tries to transition to the
`ABORTABLE_ERROR` state. This can create an infinite retry loop since:
1. The abort operation fails with `TRANSACTION_ABORTABLE`
2. We transition to the `ABORTABLE_ERROR` state
3. The application receives an instance of `TransactionAbortableException`
and retries the abort
4. The cycle repeats
## Solution
For the `txnProducer.abortTransaction()` API, convert
`TRANSACTION_ABORTABLE` errors to fatal errors (`KafkaException`) during
abort operations to ensure clean transaction termination. This prevents
retry loops (see the sketch after this list) by:
1. Treating abort failures as fatal errors at the application layer
2. Ensuring the transaction can be cleanly terminated
3. Providing clear error messages to the application
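A minimal sketch of how an application might handle this after the change (illustrative only; the structure and names below are not part of this patch): abort failures now surface as a fatal `KafkaException` rather than a retriable `TransactionAbortableException`, so the application should stop retrying the abort and close the producer.
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.TransactionAbortableException;

// Illustrative handling only; not code from this patch.
final class AbortHandlingSketch {
    static void runTransaction(KafkaProducer<String, String> producer) {
        try {
            producer.beginTransaction();
            // ... send records ...
            producer.commitTransaction();
        } catch (TransactionAbortableException abortable) {
            try {
                // Abortable error from the transaction: abort and retry the work.
                producer.abortTransaction();
            } catch (KafkaException fatal) {
                // The abort itself failed: this is now fatal, so stop retrying
                // and close the producer instead of looping on abortTransaction().
                producer.close();
                throw fatal;
            }
        }
    }
}
```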
## Changes
- Modified `EndTxnHandler.handleResponse()` to convert
`TRANSACTION_ABORTABLE` errors to `KafkaException` during abort
operations
- Set TransactionManager state to FATAL
- Updated test `testAbortableErrorIsConvertedToFatalErrorDuringAbort` to
verify this behavior
## Testing
- Added test case verifying that abort operations convert
`TRANSACTION_ABORTABLE` errors to `KafkaException`
- Verified that the Commit API with a TRANSACTION_ABORTABLE error sets
the TransactionManager to the abortable state
- Verified that the Abort API with a TRANSACTION_ABORTABLE error is
converted to a fatal error, i.e. KafkaException
## Impact
At the application layer, this change improves transaction reliability
by preventing infinite retry loops during abort operations.
Reviewers: Justine Olshan <jolshan@confluent.io>
### Problem
- Currently, when a transactional producer encounters retriable errors
(like `COORDINATOR_LOAD_IN_PROGRESS`) and exhausts all retries, it
finally returns the retriable error to the application layer.
- Application retries can cause duplicate records. As a fix, we are
transitioning all retriable errors to abortable errors in the
transactional producer path.
- Additionally, `InvalidTxnStateException` was added as part of
https://issues.apache.org/jira/browse/KAFKA-19177
### Solution
- Modified the TransactionManager to automatically transition retriable
errors to abortable errors after all retries are exhausted. This ensures
that applications can abort the transaction when they encounter
`TransactionAbortableException` (see the sketch after this list).
- `RefreshRetriableException` errors like `CoordinatorNotAvailableException`
will be refreshed internally
[[code](6c26595ce3/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (L1702-L1705))]
until retries are exhausted; after that they are treated as retriable
errors and translated to `TransactionAbortableException`.
- The same applies to `InvalidTxnStateException`.
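A minimal sketch of the application-side pattern this enables (illustrative only, not code from this patch): once retries are exhausted the producer surfaces `TransactionAbortableException`, and the application aborts and retries the whole transaction rather than resending individual records.
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.TransactionAbortableException;

// Illustrative retry loop; names and structure are assumptions, not part of this patch.
final class AbortAndRetrySketch {
    static void produceTransactionally(KafkaProducer<String, String> producer) {
        while (true) {
            try {
                producer.beginTransaction();
                // ... send records ...
                producer.commitTransaction();
                return;
            } catch (TransactionAbortableException e) {
                // Once retries are exhausted, the error surfaces as abortable:
                // abort and retry the whole transaction instead of resending
                // individual records, which avoids duplicates.
                producer.abortTransaction();
            }
        }
    }
}
```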
### Testing
Added test `testSenderShouldTransitionToAbortableAfterRetriesExhausted`
to verify in sender thread:
- Retriable errors are properly converted to abortable state after
retries
- Transaction state transitions correctly and subsequent operations fail
appropriately with TransactionAbortableException
Reviewers: Justine Olshan <jolshan@confluent.io>
Update the catch block to handle compression errors.
After the change:
```
Sent message: KR Message 376
[kafka-producer-network-thread | kr-kafka-producer] INFO
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
KR: Failed to compress telemetry payload for compression: zstd, sending
uncompressed data
Sent message: KR Message 377
```
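A minimal sketch of the catch-and-fall-back pattern that the log above reflects (illustrative only; the compressor hook and logging here are stand-ins, not the actual ClientTelemetryReporter code):
```java
import java.util.function.BiFunction;

// Illustrative stand-in, not the real ClientTelemetryReporter implementation.
final class TelemetryCompressionSketch {
    static byte[] compressOrFallback(byte[] payload,
                                     String compressionType,
                                     BiFunction<byte[], String, byte[]> compressor) {
        try {
            return compressor.apply(payload, compressionType);
        } catch (Throwable t) {
            // Any error from the compression library (e.g. missing zstd classes at
            // runtime) is caught, and the payload is sent uncompressed instead of
            // failing the telemetry push.
            System.out.printf(
                "Failed to compress telemetry payload for compression: %s, sending uncompressed data%n",
                compressionType);
            return payload;
        }
    }
}
```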
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Bill Bejeck <bbejeck@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
- Currently, read and write share state requests are allowed on
uninitialized share partitions (share partitions on which
initializeState has NOT been called). This should not be the case.
- This PR addresses the concern by adding error checks on read and
write. Other requests are still allowed (initialize, readSummary, alter).
- Refactored `ShareCoordinatorShardTest` to reduce redundancy and added
some new tests.
- Some request/response classes have also been reformatted.
Reviewers: Andrew Schofield <aschofield@confluent.io>
The flakiness occurs when the offsets topic does not yet exist. Hence,
the issue is mitigated by creating the offsets topic in `setup()`. This
serves as a workaround. The root cause is tracked in
[KAFKA-19357](https://issues.apache.org/jira/browse/KAFKA-19357).
I ran the test 100 times on my Mac and all of them passed.
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Removed the FetchResponse#of overload that is not used in production.
The test cases that originally invoked this method have been updated to
call the other
[FetchResponse#of](6af849f864/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java (L232)),
which is currently used by `KafkaApis`, to maintain the integrity of
the tests.
Reviewers: Jun Rao <junrao@gmail.com>, PoAn Yang <payang@apache.org>,
Chia-Ping Tsai <chia7712@gmail.com>
Adds missing documentation to the `partitionsToOffsetAndMetadata`
methods in both `ListStreamsGroupOffsetsResult` and
`ListShareGroupOffsetsResult` classes to clarify the behavior when a
group does not have a committed offset for a specific partition.
As documented in ListConsumerGroupOffsetsResult:
> If the group doesn't have a committed offset for a specific partition, the corresponding value in the returned map will be null.
This important detail was previously missing in the JavaDoc of the
stream and share group variants.
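A minimal example of the documented behaviour (hypothetical caller code, not part of this change): values in the returned map can be `null` and should be checked.
```java
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical handling of the returned map; names are illustrative.
final class CommittedOffsetsSketch {
    static void print(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((tp, oam) -> {
            if (oam == null) {
                // The group has no committed offset for this partition.
                System.out.println(tp + ": no committed offset");
            } else {
                System.out.println(tp + ": " + oam.offset());
            }
        });
    }
}
```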
Reviewers: Nick Guo <lansg0504@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
#15613 ensures that all `commitAsync` callbacks are triggered before
`commitSync` completes for `AsyncKafkaConsumer`. However, the related
changes to `ClassicKafkaConsumer`, #15693, were not merged. I assume
this might be because we intend to gradually move toward using AsyncConsumer
instead.
In short, this behavioral difference should be documented.
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Use Java to rewrite BaseConsumerTest and SaslPlainPlaintextConsumerTest
with the new test infra and move them to the client-integration-tests
module. BaseConsumerTest is still used, so we should not remove it yet.
Reviewers: PoAn Yang <payang@apache.org>, TengYao Chi
<frankvicky@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
This PR rewrites `ConsumerWithLegacyMessageFormatIntegrationTest.scala`
in Java and moves it to the `clients-integration-tests` module.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
* Return a "resource doesn't exist" message when users try to describe a
non-existent resource in kafka-configs.sh and kafka-client-metrics.sh.
* For the groups type, the command checks both existing groups and
non-existent groups that have a dynamic config. If it cannot find a
group under either condition, it returns the "resource doesn't exist"
message.
Reviewers: Lan Ding <53332773+DL1231@users.noreply.github.com>, Andrew
Schofield <aschofield@confluent.io>
---------
Signed-off-by: PoAn Yang <payang@apache.org>
`public void completeTransaction(PreparedTxnState preparedTxnState)`
The method compares the currently prepared transaction state and the
state passed in the argument:
1. Commit the transaction if the states match.
2. Abort the transaction otherwise.
If the producer is not in a prepared state (i.e., neither
prepareTransaction was called nor initTransaction(true) was called), we
return an INVALID_TXN_STATE error.
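A minimal sketch of how a recovering application might drive this API (hypothetical usage; the external store for the prepared state and the import location of `PreparedTxnState` are assumptions):
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.PreparedTxnState;

// Hypothetical sketch; the external store for the prepared state and the
// import location of PreparedTxnState are assumptions.
final class CompleteTransactionSketch {
    static void completeAfterFailover(KafkaProducer<String, String> producer,
                                      PreparedTxnState stateFromExternalStore) {
        // Compares the currently prepared state with the one passed in:
        // commits if they match, aborts otherwise. If the producer is not in
        // a prepared state, an INVALID_TXN_STATE error is returned.
        producer.completeTransaction(stateFromExternalStore);
    }
}
```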
Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits
<alivshits@confluent.io>
There is a sequence of interactions with the membership managers of
KIP-848, KIP-932, KIP-1071 that can put the membership manager into
JOINING state, but where member epoch is set to -1. This can result in
an invalid request being sent, since joining heartbeats should not have
member epoch -1. This may lead to the member failing to join. In the
case of streams, the group coordinator will return INVALID_REQUEST.
This is the sequence triggering the bug, which seems relatively likely
to occur; it is caused by two heartbeat responses being received after
the next heartbeat has been sent.
```
membershipManager.leaveGroup();
-> transitions to LEAVING
membershipManager.onHeartbeatRequestGenerated();
-> transitions to UNSUBSCRIBED
membershipManager.onHeartbeatSuccess(... with member epoch > 0);
-> unblocks the consumer
membershipManager.onSubscriptionUpdated();
membershipManager.onConsumerPoll();
-> transitions to JOINING
membershipManager.onHeartbeatSuccess(... with member epoch < 0);
-> updates the epoch to a negative value
```
Now we are in state `JOINING` with `memberEpoch=-1`, and the next
heartbeat we send will be malformed, triggering `INVALID_REQUEST`.
The bug may also be triggered if the `unsubscribe` times out, but this
seems more of a corner case.
To prevent the bug, we take two measures. The likely path to
triggering the bug can be prevented by not unblocking an `unsubscribe`
call in the consumer when a non-leave-heartbeat epoch is received: once
we have sent out the leave group heartbeat, we ignore all heartbeat
responses except for those containing `memberEpoch < 0`.
For extra measure, we also handle the second case (`unsubscribe` timing
out). Here, the consumer gets unblocked before we have received the
leave group heartbeat response and may resubscribe to the group. We then
simply ignore a heartbeat response containing a member epoch < 0 once it
arrives, if we have already left the `UNSUBSCRIBED` state.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Andrew Schofield
<aschofield@confluent.io>, Shivsundar R <shr@confluent.io>
This PR attempts to remove the flakiness in
`testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck`
and `testAlterReadCommittedToReadUncommittedIsolationLevelWithRejectAck`.
This flakiness could potentially be caused because we were not ensuring
that the aborted transaction record produce happened. In this PR, I have
added a blocking call to make sure the produce future completes before
we abort the transaction.
Reviewers: Andrew Schofield <aschofield@confluent.io>
While rewriting `EndToEndClusterIdTest` in Java (#19741 ), I found that
the test uses `MockInterceptor` and `MockSerializer` together. However,
`MockSerializer` was using a `byte[]` serializer, while
`MockInterceptor` expected a `String` serializer, leading to a
`ClassCastException`.
I chose to update `MockSerializer` to use `String`, as it is used less
frequently than `MockInterceptor`. Using `String` also simplifies
the code by avoiding the need to write expressions like
`"value".getBytes`.
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<frankvicky@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
I verified the behavior by rewriting the
`GroupAuthorizerIntegrationTest` in Java in this PR:
https://github.com/apache/kafka/pull/19685. The state is now correct.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
[KAFKA-16720](https://issues.apache.org/jira/browse/KAFKA-16720) aims to
finish the AlterShareGroupOffsets RPC.
Reviewers: Andrew Schofield <aschofield@confluent.io>
---------
Co-authored-by: jimmy <wangzhiwang@qq.com>
Use Java to rewrite PlaintextConsumerPollTest by new test infra and move
it to client-integration-tests module.
Reviewers: PoAn Yang <payang@apache.org>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
* Add new functions `listConfigResources(Set<ConfigResource.Type>
configResourceTypes, ListConfigResourcesOptions options)` and
`listConfigResources()` to the `Admin` interface (see the usage sketch
after this list).
* The new functions can list all kinds of config resource types.
* If the input is a set with a type other than `CLIENT_METRICS` and the
request version is 0, return `UnsupportedVersionException`.
* Deprecate functions
`listClientMetricsResources(ListClientMetricsResourcesOptions options)`
and `listClientMetricsResources()`.
* Deprecate classes `ListClientMetricsResourcesResult` and
`ClientMetricsResourceListing`.
* Change `ClientMetricsCommand` to use `listConfigResources`.
* Add integration tests to `PlaintextAdminIntegrationTest.java`.
* Add unit tests to `KafkaAdminClientTest.java`.
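A short usage sketch, assuming the new `Admin#listConfigResources` API described above (the `all()` accessor, the returned element type, the no-arg options constructor, and the bootstrap address are assumptions here):
```java
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListConfigResourcesOptions;
import org.apache.kafka.common.config.ConfigResource;

public class ListConfigResourcesSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // List only CLIENT_METRICS resources; the no-arg overload would
            // list every supported config resource type.
            admin.listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS),
                                      new ListConfigResourcesOptions())
                 .all()   // assumed accessor returning a future of the resources
                 .get()
                 .forEach(r -> System.out.println(r.type() + " : " + r.name()));
        }
    }
}
```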
Reviewers: Andrew Schofield <aschofield@confluent.io>
---------
Signed-off-by: PoAn Yang <payang@apache.org>
### Issue:
API Responses missing latest version in [Kafka protocol
guide](https://kafka.apache.org/protocol.html)
#### For example:
These are missing:
- ApiVersions Response (Version: 4) — Only versions 0–3 are documented,
though version 4 of the request is included.
- DescribeTopicPartitions Response — Not listed at all.
- Fetch Response (Version: 17) — Only versions 4–16 are documented,
though version 17 of the request is included.
#### After the fix:
docs/generated/protocol_messages.html
Reviewers: dengziming <dengziming1993@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Use Java to rewrite `PlaintextConsumerCommitTest` by new test infra and
move it to client-integration-tests module.
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
This PR adds changes so that the IQ endpoint information is only sent
to streams group members when there has been a change in the
assignments requiring an update to the streams client host-partition
ownership.
The existing IQ integration test passes with no modifications, and
`GroupMetadataManagerTest` was updated to cover the new process.
Reviewers: Matthias Sax <mjsax@apache.org>, Lucas Brutschy
<lucasbru@apache.org>
Follow-up to https://github.com/apache/kafka/pull/19460/files#r2062664349
Reviewers: Ismael Juma <ismael@juma.me.uk>, PoAn Yang
<payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
According to KIP-830, in Kafka 4 users who currently rely on JmxReporter
and also use additional reporters via metric.reporters will have to
include org.apache.kafka.common.metrics.JmxReporter in
metric.reporters.
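For example, a client configuration would then look like this (a minimal sketch; the custom reporter class name is purely illustrative):
```java
import java.util.Properties;

public class ReporterConfigSketch {
    public static Properties clientProps() {
        Properties props = new Properties();
        // In Kafka 4, JmxReporter is no longer included implicitly when
        // metric.reporters is set, so list it explicitly next to any custom reporter.
        props.put("metric.reporters",
                  "org.apache.kafka.common.metrics.JmxReporter,com.example.MyMetricsReporter");
        return props;
    }
}
```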
Reviewers: Mickael Maison <mickael.maison@gmail.com>
Updates the min version used by `ListOffsetsRequest` to
`ApiKeys.LIST_OFFSETS.oldestVersion()` rather than hardcoding `1`.
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, TengYao Chi <frankvicky@apache.org>, Chia-Ping
Tsai <chia7712@gmail.com>
* Change `ListClientMetricsResourcesRequest.json` to
`ListConfigResourcesRequest.json`.
* Change `ListClientMetricsResourcesResponse.json` to
`ListConfigResourcesResponse.json`.
* Change `ListClientMetricsResourcesRequest.java` to
`ListConfigResourcesRequest.java`.
* Change `ListClientMetricsResourcesResponse.java` to
`ListConfigResourcesResponse.java`.
* Change `KafkaApis` to handle both `ListClientMetricsResourcesRequest`
v0 and v1 requests.
Reviewers: Andrew Schofield <aschofield@confluent.io>
---------
Signed-off-by: PoAn Yang <payang@apache.org>
Adding a test to specifically force the fencing path due to a delayed
rebalance, and validate how the consumer recovers automatically.
Running this test with DEBUG logging enabled allows one to see the
details of the fencing flow: the consumer getting fenced because the
rebalance timeout was exceeded, resetting to epoch 0, rejoining on the
next poll with the existing subscription, and being accepted back into
the group (so consumption resumes).
This is aimed to help understand
[KAFKA-19233](https://issues.apache.org/jira/browse/KAFKA-19233)
Will add another one in separate PR to also involve commits in similar
fencing scenarios.
Reviewers: TengYao Chi <frankvicky@apache.org>
According to the recent changes in KIP-932, when the share session cache
is full and a broker receives a Share Fetch request with the Initial
Share Session Epoch (0), the error code `SHARE_SESSION_LIMIT_REACHED` is
returned after a delay of maxWaitMs. This PR implements this logic. In
order to add a delay between subsequent share fetch requests, a timer in
the delayed operation purgatory is used. A new `IdleShareFetchTimeTask`
has been added which takes in a `CompletableFuture<Void>`. Upon
expiration, this future is completed with null. When the future
completes, a response is sent back to the client with the error code
`SHARE_SESSION_LIMIT_REACHED`.
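A simplified sketch of the pattern (a stand-in using a ScheduledExecutorService; the real change uses the broker's delayed operation purgatory and the actual `IdleShareFetchTimeTask`, not this code):
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified stand-in: the broker uses its timer / delayed-operation purgatory
// rather than a ScheduledExecutorService.
final class IdleShareFetchTimerSketch {
    static CompletableFuture<Void> delayResponse(ScheduledExecutorService timer, long maxWaitMs) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        // On expiration the future completes with null; the caller then sends
        // the SHARE_SESSION_LIMIT_REACHED error back to the client.
        timer.schedule(() -> future.complete(null), maxWaitMs, TimeUnit.MILLISECONDS);
        return future;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        delayResponse(timer, 500)
            .thenRun(() -> System.out.println("send SHARE_SESSION_LIMIT_REACHED response"))
            .get();
        timer.shutdown();
    }
}
```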
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
If delete.topic.enable is false on the brokers, deleteTopics will mark
the topics for deletion, but not actually delete them. The futures will
return successfully in this case.
This is no longer true, as the server now returns an exception:
```scala
if (!config.deleteTopicEnable) {
  if (apiVersion < 3) {
    throw new InvalidRequestException("Topic deletion is disabled.")
  } else {
    throw new TopicDeletionDisabledException()
  }
}
```
Reviewers: Nick Guo <lansg0504@gmail.com>, PoAn Yang
<payang@apache.org>, Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This patch belongs to the client-side changes required to enable 2PC as
a part of KIP-939.
A new method is added to KafkaProducer: `public PreparedTxnState prepareTransaction()`
This would flush all the pending messages and transition the producer
into a mode where only .commitTransaction, .abortTransaction, or
.completeTransaction could be called (calling other methods, e.g.
.send, in that mode would result in IllegalStateException being
thrown). If the call is successful (all messages successfully got
flushed to all partitions) the transaction is prepared. If 2PC is not
enabled, we return the INVALID_TXN_STATE error.
A new state is added to the TransactionManager called
PREPARING_TRANSACTION. There are two situations where we would move into
this state:
1) When prepareTransaction() is called during an ongoing transaction
with 2PC enabled
2) When initTransaction(true) is called after a client failure
(keepPrepared = true)
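A minimal end-to-end sketch of the prepare/complete flow (hypothetical usage under the assumptions that 2PC is enabled and that the prepared state is persisted in some external store; the topic name, the `ExternalStore` interface, and the import location of `PreparedTxnState` are illustrative):
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.PreparedTxnState;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical 2PC flow; the ExternalStore stand-in and the exact API shapes
// are assumptions for illustration, not part of this patch.
final class TwoPhaseCommitSketch {
    interface ExternalStore { void save(PreparedTxnState state); }

    static void run(KafkaProducer<String, String> producer, ExternalStore store) {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("payments", "key", "value"));

        // Phase 1: flush all pending messages and enter the prepared state.
        PreparedTxnState prepared = producer.prepareTransaction();
        store.save(prepared);

        // Phase 2: once the outcome is decided externally, commit (or abort).
        producer.commitTransaction();
    }
}
```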
Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan
<jolshan@confluent.io>
Guard against possible `NullPointerExceptions` in
`ConsumerNetworkThread.cleanup()` if
`ConsumerNetworkThread.initializeResources()` previously failed.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
In Java, return types are covariant. This means that an overriding
method can narrow the return type to a subclass of the overridden
method's return type.
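A tiny illustration of the rule (a generic Java example, unrelated to the specific classes touched here):
```java
class Animal {
    Animal reproduce() { return new Animal(); }
}

class Cat extends Animal {
    @Override
    Cat reproduce() { return new Cat(); } // covariant return: Cat is a subclass of Animal
}
```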
Reviewers: Jun Rao <junrao@apache.org>, Chia-Ping Tsai
<chia7712@apache.org>, Apoorv Mittal <apoorvmittal10@gmail.com>
Rename `AccessTokenRetriever` and `AccessTokenValidator` to
`JwtRetriever` and `JwtValidator`, respectively. Also convert the
factory pattern classes `AccessTokenRetrieverFactory` and
`AccessTokenValidatorFactory` into delegate/wrapper classes
`DefaultJwtRetriever` and `DefaultJwtValidator`, respectively.
These are all internal changes; no configuration, user APIs, RPCs, etc.
were changed.
Reviewers: Manikumar Reddy <manikumar@confluent.io>, Ken Huang
<s7133700@gmail.com>, Lianet Magrans <lmagrans@confluent.io>
---------
Co-authored-by: Ken Huang <s7133700@gmail.com>
Improve docs for the retries config, mainly to clarify the expected
behaviour on retries=0. Also remove unused functions and fix a typo.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Ming-Yen Chung
<mingyen066@gmail.com>, PoAn Yang <payang@apache.org>
This patch extends the OffsetFetch API to support topic ids. From
version 10 of the API, topic ids must be used.
The patch only contains the server-side changes and keeps version 10 as
unstable for now. We will mark the version as stable once the
client-side changes are merged.
Reviewers: TengYao Chi <frankvicky@apache.org>, Lianet Magrans <lmagrans@confluent.io>
As mentioned in
https://github.com/apache/kafka/pull/19378#pullrequestreview-2775598123,
the error messages for a 4.1 share consumer could be clearer for the
different cases in which it cannot successfully join a share group.
This PR uses different error messages for the different cases such as
out-of-date cluster or share groups just not enabled.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
* In ConsoleShareConsumerTest, add `@SuppressWarnings("unchecked")`
annotation in method shouldUpgradeDeliveryCount
* In ListConsumerGroupOffsetsHandlerTest, add generic parameters to
HashSet constructors
* In TopicsImageTest, add an explicit generic type to Collections.EMPTY_MAP
to fix raw type usage (see the illustration below)
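The raw-type fix follows this general pattern (a generic illustration, not the exact diff):
```java
import java.util.Collections;
import java.util.Map;

class EmptyMapExample {
    @SuppressWarnings("unchecked")
    static Map<String, Integer> rawUsage() {
        return Collections.EMPTY_MAP;   // raw type: unchecked warning without the suppression
    }

    static Map<String, Integer> typedUsage() {
        return Collections.emptyMap();  // explicit/inferred generic type: no warning
    }
}
```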
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>