Some client APIs may return `null` values in the map, but this behavior
isn’t documented in the JavaDoc. We should update the JavaDoc to include
these edge cases.
Reviewers: Kirk True <kirk@kirktrue.pro>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
In the results returned by the `beginningOffsets` and `endOffsets` methods, if
the timeout is 0, an empty Map should be returned uniformly, instead of a map
of `<TopicPartition, null>` entries.
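A hedged sketch of the intended contract, assuming a configured consumer and an assigned partition (names are illustrative):
```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class BeginningOffsetsContract {
    static void check(Consumer<String, String> consumer, TopicPartition tp) {
        Map<TopicPartition, Long> offsets =
            consumer.beginningOffsets(List.of(tp), Duration.ZERO);
        // After this change: an empty map when the lookup cannot complete in time,
        // rather than a map containing a {tp=null} entry.
        assert offsets.isEmpty() || offsets.get(tp) != null;
    }
}
```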
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>, Lianet
Magrans <lmagrans@confluent.io>
There is some redundant code that could be removed in `CloseOptions`.
This patch also adds unit tests for CloseOptions.
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
The setter of `maxPollRecords` wrongly checks the field instead of the argument.
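A minimal sketch of the bug pattern (class and validation are hypothetical, not the actual Kafka code): the guard inspects the already-assigned field, which still holds the old valid value, so bad input is never rejected.
```java
class FetchConfig {
    private int maxPollRecords = 500;

    void setMaxPollRecords(int maxPollRecords) {
        // Bug: "if (this.maxPollRecords <= 0)" checks the field, which is always valid.
        // Fix: validate the incoming argument instead.
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollRecords must be positive");
        }
        this.maxPollRecords = maxPollRecords;
    }
}
```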
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, TengYao Chi
<frankvicky@apache.org>
* Currently in the share group heartbeat flow, if we see a TP subscribed
for the first time, we move that TP to the initializing state in the GC and
let the GC send a persister request to share-group-initialize the
aforementioned TP.
* However, if the coordinator runtime request for the share group heartbeat
times out (maybe due to a restarting/bad broker), the future completes
exceptionally, resulting in the persister request not being sent.
* Now we are in a bad state: the TP is in the initializing state in the GC
but not persister-initialized. Future heartbeats for the same share
partitions will also not help, since we do not allow retrying the persister
request for initializing TPs.
* This PR remedies the situation by allowing the persister request to be
retried for initializing TPs.
* A temporary fix to increase offset commit timeouts in system tests was
added to work around the issue. In this PR, we revert that change as well.
Reviewers: Andrew Schofield <aschofield@confluent.io>
This proposes adding a new filter, TransactionalIdPattern. The transactional ID pattern filter is ANDed with the other transaction filters, and pattern matching is powered by RE2/J.
KIP: https://cwiki.apache.org/confluence/x/4gm9F
Reviewers: Justine Olshan <jolshan@confluent.io>, Ken Huang
<s7133700@gmail.com>, Kuan-Po Tseng <brandboat@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
For records which are automatically released as a result of closing a
share session normally, the delivery count should not be incremented.
These records were fetched, but they were not actually delivered to the
client, since the disposition of the delivered records is carried in the
ShareAcknowledge which closes the share session. Any remaining records
were not delivered, only fetched.
This PR releases such records without incrementing the delivery count
when closing a share session normally.
Co-authored-by: d00791190 <dinglan6@huawei.com>
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
Up till now, the share sessions in the broker were only evicted when the
share session cache was full and a new session was trying to get
registered. With the changes in this PR, whenever a share consumer gets
disconnected from the broker, the corresponding share session is evicted
from the cache.
Note: `connectAndReceiveWithoutClosingSocket` has been introduced in
`GroupCoordinatorBaseRequestTest`. This method creates a socket
connection, sends the request and receives a response, but does not close
the connection. Instead, these sockets are stored in a ListBuffer
`openSockets`, and are closed in the tearDown method after each test is
run. Also, all the `connectAndReceive` calls in
`ShareFetchAcknowledgeRequestTest` have been replaced by
`connectAndReceiveWithoutClosingSocket`, because these tests depend
upon the persistence of the share sessions on the broker once
registered. With the new code, as soon as the socket connection is
closed, the broker assumes a connection drop, leading to session
eviction.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
This PR uses the v1 of the ShareVersion feature to enable share groups
for KIP-932.
Previously, there were two potential configs which could be used -
`group.share.enable=true` and including "share" in
`group.coordinator.rebalance.protocols`. After this PR, the first of
these is retained, but the second is not. Instead, the preferred switch
is the ShareVersion feature.
The `group.share.enable` config is temporarily retained for testing and
situations in which it is inconvenient to set the feature, but it should
really not be necessary, especially when we get to AK 4.2. The aim is to
remove this internal config at that point.
No tests should be setting `group.share.enable` any more, because they
can use the feature (which is enabled in test environments by default
because that's how features work). For tests which need to disable share
groups, they now set the share feature to v0. The majority of the code
changes were related to correct initialisation of the metadata cache in
tests now that a feature is used.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
There will be an update to the PluginMetrics#metricName method: the type
of the tags parameter will be changed from Map to LinkedHashMap.
This change is necessary because the order of metric tags is important:
1. If the tag order is inconsistent, identical metrics may be treated as
distinct ones by the metrics backend.
2. KAFKA-18390 is updating metric naming to use LinkedHashMap. For
consistency, we should follow the same approach here.
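A small illustration of why the parameter type matters (tag names are made up): a HashMap gives no iteration-order guarantee, so the same tags inserted in different orders can produce differently named metrics, while a LinkedHashMap preserves insertion order.
```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class TagOrderDemo {
    public static void main(String[] args) {
        Map<String, String> ordered = new LinkedHashMap<>();
        ordered.put("topic", "payments");
        ordered.put("partition", "0");
        System.out.println(ordered);   // always {topic=payments, partition=0}

        Map<String, String> unordered = new HashMap<>();
        unordered.put("topic", "payments");
        unordered.put("partition", "0");
        System.out.println(unordered); // iteration order is unspecified
    }
}
```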
Reviewers: TengYao Chi <frankvicky@apache.org>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, lllilllilllilili
This PR is a migration of the initial IQ support for KIP-1071 from the
feature branch to trunk. It includes a parameterized integration test
that expects the same results whether using the classic or the new
streams group protocol.
Note that this PR will deliver IQ information in each heartbeat
response. A follow-up PR will change that to be only sending IQ
information when assignments change.
Reviewers: Lucas Brutschy <lucasbru@apache.org>
Reviewers: TengYao Chi <frankvicky@apache.org>, PoAn Yang <payang@apache.org>, Lianet Magrans <lmagrans@confluent.io>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
- Add support for topicId in `ProduceRequest`/`ProduceResponse`. Topic name
and topic ID will become `ignorable`, following the footsteps of
`FetchRequest`/`FetchResponse`.
- ReplicaManager still looks up `HostedPartition` using `TopicPartition`
and doesn't check the topic ID. It is an **[OPEN QUESTION]** whether we
should address this in this PR or wait for
[KAFKA-16212](https://issues.apache.org/jira/browse/KAFKA-16212), as that
will update `ReplicaManager::getPartition` to use `TopicIdPartition`
once we update the cache. Another option is to compare the provided
`topicId` with the `Partition` topic ID and return `UNKNOWN_TOPIC_ID` or
`UNKNOWN_TOPIC_OR_PARTITION` if we can't find a partition with a matching
topic ID.
Reviewers: Jun Rao <jun@confluent.io>, Justine Olshan
<jolshan@confluent.io>
This is part of the client side changes required to enable 2PC for
KIP-939.
A new KafkaProducer.PreparedTxnState class is going to be defined as
follows:
```java
static public class PreparedTxnState {
    public String toString();
    public PreparedTxnState(String serializedState);
    public PreparedTxnState();
}
```
Objects of this class can serialize to / deserialize from a string value
and can be written to / read from a database. The implementation is going
to store the producerId and epoch in the format **producerId:epoch**.
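For reference, a hedged round-trip sketch based only on the description above (the nesting in KafkaProducer and the producerId:epoch format come from the PR text):
```java
// Round-trip through the application's own storage.
KafkaProducer.PreparedTxnState state = new KafkaProducer.PreparedTxnState("12345:7");
String serialized = state.toString();               // "12345:7", written to the database
KafkaProducer.PreparedTxnState restored =
    new KafkaProducer.PreparedTxnState(serialized); // read back and reconstructed
```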
Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan
<jolshan@confluent.io>
The tests related to the OffsetFetch request/response in MessageTest are
incomprehensible. This patch rewrites them in a simpler way.
Reviewers: TengYao Chi <frankvicky@apache.org>
While working on https://github.com/apache/kafka/pull/19515, I came to
the conclusion that the OffsetFetchRequest is quite messy and overall
too complicated. This patch rationalizes the constructors:
OffsetFetchRequest now has a single constructor accepting the
OffsetFetchRequestData. This will also simplify adding the topic ids.
All the changes are mechanical, replacing data structures with others.
Reviewers: PoAn Yang <payang@apache.org>, TengYao Chi <frankvicky@apache.org>, Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This is a follow-up PR for the implementation of the DeleteShareGroupOffsets
RPC. This PR adds the ShareGroupStatePartitionMetadata record to the
`__consumer_offsets` topic to make sure the topic is removed from the
initializedTopics list. This PR also removes partitions from the request
and response schemas for the DeleteShareGroupState RPC.
Reviewers: Sushant Mahajan <smahajan@confluent.io>, Andrew Schofield <aschofield@confluent.io>
Use Java to rewrite `PlaintextConsumerFetchTest` with the new test infra and
move it to the client-integration-tests module.
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
If the streams rebalance protocol is enabled in
StreamsUncaughtExceptionHandlerIntegrationTest, the streams application
does not shut down correctly upon error.
There are two causes for this. First, sometimes the SHUTDOWN_APPLICATION
code was only sent with the leave heartbeat, but that is not handled
broker-side. Second, the SHUTDOWN_APPLICATION code wasn't properly handled
client-side at all.
Reviewers: Bruno Cadonna <cadonna@apache.org>, Bill Bejeck
<bill@confluent.io>, PoAn Yang <payang@apache.org>
Replace names like a, b, c, ... with meaningful names in
AsyncKafkaConsumerTest.
Follow-up:
https://github.com/apache/kafka/pull/19457#discussion_r2056254087
Signed-off-by: PoAn Yang <payang@apache.org>
Reviewers: Bill Bejeck <bbejeck@apache.org>, Ken Huang <s7133700@gmail.com>
This patch does a few code changes:
* It cleans up the GroupCoordinatorService;
* It moves the helper methods that validate requests to Utils;
* It moves the helper methods to create the assignment for the
ConsumerGroupHeartbeatResponse and the ShareGroupHeartbeatResponse from
the GroupMetadataManager to the respective classes.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jeff Kim <jeff.kim@confluent.io>
This is part of the client side changes required to enable 2PC for
KIP-939.
**Producer Config:**
`transaction.two.phase.commit.enable`: the default is 'false'. If set to
'true', the broker is informed that the client is participating in the
two phase commit protocol, and transactions that this client starts never
expire.
**Overloaded InitProducerId method:**
If the value is 'true', then the corresponding field is set in the
InitProducerIdRequest.
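A hedged sketch of enabling the feature from the client side (broker support and the transactional workflow are elided; only the config key and its default come from this description):
```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

class TwoPcProducerConfig {
    static Properties build() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-2pc-producer");
        // New config from KIP-939; the default is "false".
        props.put("transaction.two.phase.commit.enable", "true");
        return props;
    }
}
```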
Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits
<alivshits@confluent.io>
This PR marks the records as non-nullable for ShareFetch.
This follows the changes for Fetch
(https://github.com/apache/kafka/pull/18726); some work for ShareFetch
was done in https://github.com/apache/kafka/pull/19167. Marking
`records` as non-nullable in ShareFetch required additional handling,
which has been fixed in the current PR.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>, TengYao Chi <frankvicky@apache.org>, PoAn Yang
<payang@apache.org>
The generated response data classes take a Readable as input to parse the
response. However, the associated response objects take a ByteBuffer as
input and thus convert it to a Readable using a `new ByteBufferAccessor`
call.
This PR changes the parse method of all the response classes to take the
Readable interface instead, so that no such conversion is needed.
To support parsing the ApiVersionsResponse twice for different versions,
this change adds a "slice" method to the Readable interface.
Reviewers: José Armando García Sancio <jsancio@apache.org>, Truc Nguyen
<trnguyen@confluent.io>, Aadithya Chandra <aadithya.c@gmail.com>
Change the log messages which used to warn that KIP-932 was an Early
Access feature to say that it is now a Preview feature. This will make
the broker logs far less noisy when share groups are enabled.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
- Construct the `AsyncKafkaConsumer` and verify that the
`RequestManagers.supplier()` contains Streams-specific data structures.
- Verify that `RequestManagers` constructs the Streams request managers
correctly.
- Test `StreamsGroupHeartbeatManager#resetPollTimer()`
- Test `StreamsOnTasksRevokedCallbackCompletedEvent`,
`StreamsOnTasksAssignedCallbackCompletedEvent`, and
`StreamsOnAllTasksLostCallbackCompletedEvent` in
`ApplicationEventProcessor`
- Test `DefaultStreamsRebalanceListener`
- Test `StreamThread`.
- Test `handleStreamsRebalanceData`.
- Test `StreamsRebalanceData`.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Bill Bejeck <bill@confluent.io>
Signed-off-by: PoAn Yang <payang@apache.org>
Introduces a concrete subclass of `KafkaThread` named `SenderThread`.
The poisoning of the TransactionManager on invalid state changes is
determined by looking at the type of the current thread.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Improves a variable name and handling of an Optional.
Reviewers: Bill Bejeck <bill@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang <payang@apache.org>
This patch extends the OffsetCommit API to support topic ids. From
version 10 of the API, topic ids must be used. Originally, we wanted to
support both using topic ids and topic names from version 10 but it
turns out that it makes everything more complicated. Hence we propose to
only support topic ids from version 10. Clients which only support topic
names can either look up the topic ids using the Metadata API or stay on
an earlier version.
The patch only contains the server side changes and it keeps the version
10 as unstable for now. We will mark the version as stable when the
client side changes are merged in.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, PoAn Yang <payang@apache.org>
This PR removes the unstable API flag for the KIP-932 RPCs.
The 4 RPCs which were exposed for the early access release in AK 4.0 are
stabilised at v1. This is because the RPCs have evolved over time and AK
4.0 clients are not compatible with AK 4.1 brokers. By stabilising at
v1, the API version checks prevent incompatible communication and
server-side exceptions when trying to parse the requests from the older
clients.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
Two sets of tests are added:
1. KafkaProducerTest
- when send succeeds, both record.headers() and the onAcknowledgement
headers are read-only
- when send fails, record.headers() is writable as before and the
onAcknowledgement headers are read-only
2. ProducerInterceptorsTest
- verify that both the old and new onAcknowledgement methods are called
successfully
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Omnia Ibrahim
<o.g.h.ibrahim@gmail.com>, Matthias J. Sax <matthias@confluent.io>,
Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>
This patch addresses issue #19516 and corrects a typo in
`ApiKeyVersionsProvider`: when `toVersion` exceeds `latestVersion`, the
`IllegalArgumentException` message was erroneously formatted with
`fromVersion`. The format argument has been updated to use `toVersion`
so that the error message reports the correct value.
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
This patch extends the `@ApiKeyVersionsSource` annotation to allow
specifying the `fromVersion` and the `toVersion`. This is pretty handy
when we only want to test a subset of the versions.
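A hedged sketch of how the extended annotation might be used (the apiKey attribute and test body are illustrative; only fromVersion/toVersion come from this patch):
```java
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT, fromVersion = 8, toVersion = 9)
void testOnlyRecentVersions(short version) {
    // runs once per version in [fromVersion, toVersion]
}
```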
Reviewers: Kuan-Po Tseng <brandboat@gmail.com>, TengYao Chi
<kitingiao@gmail.com>
This PR is a minor follow-up to [PR
#19320](https://github.com/apache/kafka/pull/19320), which cleaned up
0.10.x legacy information from the clients module.
It addresses remaining reviewer suggestions that were not included in
the original PR:
- `ClusterResourceListener`: Removed "Note the minimum supported broker
version is 2.1." per review suggestion to avoid repeating
version-specific details across multiple classes.
- `TopicConfig`: Simplified `MAX_MESSAGE_BYTES_DOC` by removing obsolete
notes about behavior in versions prior to 0.10.2.
These changes help reduce outdated version information in client
documentation and improve clarity.
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
The final part of KIP-1043 is to deprecate Admin.listConsumerGroups() in
favour of Admin.listGroups() which works for all group types.
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
Topology description sent to broker in KIP-1071 contains
non-deterministically ordered topic configs. Since the topology is
compared to the groups topology upon joining we may run into
`INVALID_REQUEST: Topology updates are not supported yet` failures if
the topology sent by the application does not match the group topology
due to different topic config order.
This PR ensures that topic configs are ordered, to avoid an
`INVALID_REQUEST` error.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Enhanced the docs of `flush.ms` to remind users that the flush is triggered
by `log.flush.scheduler.interval.ms`.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
The following tasks should be addressed in this ticket; rewrite the test by:
1. using the new test infra
2. using Java
3. moving it to clients-integration-tests
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Add the new `SHARE_SESSION_LIMIT_REACHED` error code which is used when
an attempt is made to open a new share session when the share session
limit of the broker has already been reached. Support in the client and
broker will follow in subsequent PRs.
Reviewers: Lianet Magrans <lmagrans@confluent.io>
This patch updates the `GroupCoordinator` interface to use
`AuthorizableRequestContext` instead of using `RequestContext`. It makes
the interface more generic. The only downside is that the request
version in `AuthorizableRequestContext` is an `int` instead of a `short`
so we had to adapt it in a few places. We opted for using `int` directly
wherever possible.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
Use Java to rewrite `PlaintextConsumerCallbackTest` with the new test infra
and move it to the client-integration-tests module.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
This also adds metrics to StandardAuthorizer
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ken Huang
<s7133700@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, TaiJuWu
<tjwu1217@gmail.com>
This patch adds ACL support for 2PC as a part of KIP-939.
A new value will be added to the enum AclOperation: TWO_PHASE_COMMIT
((byte) 15). When InitProducerId comes with enable2Pc=true, it will
have to have both the WRITE and TWO_PHASE_COMMIT operations enabled on the
transactional id resource.
The kafka-acls.sh tool is going to support a new --operation
TwoPhaseCommit.
Reviewers: Artem Livshits <alivshits@confluent.io>, PoAn Yang
<poan.yang@suse.com>, Justine Olshan <jolshan@confluent.io>
Choose the acknowledgement mode based on the config
(`share.acknowledgement.mode`) and not on the basis of how the user
designs the application.
- The default value of the config is `IMPLICIT`, so if an
empty/null/invalid value is configured, the mode defaults to
`IMPLICIT`.
- Removed the AcknowledgementModes `UNKNOWN` and `PENDING` as they are no
longer required.
- Added code to ensure that if the application has any unacknowledged
records in a batch in "`explicit`" mode, it throws an
`IllegalStateException` (see the sketch after this list). The expectation
is that if the mode is "explicit", all the records received in a `poll()`
are acknowledged before the next call to `poll()`.
- Modified the `ConsoleShareConsumer` to configure the mode to
"explicit", as it was using the explicit mode of acknowledging records.
Reviewers: Andrew Schofield <aschofield@confluent.io>
`ShareConsumers` may wait on an unneeded `FindCoordinator` during
`close()` (i.e. after the acknowledgements are sent).
https://github.com/apache/kafka/pull/18590 added the
`StopFindCoordinatorOnClose` event, which was used by the regular
consumers. We use the same event in `ShareConsumers` as well, so the
coordinator is no longer looked up once it is no longer needed.
Reviewers: Andrew Schofield <aschofield@confluent.io>
This includes three test cases:
- ProducerCompressionTest
- ProducerFailureHandlingTest
- ProducerIdExpirationTest
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
As of 3.9, Kafka allows disabling remote storage on a topic after it was
enabled. It allows subsequent enabling and disabling too.
However the documentation says otherwise and needs to be corrected.
Doc:
https://kafka.apache.org/39/documentation/#topicconfigs_remote.storage.enable
Reviewers: Luke Chen <showuon@gmail.com>, PoAn Yang <payang@apache.org>, Ken Huang <s7133700@gmail.com>
Use Java to rewrite `TransactionsWithMaxInFlightOneTest` with the new test
infra and move it to the client-integration-tests module.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
In the first version of the integration of the stream thread with the
new Streams rebalance protocol, the consumer used a dedicated event
queue for Streams-specific background events to request that the stream
thread call the rebalance callbacks. That led to an issue where the
consumer times out when unsubscribing.
This commit gets rid of the dedicated queue and incorporates the
Streams-specific background events into the event queue used by the
consumer.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
JIRA: KAFKA-18935 This patch ensures the broker will not return null
records in FetchResponse. For more details, please refer to the
ticket.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai
<chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
This PR adds the share group dynamic config `share.isolation.level`.
Until now, share groups only supported `READ_UNCOMMITTED` isolation
level type. With this PR, we aim to support `READ_COMMITTED` isolation
type to share groups.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>, Apoorv Mittal <apoorvmittal10@gmail.com>
Call the StateRestoreListener#onBatchRestored with numRestored and not
the totalRestored when reprocessing state
See: https://issues.apache.org/jira/browse/KAFKA-18962
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias
Sax <mjsax@apache.org>
The consumer adaptations for the new Streams rebalance protocol need to
be integrated into the Streams code. This commit does the following:
- creates an async Kafka consumer
- with a Streams heartbeat request manager
- with a Streams membership manager
- integrates consumer code with the Streams membership manager and the
Streams heartbeat request manager
- processes the events from the consumer network thread (a.k.a.
background thread) that request the invocation of the "on tasks
revoked", "on tasks assigned", and "on all tasks lost" callbacks
- executes the callbacks
- sends to the consumer network thread the events signalling the
execution of the callbacks
- adapts SmokeTestDriverIntegrationTest to use the new Streams rebalance
protocol
This commit misses some unit test coverage, but it also unblocks other
work on trunk regarding the new Streams rebalance protocol. The missing
unit tests will be added soon.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This PR contains the implementation of KafkaAdminClient and
GroupCoordinator for DeleteShareGroupOffsets RPC.
- Added `deleteShareGroupOffsets` to `KafkaAdminClient`
- Added implementation for `handleDeleteShareGroupOffsetsRequest` in
`KafkaApis.scala`
- Added `deleteShareGroupOffsets` to `GroupCoordinator` as well.
Internally, this makes use of `persister.deleteState` to persist the
changes in the share coordinator.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Sushant Mahajan <smahajan@confluent.io>
Java provides a specialised Map, EnumMap, where enums are the keys, which
can provide some performance improvements.
https://docs.oracle.com/javase/8/docs/api/java/util/EnumMap.html
I have updated the Java code where possible to use an EnumMap rather
than a HashMap, and run the unit tests under the requests directory.
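For illustration, a minimal comparison (the enum is hypothetical): EnumMap is array-backed and keyed by the enum's ordinal, so lookups avoid hashing entirely and iteration follows declaration order.
```java
import java.util.EnumMap;
import java.util.Map;

class EnumMapDemo {
    enum AckMode { NONE, LEADER, ALL } // hypothetical enum for illustration

    public static void main(String[] args) {
        Map<AckMode, Integer> counts = new EnumMap<>(AckMode.class);
        counts.put(AckMode.ALL, 1);
        counts.merge(AckMode.ALL, 1, Integer::sum);
        System.out.println(counts); // {ALL=2}
    }
}
```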
Reviewers: Matthias J. Sax <matthias@confluent.io>, Lianet Magrans
<lmagrans@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
The `lastOffset` is not used actually, so it can be removed.
Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Currently for ShareConsumers, if we receive an
`UNKNOWN_TOPIC_OR_PARTITION` error code in the
`ShareAcknowledgeResponse`, we retry sending the acknowledgements
until the timer expires.
We ideally do not want this when a topic/partition is deleted; hence,
like the `CommitRequestManager`
(https://github.com/apache/kafka/pull/15581), we will treat this error as
fatal and not retry the acknowledgements.
The PR also suppresses `InvalidTopicException` during `unsubscribe()`,
which was also added in the `AsyncKafkaConsumer`
(https://github.com/apache/kafka/pull/16043). It was later removed in the
regular consumer, which now notifies the background operations of
metadata errors instead of propagating them via `ErrorEvent`.
`ShareConsumerImpl`, however, does not require that change and still
propagates the metadata error back to the application, so we need to
suppress this exception during unsubscribe().
Reviewers: Andrew Schofield <aschofield@confluent.io>, Sushant Mahajan <smahajan@confluent.io>
## Summary
This PR updates the `RecordVersion` javadoc for clarity. It removes
outdated references to `message.format.version` mentioned in the [Kafka
4.0 upgrade documentation](48f06981ee/40/upgrade.html#L135) and aligns
with feedback from a previous discussion in
[#19325](https://github.com/apache/kafka/pull/19325).
## Changes
- Cleaned up javadoc in `RecordVersion`
- Removed outdated or deprecated references
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The Callback interface is a perfect example of a place that can use the
FunctionalInterface annotation in Java. Strictly for Java, this isn't
"required" since Java will automatically coerce, but for Clojure (and
other JVM languages, I believe) to interop with Java lambdas, it needs
the FunctionalInterface annotation.
Since FunctionalInterface doesn't add any overhead and provides
compiler-enforced documentation, I don't see any reason *not* to have
this. This has already been added to Kafka Streams here:
https://github.com/apache/kafka/pull/19234#pullrequestreview-2740742487
I am happy to add it to any other spots that might be useful too.
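For reference, the kind of call site this enables, assuming an existing producer and record (`Callback#onCompletion` is the single abstract method):
```java
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.printf("acked %s-%d@%d%n",
            metadata.topic(), metadata.partition(), metadata.offset());
    }
});
```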
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This is from [#16532's comment](https://github.com/apache/kafka/pull/16532/files#r2028985028):
The forEach loop in the assertion will never execute because
`nonResponseData` is empty.
This happens because the above assertion `emptyMap()` has a size of 0,
so there are no elements to iterate over.
Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Use Java to rewrite `TransactionsExpirationTest` with the new test infra and
move it to the client-integration-tests module.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Jira: https://issues.apache.org/jira/browse/KAFKA-19074
A similar fix (https://github.com/apache/kafka/pull/16532, commit
2b8aff58b5) made it accept input to return "partial" data.
The content of the output is based on the input, but we cache the output,
so the same output is returned even though we pass different input. That
is a potential bug.
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
This PR approaches completion of Admin.listShareGroupOffsets() and
kafka-share-groups.sh --describe --offsets.
Prior to this patch, kafka-share-groups.sh was only able to describe the
offsets for partitions which were assigned to active members. Now, the
Admin.listShareGroupOffsets() uses the persister's knowledge of the
share-partitions which have initialised state. Then, it uses this list
to obtain a complete set of offset information.
The PR also implements the topic-based authorisation checking. If
Admin.listShareGroupOffsets() is called with a list of topic-partitions
specified, the authz checking is performed on the supplied list,
returning errors for any topics to which the client is not authorised.
If Admin.listShareGroupOffsets() is called without a list of
topic-partitions specified, the list of topics is discovered from the
persister as described above, and then the response is filtered down to
only show the topics to which the client is authorised. This is
consistent with other similar RPCs in the Kafka protocol, such as
OffsetFetch.
Reviewers: David Arthur <mumrah@gmail.com>, Sushant Mahajan <smahajan@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
Removes outdated references to Kafka 0.10.x in the clients module
documentation. Since the baseline version is now 2.1, any mentions of
versions earlier than this are unnecessary and have been removed or
updated accordingly.
Changes:
- Updated `ClusterResource`, `ClusterResourceListener`, and
`DescribeClusterResult` Javadoc to reflect the minimum supported broker
version as 2.1.
- Updated `TopicConfig` documentation: Removed references to consumers
older than 0.10.2.
- Removed references to 0.10.x and adjusted explanations to remain
relevant for newer versions.
Testing & Impact:
- This PR only modifies Javadoc comments—no functional code changes.
- No impact on existing functionality.
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
Adds the FunctionalInterface annotation to relevant Kafka Streams
classes. While this is not strictly required for Java, it's still best
practice and also useful for better integration with other JVM
languages, for example Clojure, to allow using these interfaces as
lambdas.
Reviewers: Matthias J. Sax <matthias@confluent.io>
This patch is part of KIP-939 [Support Participation in
2PC](https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC).
The kafka-transactions.sh tool will support a new command,
--forceTerminateTransaction. It has one required argument,
--transactionalId, which takes the transactional id of the transaction
to be terminated.
The command uses the existing Admin#fenceProducers method to forcefully
abort the transaction associated with the specified transactional ID.
Under the hood, it sends an InitProducerId request to the transaction
coordinator with the given transactional ID and keepPreparedTxn = false
by default. This is aligned with the functionality outlined in the KIP.
We will be creating a new public method in the Admin client, **public
TerminateTransactionResult forceTerminateTransaction(String
transactionalId)**, and re-using the existing fence-producers method.
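A hedged sketch of the new Admin call (only the method signature comes from this description; the result accessor name is an assumption):
```java
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

class ForceTerminate {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Internally re-uses the fenceProducers path with keepPreparedTxn = false.
            // The result() accessor is assumed; the KIP defines only the method signature.
            admin.forceTerminateTransaction("my-transactional-id").result().get();
        }
    }
}
```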
Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan <jolshan@confluent.io>
KafkaShareConsumerTest is proving very flaky. The behaviour of
MockClient does not appear to match the expectations of the test. This
PR disables the flaky tests to reduce build noise. When a proper
solution has been worked out, the tests can be re-enabled.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Currently, if we receive just a control record in the
`ShareFetchResponse`, the currentFetch in `ShareConsumerImpl` is not
updated as the record is ignored. But in the process, we lose the
acknowledgement for this control record, which is a GAP.
The PR fixes this by adding an additional map for control record
acknowledgements in `ShareFetchEvent`. This updates both the
ShareConsumerImpl and the ShareConsumeRequestManager to accommodate the
additional map.
Added unit tests in `ShareConsumerImplTest` and
`ShareConsumeRequestManagerTest` to verify the changes.
Reviewers: Andrew Schofield <aschofield@confluent.io>
This PR adds `MaxRecords` to the share fetch request and
`AcquisitionLockTimeout` to the share fetch response. It also removes the
internal broker config `max.fetch.records`.
Reviewers: Andrew Schofield <aschofield@confluent.io>
This patch moves `ConsumerTopicCreationTest` to the
`client-integration-tests` module and rewrites it in Java.
The patch also streamlines the test flow: in the Scala version, there is
a producer that produces messages, but this is not the main purpose of
`ConsumerTopicCreationTest`.
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Enhance the documentation for Admin#describeCluster and
Admin#describeConfigs to clarify their behavior when using
bootstrap.controllers and bootstrap.servers.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
* This PR adds impl for the initialize share groups call from the Group
Coordinator perspective.
* The initialize call on persister instance will be invoked by the
`GroupCoordinatorService`, based on the response of the
`GroupCoordinatorShard.shareGroupHeartbeat`. If there is a new topic
subscription or a member assignment change (topic partitions incremented),
the delta share partitions corresponding to the share group in question
are returned as an optional initialize request.
* The request is then sent to the share coordinator as an encapsulated
timer task because we want the heartbeat response to go asynchronously.
* Tests have been added for `GroupCoordinatorService` and
`GroupMetadataManager`. Existing tests have also been updated.
* A new formatter `ShareGroupStatePartitionMetadataFormatter` has been
added for debugging.
Reviewers: Andrew Schofield <aschofield@confluent.io>
The generated request data types' constructors take a Readable as input.
However, the parse method in AbstractRequest takes a ByteBuffer as input,
so to create the corresponding request data objects, each concrete
Request class wraps the ByteBuffer into a ByteBufferAccessor.
This is boilerplate code present in all the concrete request classes.
This change updates AbstractRequest's parse method so that subclasses can
simply pass the `Readable` they get directly to the request data classes.
The same change is made to the serialize method to maintain symmetry.
Reviewers: Ismael Juma <ismael@juma.me.uk>, José Armando García Sancio
<jsancio@apache.org>, Artem Livshits <alivshits@confluent.io>,
Truc Nguyen <trnguyen@confluent.io>
Previously, the `ShareConsumer.commitAsync()` method retried sending
`ShareAcknowledge` requests indefinitely. Now it will instead use the
defaultApiTimeout config to expire the request so that it does not retry forever.
The PR also fixes a bug in processing `commitSync()` requests, where an
additional check is needed that the node is free.
Co-authored-by: Andrew Schofield <aschofield@confluent.io>
Reviewers: Andrew Schofield <aschofield@confluent.io>
Migrate ConsumerRebootstrapTest to the new test infra and remove the old
Scala test.
The PR changed the following:
* Migrated `ConsumerRebootstrapTest` to the new test infra and removed the
old Scala test.
* Updated the original test case to cover rebootstrap scenarios.
* Integrated `ConsumerRebootstrapTest` into `ClientRebootstrapTest` in
the `client-integration-tests` module.
* Removed `RebootstrapTest.scala`.
Default `ConsumerRebootstrap` config:
> properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"rebootstrap");
properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
"300000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
"10000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
"30000");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "50L");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG,
"1000L");
(Screenshots: test runs for the consumer with rebootstrap enabled and with rebootstrap disabled.)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit adds error handling to the Streams heartbeat request
manager.
Errors can occur while sending a heartbeat request and when a response
with an error code that is not NONE is received.
Some errors are handled explicitly to recover from them or to log
specific messages. All the others are handled as fatal errors.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
In the past, we had `AbstractConfig#preProcessParsedConfig` but did not use
its return value.
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
The PR changed three things.
* Migrated `ProducerRebootstrapTest` to new test infra and removed the
old Scala test.
* Updated the original test case to cover rebootstrap scenarios.
* Integrated `ProducerRebootstrapTest` into `ClientRebootstrapTest` in
the `client-integration-tests` module.
Default `ProducerRebootstrap` config:
> properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"rebootstrap");
properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
"300000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
"10000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
"30000");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "50L");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG,
"1000L");
(Screenshots: test runs for the producer with rebootstrap enabled and with rebootstrap disabled.)
Reviewers: TengYao Chi <kitingiao@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The `clients-integration-tests` module doesn't have a `log4j2.yaml` to
configure logging, so we should add one.
<chia7712@gmail.com>
ReplicaSelector implementations can implement Monitorable to register their own metrics.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ken Huang <s7133700@gmail.com>
This patch is the second of a series of patches to remove the old group
coordinator. With the release of Apache Kafka 4.0, the so-called new
group coordinator is the default and only option available now.
The patch removes `group.coordinator.new.enable` (internal config) and
all its usages (integration tests, unit tests, etc.). It also cleans up
`KafkaApis` to remove logic only used by the old group coordinator.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
The `lastOffset` includes the entire batch header, so we should check `baseOffset` instead.
To optimize this, we need to update the search logic. The previous
approach simply checked whether each batch's `lastOffset()` was greater
than or equal to the target offset. Once it found the first batch that
met this condition, it returned that batch immediately.
Now that we are using `baseOffset()`, we need to handle a special case:
if the `targetOffset` falls between the `lastOffset` of the previous
batch and the `baseOffset` of the matching batch, we should select the
matching batch. The updated logic is structured as follows:
1. First, if baseOffset exactly equals targetOffset, return immediately.
2. If we find the first batch with baseOffset greater than targetOffset,
check whether the previous batch contains the target. If there is no
previous batch, or the previous batch doesn't contain the target, return
the current batch; otherwise return the previous batch.
3. After iterating through all batches, check whether the last batch
contains the target offset.
This code path is not thread-safe, so we need to prevent `EOFException`.
To avoid this exception, I am still using an early return. In this
scenario, `lastOffset` is still used within the loop, but it is executed
at most once within the loop.
Therefore, in the new implementation, `lastOffset` is executed at
most once. In most cases, this results in an optimization.
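A hedged sketch of the updated search, assuming a batch view where `baseOffset()` is cheap and `lastOffset()` must read the batch header:
```java
// Returns the batch expected to contain targetOffset, following the steps above.
static RecordBatch search(Iterable<? extends RecordBatch> batches, long targetOffset) {
    RecordBatch previous = null;
    for (RecordBatch batch : batches) {
        if (batch.baseOffset() == targetOffset)
            return batch;                 // step 1: exact match, no lastOffset() needed
        if (batch.baseOffset() > targetOffset) {
            // step 2: first batch past the target; lastOffset() used at most once
            if (previous != null && previous.lastOffset() >= targetOffset)
                return previous;
            return batch;                 // no previous batch, or it misses the target
        }
        previous = batch;
    }
    // step 3: the target may fall inside the final batch
    return (previous != null && previous.lastOffset() >= targetOffset) ? previous : null;
}
```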
Test: Verifying Memory Usage Improvement
To evaluate whether this optimization helps, I followed the steps below
to monitor memory usage:
1. Start a Standalone Kafka Server
```sh
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
bin/kafka-server-start.sh config/server.properties
```
2. Use Performance Console Tools to Produce and Consume Records
**Produce Records:**
```sh
./kafka-producer-perf-test.sh \
--topic test-topic \
--num-records 1000000000 \
--record-size 100 \
--throughput -1 \
--producer-props bootstrap.servers=localhost:9092
```
**Consume Records:**
```sh
./bin/kafka-consumer-perf-test.sh \
--topic test-topic \
--messages 1000000000 \
--bootstrap-server localhost:9092
```
It can be observed that memory usage has significantly decreased.
Reviewers: Kirk True <kirk@kirktrue.pro>, TengYao Chi
<kitingiao@gmail.com>, David Arthur <mumrah@gmail.com>, Jun Rao
<junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This patch filters out the topic describe unauthorized topics from the
StreamsGroupHeartbeat and StreamsGroupDescribe response.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This patch adds logic to enable and handle two phase commit (2PC)
transactions following KIP-939.
The changes made are as follows:
1) Add a new broker config called
**transaction.two.phase.commit.enable** which is set to false by default
2) Add new flags **enableTwoPCFlag** and **keepPreparedTxn** to
handleInitProducerId
3) Return an error if keepPreparedTxn is set to true (for now)
Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan
<jolshan@confluent.io>
Move the share consumer tests to the clients-integration-tests module and use
`@BeforeEach` for setup.
Given that the `core` module will be separated into other small modules,
this test will not be added to the core module.
Instead, I added it to the `clients-integration-tests` module since it
focuses on the admin client. The patch should include the following test
cases:
1. A topic-related static config is added to the quorum controller. The
configs from topic creation should include it, but `describeConfigs`
does not.
2. A topic-related static config is added to the quorum controller. The
configs from topic creation should include it, and `describeConfigs`
does too if the admin client is using controller.bootstrap.
3. A topic-related static config is added to the broker. The configs from
topic creation should NOT include it, but `describeConfigs` does.
4. A topic-related static config is added to the broker. The configs from
topic creation should NOT include it, and `describeConfigs` does not
either if the admin client is using controller.bootstrap.
Additionally, the docs of `STATIC_BROKER_CONFIG` should mention the impact
of "controller.properties". Those test cases should leverage the new test
infra, since it allows us to define configs for the broker and controller
individually.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
The Streams heartbeat request has some fields that are always sent.
Those are:
- group ID
- member ID
- member epoch
- group instance ID (if static membership is used)
Then it has fields that are only sent when joining:
- topology and topology epoch
- rebalance timeout
- process ID
- endpoint
- client tags
Finally, the assignment is only sent if it changed compared to the last
sent request.
Reviewers: Bill Bejeck <bill@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>
Currently, when using serializers like the Cloud Event serializer, we
need a workaround so it doesn't throw an error. Using the method taking
the headers would prevent this. Since the default implementation just
calls the method without the headers, it's expected to be fully backwards
compatible.
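The compatibility argument in code form, a sketch mirroring the interface shape rather than the exact Kafka source:
```java
import org.apache.kafka.common.header.Headers;

interface ExampleSerializer<T> {
    byte[] serialize(String topic, T data);

    // The headers-aware overload delegates by default, so implementations that
    // only provide the two-argument method keep working unchanged.
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }
}
```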
Reviewers: Divij Vaidya <divijvaidya13@gmail.com>
Mark the following tests as flaky:
* StickyAssignorTest > testLargeAssignmentAndGroupWithUniformSubscription
* DeleteSegmentsByRetentionTimeTest
* QuorumControllerTest > testUncleanShutdownBrokerElrEnabled
Reviewers: Andrew Schofield <aschofield@confluent.io>
This PR aims to remove the usage of partition max bytes from share fetch
requests. Partition Max Bytes is being defined by
`PartitionMaxBytesStrategy` which was added to the broker as part of PR
https://github.com/apache/kafka/pull/17870
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
This commit adds the conditions to decide when a Streams group heartbeat
should be sent.
A heartbeat should be sent when:
- the group coordinator is available
- the member is part of the Streams group or wants to join it
- the heartbeat interval expired, or the member is leaving the group or
acknowledging the assignment
This commit does not implement:
- not sending fields that did not change
- handling errors
Reviewers: Zheguang Zhao <zheguang.zhao@alumni.brown.edu>, Lucas
Brutschy <lbrutschy@confluent.io>
Recently, we found a regression that could have been detected by static
analysis, since a local variable wasn't being passed to a method during
a refactoring, and was left unused. It was fixed in
[7a749b5](7a749b589f),
but almost slipped into 4.0. Unused variables are typically detected by
IDEs, but this is insufficient to prevent these kinds of bugs. This
change enables unused local variable detection in checkstyle for Kafka.
A few notes on the usage:
- There are two situations in which people actually want to have a local
variable but not use it. First, there are `for (Type ignored:
collection)` loops which have to loop `collection.length` number of
times, but that do not use `ignored` in the loop body. These are
typically still easier to read than a classical `for` loop. Second, some
IDEs detect it if a return value of a function such as `File.delete` is
not being used. In this case, people sometimes store the result in an
unused local variable to make ignoring the return value explicit and to
avoid the squiggly lines.
- In Java 22, unused local variables can be omitted by using a single
underscore `_`. This is supported by checkstyle. In pre-22 versions,
IntelliJ allows such variables to be named `ignored` to suppress the
unused local variable warning. This pattern is often (but not
consistently) used in the Kafka codebase. It is, however, not
supported by checkstyle.
Since we cannot switch to Java 22, yet, and we want to use automated
detection using checkstyle, we have to resort to prefixing the unused
local variables with `@SuppressWarnings("UnusedLocalVariable")`. We have
to apply this in 11 cases across the Kafka codebase. While not being
pretty, I'd argue it's worth it to prevent bugs like the one fixed in
[7a749b5](7a749b589f).
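For illustration, the two benign patterns with the suppression applied (example code, not from the Kafka codebase):
```java
import java.io.File;
import java.util.List;

class UnusedLocalVariableExamples {
    // Loop batch.size() times without using the element.
    static int count(List<String> batch) {
        int n = 0;
        for (@SuppressWarnings("UnusedLocalVariable") String ignored : batch) {
            n++;
        }
        return n;
    }

    // Capture a return value purely to make ignoring it explicit.
    static void deleteQuietly(File f) {
        @SuppressWarnings("UnusedLocalVariable")
        boolean ignored = f.delete();
    }
}
```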
Reviewers: Andrew Schofield <aschofield@confluent.io>, David Arthur
<mumrah@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Bruno
Cadonna <cadonna@apache.org>, Kirk True <ktrue@confluent.io>
- Adding a space, article and punctuation to the Producer config doc
strings for consistency and readability.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>, Justine Olshan <jolshan@confluent.io>
Adds `describeStreamsGroup` to Admin API.
This exposes the result of the `DESCRIBE_STREAMS_GROUP` RPC in the Admin
API.
Reviewers: Bill Bejeck <bill@confluent.io>
This patch is a first step towards resolving KAFKA-18046. Apache Kafka
4.0 ships with log4j2, so the issue raised in the ticket, which causes
high CPU usage on the fetch path due to LoggerFactory.getLogger() being
called on the handling of all fetch responses, needs fixing. Hence, I
propose to fix that one by caching the Logger used by the
`CompletedFetch` class.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Logging on a per-batch basis is very chatty, and should only be done at
TRACE level to avoid spamming DEBUG logs.
Reviewers: Justine Olshan <jolshan@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
User testing of the `KafkaShareConsumer` interface has revealed some
areas which confuse people. One of these is the way that it decides
whether you want to use implicit or explicit acknowledgement of records
by observing which calls the application issues. We are taking the
opportunity to refine the interface before it is finalised.
This PR introduces an experimental configuration called
`internal.share.acknowledgement.mode` which can be used to make the
application declare which kind of acknowledgement it wishes to use. We
plan to try out the configuration, assess whether it has helped, and
then create a proper consumer configuration that makes this area better.
That would require a lot of change in the tests, which explains why this
initial PR only has a small number of tests.
Reviewers: David Arthur <mumrah@gmail.com>
Implement Admin API extensions beyond list/describe group (delete group,
offset-related APIs).
* adds methods for describing and manipulating offsets, as described in
KIP-1071
* adds corresponding unit tests
These are doing the exact same thing as the corresponding consumer group
counter-parts.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
In this PR, we perform this refactor because the class is not needed:
there is no need to refer to the child classes through a common
reference, and the duplicated code is minimal.
Reviewers: Andrew Schofield <aschofield@confluent.io>
In KRaft mode, custom KafkaPrincipalBuilder instances must implement KafkaPrincipalSerde. This PR updates all related documentation to highlight this requirement.
Reviewers: Ken Huang <s7133700@gmail.com>, David Jacot <djacot@confluent.io>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1. Convert the end txn marker schema to use the auto-generated protocol `EndTxnMarker`.
2. Substitute `CURRENT_END_TXN_MARKER_VALUE_SIZE` with an `endTxnMarkerValueSize` method, since the size is accumulated from `EndTxnMarker`.
3. Add a buffer to `EndTransactionMarker` to avoid computing twice in `serializeValue` and `endTxnMarkerValueSize`.
4. flexibleVersions is set to none.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
1. Client support for TopicAuthorizationException in the DescribeShareGroup
and heartbeat paths.
2. `ShareConsumerImpl#sendAcknowledgementsAndLeaveGroup` swallows
TopicAuthorizationException and GroupAuthorizationException.
Reviewers: ShivsundarR <shr@confluent.io>, Andrew Schofield <aschofield@confluent.io>
The AbstractHeartbeatRequestManager and the
StreamsGroupHeartbeatRequestManager both use the HeartbeatRequestState to
track the state of heartbeat requests, and both have an implementation of
HeartbeatRequestState as an inner class.
To deduplicate code, this commit extracts the HeartbeatRequestState so
that the same code can be used by both heartbeat request managers.
Reviewers: Kirk True <ktrue@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>, Lucas Brutschy <lbrutschy@confluent.io>
There are (at least) 3 issues about multithreaded access to ConsumerRecords. Hence, it would be better to document it completely.
Reviewers: Kirk True <ktrue@confluent.io>, TengYao Chi <kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>, Xuan-Zhang Gong <gongxuanzhangmelt@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The purpose of this PR is to remove the `@InterfaceStability.Evolving` from classes that were created over a year ago.
Reviewers: Jun Rao <junrao@gmail.com>
- Added a unit test to validate the exception hierarchy for all KIP-1050
transaction-related exceptions.
- RetriableException is correctly extended by all child classes.
- Included a test for RetriableException with verification that all
exceptions extending `RetriableException` do not inadvertently extend
`RefreshRetriableException`, preserving the intended behavior.
Reviewers: Kirk True <ktrue@confluent.io>, TaiJuWu <tjwu1217@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>, Justine Olshan <jolshan@confluent.io>
KIP-966 adds strict min ISR rule, so this PR improves the docs of min.insync.replicas to include that change.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
This patch filters out the topic describe unauthorized topics from the
ConsumerGroupHeartbeat and ConsumerGroupDescribe response.
In ConsumerGroupHeartbeat,
- if the request has `subscribedTopicNames` set, we directly check the
authz in `KafkaApis` and return a topic auth failure in the response if
any of the topics is denied.
- Otherwise, we check the authz only if a regex refresh is triggered, and
we do it based on the ACL of the consumer that triggered the refresh. If
any of the topics is denied, we filter it out from the resolved
subscription.
In ConsumerGroupDescribe, we check the authz of the coordinator
response. If any of the topics in the group is denied, we remove the
described info and add a topic auth failure to the described group
(similar to the group auth failure).
Reviewers: David Jacot <djacot@confluent.io>, Lianet Magrans
<lmagrans@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>,
Chia-Ping Tsai <chia7712@gmail.com>, TaiJuWu <tjwu1217@gmail.com>,
TengYao Chi <kitingiao@gmail.com>
For the KRaft implementation, there is a race between the network thread,
which reads bytes from the log segments, and the KRaft driver thread, which
truncates the log and appends records to the log. This race can cause
the network thread to send corrupted records or inconsistent records.
The corrupted records case is handled by catching and logging the
CorruptRecordException. The inconsistent records case is handled by only
appending record batches whose partition leader epoch is less than or
equal to the fetching replica's epoch, and only when the epoch didn't
change between the request and response.
For the ISR implementation, there is also a race between the network
thread and the replica fetcher thread, which truncates the log and
appends records to the log. This race can cause the network thread to
send corrupted records or inconsistent records. The replica fetcher
thread already handles the corrupted record case. The inconsistent
records case is handled by only appending record batches whose partition
leader epoch is less than or equal to the leader epoch in the FETCH
request.
Reviewers: Jun Rao <junrao@apache.org>, Alyssa Huang <ahuang@confluent.io>, Chia-Ping Tsai <chia7712@apache.org>
- Currently, if we received extraneous topic partitions in the response,
or if the response was missing some partitions that were requested, we
processed the response as it came and even populated the callback with
these partitions.
- These invalid responses should be parsed at the
`ShareConsumeRequestManager`.
- If the response missed any acknowledgements for partitions that were
requested, then we fail the request with `InvalidRecordStateException`
and populate the callbacks.
- For any extraneous partitions in the response, we log an error and
ignore them.
Some refactors are also done in this PR in ShareConsumeRequestManager to
make the code more readable.
Reviewers: Andrew Schofield <aschofield@confluent.io>
* In this PR, we add various infra classes needed to support the
`deleteShareGroups` functionality via the `kafka-share-groups.sh`
script, as well as the implementation of `kafka-share-groups.sh --delete`.
Reviewers: Andrew Schofield <aschofield@confluent.io>
Since we no longer convert records to the old format for fetch requests, this code is no longer used in production.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Cleanup code to avoid rawtype, and add suppressions where necessary.
Change the build to fail on rawtype warning.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
3.3.0 was the first KRaft release that was deemed production-ready and also
when KIP-778 (KRaft to KRaft upgrades) landed. Given that, it's reasonable
for 4.x to only support upgrades from 3.3.0 or newer (the metadata version also
needs to be set to "3.3" or newer before upgrading).
Noteworthy changes:
1. `AlterPartition` no longer includes topic names, which makes it possible to
simplify `AlterPartitionManager` logic.
2. Metadata versions older than `IBP_3_3_IV3` have been removed and
`IBP_3_3_IV3` is now the minimum version.
3. `MINIMUM_BOOTSTRAP_VERSION` has been removed.
4. Removed `isLeaderRecoverySupported`, `isNoOpRecordSupported`,
`isKRaftSupported`, `isBrokerRegistrationChangeRecordSupported` and
`isInControlledShutdownStateSupported` - these are always `true` now.
Also removed related conditional code.
5. Removed default metadata version or metadata version fallbacks in
multiple places - we now fail-fast instead of potentially using an incorrect
metadata version.
6. Update `MetadataBatchLoader.resetToImage` to set `hasSeenRecord`
based on whether the image is empty - this was a previously existing
issue that became more apparent after the changes in this PR.
7. Remove the `ibp` parameter from `BootstrapDirectory`.
8. A number of tests were not useful anymore and have been removed.
I will update the upgrade notes via a separate PR as there are a few things that
need changing and it would be easier to do so that way.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Justine Olshan <jolshan@confluent.io>, Ken Huang <s7133700@gmail.com>
This commit adds the Streams group heartbeat request manager
to the async consumer. The Streams group heartbeat request
manager is responsible for sending heartbeat requests and
processing their responses.
This commit implements:
- sending of full heartbeat request (independent of any state)
- processing successful response
Reviewers: Bill Bejeck <bill@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
Introduced two new exception classes to the Kafka error handling framework:
- ApplicationRecoverableException: signals that the error is recoverable,
but the producer needs to be restarted. It helps in scenarios where
recovery actions (like rebalancing or restoring from checkpoints) are needed.
- RefreshRetriableException: occurs when metadata is outdated or invalid
and needs to be refreshed before retrying the request. It helps handle
retries that depend on updated metadata.
Both classes are abstract; in upcoming PRs they will be extended by the
relevant concrete classes listed in the KIP-1050 exception table.
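A minimal sketch of the shape of these classes, assuming they extend
Kafka's existing `ApiException`/`RetriableException` hierarchy as proposed
in KIP-1050 (the exact parent classes may differ in the final code):

```java
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.RetriableException;

// Recoverable only by restarting the producer (e.g. after rebalancing
// or restoring from a checkpoint).
abstract class ApplicationRecoverableException extends ApiException {
    ApplicationRecoverableException(String message) {
        super(message);
    }
}

// Retriable once stale metadata has been refreshed.
abstract class RefreshRetriableException extends RetriableException {
    RefreshRetriableException(String message) {
        super(message);
    }
}
```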
Reviewers: Justine Olshan <jolshan@confluent.io>, Sanskar Jhajharia <jhajharia.sanskar@gmail.com>
Frequently updating the trust store can cause unexpected termination of the AsyncConsumer background thread.
1. To resolve this issue, reuse the same AdminClient instead of recreating it.
2. Add error logging for failures to initialize resources for the consumer network thread.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
KAFKA-16720 aims to add support for the AlterShareGroupOffsets AdminClient API. Key changes in the PR:
1. Added handling of alterShareGroupOffsets() in KafkaAdminClient and introduced the AlterShareGroupOffsetsRequest/AlterShareGroupOffsetsResponse/AlterShareGroupOffsetsOptions classes.
2. Added corresponding tests in KafkaAdminClientTest.
3. Added the ALTER_SHARE_GROUP_OFFSETS API (to be finished in the next PR along with the share coordinator pieces).
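A hedged usage sketch, assuming the admin API shape proposed for share
groups (the exact method signature and result type may differ in the
released API):

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.TopicPartition;

// Assumed shape: reset a share group's offset for a single partition.
void resetShareGroupOffset() throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
        admin.alterShareGroupOffsets("my-share-group", // assumed method name/signature
                Map.of(new TopicPartition("orders", 0), 0L))
            .all()
            .get();
    }
}
```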
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Most of the changes are obvious clean-ups/fixes. A couple of noteworthy items:
1. Support for non-LTS versions is clarified (we were incorrectly stating full support
for Java 23).
2. TLS version negotiation details are clarified.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit ensures that the ClientQuotaCallback#updateClusterMetadata method is executed in KRaft mode. This method is triggered whenever a topic or cluster metadata change occurs. However, in KRaft mode, the current implementation of the updateClusterMetadata API is inefficient due to the requirement of creating a full Cluster object. To address this, a follow-up issue (KAFKA-18239) has been created to explore more efficient mechanisms for providing cluster information to the ClientQuotaCallback without incurring the overhead of a full Cluster object creation.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The Streams membership manager is used client-side in the
background thread of the async consumer. For each member
/consumer, it is responsible for:
* keeping the member state,
* keeping assignments for the member,
* reconciling the assignments of the member -- for example,
when tasks need to be revoked before other tasks are assigned,
* requesting invocations of assignment and revocation callbacks
by the stream thread.
The Streams membership manager is called by the background thread of
the async consumer, directly in its event loop and from the
Streams group heartbeat request manager. The Streams membership
manager uses the Streams rebalance events processor to request
assignment/revocation callbacks in the stream thread.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Bill Bejeck <bill@confluent.io>
Return produce v0-v2 as supported versions in `ApiVersionsResponse`, but disable
support for these versions everywhere else.
Since clients pick the highest version supported by both client and broker during
version negotiation, this solves the problem with minimal tech debt (even though
it's not ideal that `ApiVersionsResponse` becomes inconsistent with the actual
protocol support).
Add one test for the socket server handling (in `ProcessorTest`) and one test for the
client behavior (in `ProduceRequestTest`). Adjust a couple of api versions tests to verify
the new behavior.
Finally, include a few clean-ups in `ApiKeys`, `Protocol`, `ProduceRequest`,
`ProduceRequestTest` and `BrokerApiVersionsCommandTest`.
Reference to related librdkafka issue:
https://github.com/confluentinc/librdkafka/issues/4956
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
This PR corrects the check introduced in #5332, which verifies that `position`
is within the boundaries of the file. The check
`position > currentSizeInBytes - start`
is incorrect, since `position` is relative to `start`.
Reviewers: Jun Rao <junrao@gmail.com>
This change reduces fetch session cache evictions on the broker for AsyncKafkaConsumer by altering its logic to determine which partitions it includes in fetch requests.
Background
Consumer implementations fetch data from the cluster and temporarily buffer it in memory until the user next calls Consumer.poll(). When a fetch request is being generated, partitions that already have buffered data are not included in the fetch request.
The ClassicKafkaConsumer performs much of its fetch logic and network I/O in the application thread. On poll(), if there is any locally-buffered data, the ClassicKafkaConsumer does not fetch any new data and simply returns the buffered data to the user from poll().
On the other hand, the AsyncKafkaConsumer consumer splits its logic and network I/O between two threads, which results in a potential race condition during fetch. The AsyncKafkaConsumer also checks for buffered data on its application thread. If it finds there is none, it signals the background thread to create a fetch request. However, it's possible for the background thread to receive data from a previous fetch and buffer it before the fetch request logic starts. When that occurs, as the background thread creates a new fetch request, it skips any buffered data, which has the unintended result that those partitions get added to the fetch request's "to remove" set. This signals to the broker to remove those partitions from its internal cache.
This issue is technically possible in the ClassicKafkaConsumer too, since the heartbeat thread performs network I/O in addition to the application thread. However, because of the frequency at which the AsyncKafkaConsumer's background thread runs, it is ~100x more likely to happen.
Options
The core decision is what the background thread should do when it is asked to create a fetch request and discovers there is buffered data. There were multiple proposals to address this issue in the AsyncKafkaConsumer, among them:
1. The background thread omits buffered partitions from the fetch request, as before (the existing behavior)
2. The background thread skips fetch request generation entirely if there are any buffered partitions
3. The background thread includes buffered partitions in the fetch request, but with a small "max bytes" value
4. The background thread skips fetching from the nodes that have buffered partitions
Option 4 won out. The change is localized to AbstractFetch where the basic idea is to skip fetch requests to a given node if that node is the leader for buffered data. By preventing a fetch request from being sent to that node, it won't have any "holes" where the buffered partitions should be.
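A hypothetical sketch of the idea (the `leaderFor` lookup is illustrative,
not the actual AbstractFetch code):

```java
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

// Collect the leaders of all partitions that still have buffered data;
// those nodes receive no fetch request this round, so the fetch session
// has no "holes" where the buffered partitions would be.
Set<Integer> nodesToSkip(Set<TopicPartition> bufferedPartitions) {
    Set<Integer> skip = new HashSet<>();
    for (TopicPartition tp : bufferedPartitions) {
        Node leader = leaderFor(tp); // hypothetical leader lookup
        if (leader != null)
            skip.add(leader.id());
    }
    return skip;
}
```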
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Jun Rao <junrao@gmail.com>
While testing, we found that the NOT_ENOUGH_REPLICAS error was very common and easily confused with other errors. Since we are already bumping the produce request version, we can signify that the produce request may return this error so that new clients can handle it.
(Note: the Java client should already be able to handle this as a retriable error, but other client libraries may need to implement this change.)
Reviewers: Justine Olshan <jolshan@confluent.io>
Ensure we always return empty records (including cases where an error is returned).
We also remove `nullable` from `records` since it is effectively expected to be
non-null by a large percentage of clients in the wild.
This behavior regressed in fe56fc9 (KAFKA-18269). Empty records were
previously set via `FetchResponse.recordsOrFail(partitionData)` in the
now-removed `maybeConvertFetchedData` method.
Added an integration test that fails without this fix and updated many
tests to set `records` to `empty` instead of leaving them as `null`.
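The restored invariant, in essence (an illustrative use of the public
record classes, not a line from the patch):

```java
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;

// Even on error paths, a fetch response partition should carry empty
// records rather than null.
FetchResponseData.PartitionData errorPartition() {
    return new FetchResponseData.PartitionData()
            .setPartitionIndex(0)
            .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
            .setRecords(MemoryRecords.EMPTY);
}
```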
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Arthur <mumrah@gmail.com>
During testing, we identified that kafka-python (and aiokafka) relies on metadata
request v0, so we need to add these early versions back to comply with the premise
of KIP-896 - i.e. it should not break the clients listed within it.
I reverted the changes from #18218 related to the removal of metadata versions 0-3.
I will submit a separate PR to undeprecate these API versions on the relevant 3.x branches.
kafka-python (and aiokafka) work correctly (produce & consume) with this change on
top of the 4.0 branch.
Reviewers: David Arthur <mumrah@gmail.com>
The following tests were previously reported as flaky but were only annotated with a comment in pull request #18558 due to module dependency limitations:
- testAdminClientApisAuthenticationFailure
- testOutdatedCoordinatorAssignment
- testThrottledProducerConsumer
With the introduction of the new test infrastructure in #18602, which allows all
modules to use the @Flaky annotation, these tests have now been updated to include
the @Flaky annotation.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This patch reorganizes our test infrastructure into three Gradle modules:
":test-common:test-common-internal-api" is now a minimal dependency which exposes interfaces and annotations only. It has one project dependency on server-common to expose commonly used data classes (MetadataVersion, Feature, etc). Since this pulls in server-common, this module is Java 17+. It cannot be used by ":clients" or other Java 11 modules.
":test-common:test-common-util" includes the auto-quarantined JUnit extension. The @Flaky annotation has been moved here. Since this module has no project dependencies, we can add it to the Java 11 list so that ":clients" and others can utilize the @Flaky annotation
":test-common:test-common-runtime" now includes all of the test infrastructure code (TestKitNodes, etc). This module carries heavy dependencies (core, etc) and so it should not normally be included as a compile-time dependency.
In addition to this reorganization, this patch leverages JUnit SPI service discovery so that modules can utilize the integration test framework without depending on ":core". This will allow us to start moving integration tests out of core and into the appropriate sub-module. This is done by adding ":test-common:test-common-runtime" as a testRuntimeOnly dependency rather than as a testImplementation dependency. A trivial example was added to QuorumControllerTest to illustrate this.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
Currently, the Producer.send doc is inconsistent with the actual exception behavior:
- TimeoutException: this is not actually thrown from send on buffer-full or missing
metadata; instead, it is returned as a failed future.
- AuthenticationException/AuthorizationException: these are also not thrown; they
are returned via a failed future.
Fixed the Callback javadoc and ProducerConfig doc as well.
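For reference, a short sketch of how these errors actually surface to
applications (standard producer API, no new names):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;

void sendAndCheck(KafkaProducer<String, String> producer) throws InterruptedException {
    // send() itself does not throw TimeoutException on buffer-full or
    // missing metadata; the failure arrives via the returned future.
    Future<RecordMetadata> future = producer.send(new ProducerRecord<>("topic", "value"));
    try {
        future.get();
    } catch (ExecutionException e) {
        if (e.getCause() instanceof TimeoutException) {
            // buffer was full or metadata was unavailable in time
        }
    }
}
```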
Reviewers: Luke Chen <showuon@gmail.com>, Andrew Schofield <aschofield@confluent.io>
This commit adds a processor named
StreamsRebalanceEventsProcessor that handles the rebalance
events sent from the background thread of the async
consumer to the stream thread when a task
assignment changes. It also adds the corresponding rebalance
events.
Additionally, this commit adds StreamsRebalanceData, which
maintains the data exchanged via the Streams rebalance
protocol.
All of these are used by the Streams heartbeat request manager
and the Streams membership manager that will be added in a future
commit.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
This patch does a few things:
1) Replaces ApiMessageAndVersion with ApiMessage for the key in CoordinatorRecord.
2) Leverages the fact that ApiMessage exposes the apiKey, so we no longer need to specify the key explicitly.
Reviewers: Andrew Schofield <aschofield@confluent.io>
Update producer id request / response formats and transaction log value format. There is no functional change.
Reviewers: Justine Olshan <jolshan@confluent.io>, Calvin Liu <caliu@confluent.io>
We should provide the same informative error message for both timeout
cases.
Reviewers: Kirk True <ktrue@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Kafka 4.0 will remove support for zk mode and will require conversion to kraft
before upgrading to 4.0. The minimum kraft version is 3.0 (aka 3.0-IV1).
This provides an opportunity to remove exclusively server-side protocol versions
that only exist to allow direct upgrades from versions older than 3.0, or that are
used only by zk mode.
Since KRaft became production ready in 3.3, we should consider setting the
baseline to 3.3. But that requires more discussion and it can be done via a
separate change (KAFKA-18601).
Protocol changes:
* Remove RequestHeader v0 (only used by ControlledShutdown v0)
* Remove WriteTxnMarkers v0
* Remove all versions of ControlledShutdown, LeaderAndIsr, StopReplica, UpdateMetadata
In order to remove all versions safely, extend generator to support setting
"versions" to "none". In this case, we no longer generate the `*Data` classes,
but we still reserve the id for the relevant protocol api (so it doesn't get
accidentally used for something else). The protocol documentation is correct
after these changes.
We kept a simplified version of `LeaderAndIsr{Request|Response}` because
it's used by many tests that are still relevant in kraft mode. Once
KAFKA-18486 is done, it may be possible to remove it (I left a comment on
the ticket). Similarly, KAFKA-18487 may make it possible to remove
the introduced `StopReplicaPartitionState` (left a comment on that ticket too).
There are a number of places that were adjusted to include an
`ApiKeys.hasValidVersion` check.
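For example, a guard of the kind described (a sketch; `hasValidVersion` is
the check introduced by this change):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.protocol.ApiKeys;

// Collect only the APIs that can still be negotiated; APIs whose versions
// were all removed ("versions": "none") keep a reserved id but fail
// the hasValidVersion() check.
List<ApiKeys> negotiableApis() {
    List<ApiKeys> result = new ArrayList<>();
    for (ApiKeys api : ApiKeys.values()) {
        if (api.hasValidVersion())
            result.add(api);
    }
    return result;
}
```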
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This PR implements the second part of KIP-996 and KAFKA-16164 (tasks KAFKA-16607, KAFKA-17642, KAFKA-17643, KAFKA-17675), which encompasses the response handling of PreVotes, the addition of the new ProspectiveState, updates to metrics, and the addition of Raft simulation tests.
Voters now transition to ProspectiveState first before CandidateState to prevent unnecessary epoch bumps. Voters in ProspectiveState send PreVote requests, which are Vote requests with PreVote set to true.
A Follower grants PreVotes if it has not yet fetched successfully from the leader. A Leader denies all PreVotes. Unattached, Prospective, Candidate, and Resigned will grant PreVotes if the requesting replica's log is at least as long as theirs. Granted PreVotes are not persisted like standard votes, so it is possible for a voter to grant several PreVotes in the same epoch.
The only state which is allowed to transition directly to CandidateState is ProspectiveState. This happens on receipt of a majority of granted PreVotes, or if at least one voter doesn't support PreVote requests.
Prospective will transition to Follower after election loss/timeout if it was already aware of the last known leader and the leader's endpoint, or at any point if it discovers the leader.
Prospective will transition to Unattached after election loss/timeout if it does not know the leader endpoints.
After electionTimeout, Resigned now always transitions to Unattached and increases the epoch.
Prospective grants standard votes if it has not already granted a standard vote (no votedKey), has no leaderId, and the requesting replica's log is current enough.
Candidate no longer backs off after election timeout. Candidate still backs off after election loss.
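A condensed sketch of the PreVote-granting rules above (hypothetical enum
and method, not the actual raft state classes, whose logic is more nuanced):

```java
// Hypothetical condensation of the granting rules described above.
enum RaftRole { LEADER, FOLLOWER, UNATTACHED, PROSPECTIVE, CANDIDATE, RESIGNED }

boolean grantPreVote(RaftRole role, boolean fetchedFromLeader, boolean requesterLogUpToDate) {
    switch (role) {
        case LEADER:
            return false;              // leaders deny all PreVotes
        case FOLLOWER:
            return !fetchedFromLeader; // grant only before a successful fetch from the leader
        default:
            // Unattached, Prospective, Candidate, Resigned: grant if the
            // requester's log is at least as long as ours. Grants are not
            // persisted, so several PreVotes may be granted per epoch.
            return requesterLogUpToDate;
    }
}
```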
Reviewers: José Armando García Sancio <jsancio@apache.org>
Since the DNS records for example.com changed for the second time within
one year, we rewrote the unit tests for ClientUtils so that they no
longer make real external DNS lookups but use mocks instead.
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>, Lianet Magrans <lmagrans@confluent.io>
Fix the issue where producer.commitTransaction under transaction version 2 throws an error if no partition or offset has been added to the transaction. The solution is to avoid sending the EndTxnRequest unless producer.send or producer.sendOffsetsToTransaction has been invoked.
Reviewers: Justine Olshan <jolshan@confluent.io>
SASL mechanisms that support neither integrity nor confidentiality
should throw an exception on wrap/unwrap.
The current implementation does not implement wrap/unwrap correctly.
This may cause security issues if code using these mechanisms does not
check the negotiated QOP correctly.
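A sketch of the expected behavior, following the `javax.security.sasl`
contract (illustrative overrides, not the exact patch):

```java
import javax.security.sasl.SaslException;

// Per the SaslClient/SaslServer contract, wrap/unwrap may only be called
// when integrity or confidentiality was negotiated; otherwise they must
// throw IllegalStateException instead of silently passing bytes through.
public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
    throw new IllegalStateException(
        "wrap() must not be called: neither integrity nor confidentiality was negotiated");
}

public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
    throw new IllegalStateException(
        "unwrap() must not be called: neither integrity nor confidentiality was negotiated");
}
```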
Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Igor Soarez <i@soarez.me>
Apache Kafka 4.0 will only support KRaft and 3.0-IV1 is the minimum version supported by KRaft. So, we can assume that Apache Kafka 4.0 will only communicate with brokers that are 3.0-IV1 or newer.
Note that KRaft was only marked as production-ready in 3.3, so we could go further and set the baseline to 3.3. I think we should have that discussion, but it made sense to start with the non controversial parts.
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <david.jacot@gmail.com>
KIP-863 introduced a change to ByteBufferDeserializer which is not
properly documented, but should be called out because it could surface
bugs in application code which using ByteBufferDeserializer.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Kirk True <ktrue@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
A producer might get stuck after it was throttled. This PR unblocks the producer by polling again
after pollDelayMs in NetworkUtils#awaitReady().
Reviewers: Matthias J. Sax <matthias@confluent.io>, David Jacot <djacot@confluent.io>
Convert v0/v1 record batches to v2 during compaction even if said record batches would be
written with no change otherwise. A few important details:
1. A V0 compressed record batch with multiple records is converted into a single V2 record batch
2. V0 uncompressed records are converted into single-record V2 record batches
3. V0 records are converted to V2 records with timestampType set to `CreateTime` and the
timestamp set to `-1` (see the sketch after this list).
4. The `KAFKA-4298` workaround is no longer needed since the conversion to V2 fixes
the issue too.
5. Removed a log warning applicable to consumers older than 0.10.1 - they are no longer
supported.
6. Added back the ability to append records with v0/v1 (for testing only).
7. The creation of the leader epoch cache is no longer optional since the record version
config is effectively always V2.
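To illustrate point 3, a small check using the public record classes
(illustrative only, not a test from the patch):

```java
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

// After conversion from V0, a batch carries CreateTime with the
// "unknown" timestamp sentinel (-1).
void verifyConvertedBatch(RecordBatch batch) {
    assert batch.timestampType() == TimestampType.CREATE_TIME;
    assert batch.maxTimestamp() == RecordBatch.NO_TIMESTAMP; // -1
}
```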
Added integration tests; these tests existed before #18267 and have been
restored, modified and extended.
Reviewers: Jun Rao <jun@confluent.io>